Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
* Define common operational job tracking interface and refactor current operational job tracker (CASSSIDECAR-372)

0.3.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class LiveMigrationDataCopyRequest
public final int maxConcurrency;

/**
* Creates a new request with auto-generated ID.
* Creates a new live migration data copy request.
*/
@JsonCreator
public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int maxIterations,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void repairReplicasTimeout()
private void pollStatusForState(String uuid)
{
String status = "/api/v1/cassandra/operational-jobs/" + uuid;
loopAssert(30, 500, () -> {
loopAssert(60, 500, () -> {
HttpResponse<Buffer> resp = getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", status)
.send());
logger.info("Success Status Response code: {}", resp.statusCode());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected void handleInternal(RoutingContext context,
.onFailure(throwable -> {
if (throwable instanceof LiveMigrationInvalidRequestException)
{
LOGGER.error("Input payload is not valid.", throwable);
LOGGER.error("Invalid live migration request.", throwable);
context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, throwable.getMessage(), throwable));
}
else if (throwable instanceof LiveMigrationDataCopyInProgressException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.Future;
import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
import org.apache.cassandra.sidecar.config.SidecarConfiguration;
import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
import org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
Expand Down Expand Up @@ -101,39 +103,76 @@ Future<LiveMigrationTask> createDataCopyTask(LiveMigrationDataCopyRequest reques
String source,
InstanceMetadata localInstanceMetadata)
{
LiveMigrationTask newTask = createTask(request,
source,
sidecarConfiguration.serviceConfiguration().port(),
localInstanceMetadata);

// It is possible to serve only one live migration data copy request per instance at a time.
// Checking if there is another migration is in progress before accepting new one.
boolean accepted = currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
if (taskInMap == null)
{
return newTask;
}
// Fast local JMX check before creating task - prevents task creation if Cassandra is running
return verifyCassandraNotRunning(localInstanceMetadata)
.compose(v -> {
LiveMigrationTask newTask = createTask(request,
source,
sidecarConfiguration.serviceConfiguration().port(),
localInstanceMetadata);

// It is possible to serve only one live migration data copy request per instance at a time.
// Checking if there is another migration is in progress before accepting new one.
boolean accepted = newTask == currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
if (taskInMap == null)
{
return newTask;
}

// Accept new task if and only if the existing task has completed.
return taskInMap.isCompleted() ? newTask : taskInMap;
});

if (!accepted)
{
return Future.failedFuture(
new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
Comment on lines +128 to +129
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is identical if you just throw the exception.

Suggested change
return Future.failedFuture(
new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
throw new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task.");

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to return the future in both positive and negative scenarios because it seems simpler than throwing an exception, catching it elsewhere, and converting it to a failed future. I can change it if it makes a difference.

}
LOGGER.info("Starting data copy task with id={}, source={}, destination={}",
newTask.id(), source, localInstanceMetadata.host());
newTask.start();
return Future.succeededFuture(newTask);
});
}

if (!taskInMap.isCompleted())
{
// Accept new task if and only if the existing task has completed.
return taskInMap;
}
else
/**
* Initiating data copy once a Cassandra instance starts is not acceptable. This method checks whether
* Cassandra is running or not at the moment on the destination instance by checking if Sidecar
* is able to connect to the Cassandra instance's JMX port or native (CQL) port. It returns a failed
* future if Sidecar is able to connect to either port of Cassandra.
*
* @param localInstance metadata for the local Cassandra instance
* @return Future that succeeds if Cassandra is not running, fails if it is running
*/
private Future<Void> verifyCassandraNotRunning(InstanceMetadata localInstance)
{
try
{
CassandraAdapterDelegate delegate = localInstance.delegate();

if (delegate.isJmxUp() || delegate.isNativeUp())
{
return newTask;
return Future.failedFuture(new LiveMigrationInvalidRequestException(
"Cannot start data copy: Cassandra is currently running on this instance " +
"(JMX or native connectivity established). Data copy cannot proceed while Cassandra is active."));
}
}) == newTask;

if (!accepted)
// JMX and native are down - Cassandra is not running (or at least wasn't during last health check)
LOGGER.debug("Local JMX and native check passed: Cassandra not detected as running on {}", localInstance.host());
return Future.succeededFuture();
}
catch (CassandraUnavailableException e)
{
return Future.failedFuture(
new LiveMigrationDataCopyInProgressException("Another task is already under progress. Cannot accept new task."));
// No delegate available - Cassandra is not running
LOGGER.debug("No Cassandra delegate available for {} (Cassandra not running)", localInstance.host());
return Future.succeededFuture();
}
catch (Exception e)
{
// Unexpected error - be conservative and reject for safety
LOGGER.warn("Unable to verify Cassandra status on {}, rejecting for safety", localInstance.host(), e);
return Future.failedFuture(e);
}
LOGGER.info("Starting data copy task with id={}, source={}, destination={}",
newTask.id(), source, localInstanceMetadata.host());
newTask.start();
return Future.succeededFuture(newTask);
}

LiveMigrationTask createTask(LiveMigrationDataCopyRequest request,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.livemigration;

import io.vertx.core.Future;
import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
import org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;

/**
* A pluggable pre-check hook that runs before each file download iteration in the live migration
* data copy process. Since the data copy involves deleting local files and downloading files from
* the source, implementations can perform safety validations (e.g., verifying cluster state via gossip,
* checking instance readiness) to prevent data corruption or unsafe operations.
*
* <p>The pre-check is invoked at the beginning of every download iteration (not just the first one),
* allowing implementations to continuously validate that conditions remain safe throughout the
* multi-iteration copy process.</p>
*
* <p>A {@link #DEFAULT} no-op implementation is provided for cases where no pre-check is needed.
* Custom implementations can be bound via Guice to override the default behavior.</p>
*
* <p>Example use cases:</p>
* <ul>
* <li>Gossip-based validation: verify the source node is present and the destination node
* is absent from cluster gossip, preventing data copy to a node that has already joined</li>
Comment on lines +40 to +41
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation does not exist

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This documentation is suggesting the kind of checks users can add (line: 35-38). But do not claim that they are implemented.

* <li>Instance state checks: verify the local Cassandra process is not running</li>
* <li>Disk space validation: ensure sufficient space before downloading</li>
* </ul>
*
* @see LiveMigrationFileDownloader#downloadFiles()
*/
public interface LiveMigrationFileDownloadPreCheck
{
/**
* Default no-op implementation that always succeeds. Used when no pre-check validation is required.
*/
LiveMigrationFileDownloadPreCheck DEFAULT = context -> Future.succeededFuture();

/**
* Performs a safety check before a file download iteration begins.
*
* <p>Implementations should return a succeeded future if the check passes (it is safe to proceed),
* or a failed future with a descriptive exception if the check fails (download should be aborted).</p>
*
* @param context provides access to the source host, destination instance metadata,
* sidecar port, and the data copy request parameters needed for validation
* @return a succeeded {@link Future} if the pre-check passes, a failed {@link Future} otherwise
*/
Future<Void> doCheck(PreCheckContext context);

/**
* Encapsulates the contextual information available to a {@link LiveMigrationFileDownloadPreCheck}
* implementation. This context is populated by the {@link LiveMigrationFileDownloader} before each
* download iteration.
*
* <p>Provides access to:</p>
* <ul>
* <li>{@link #source()} - hostname of the source instance being copied from</li>
* <li>{@link #destinationInstanceMetadata()} - metadata of the local/destination instance,
* including host, data directories, and the Cassandra adapter delegate</li>
* <li>{@link #sidecarPort()} - port on which Sidecar is running, useful for making
* Sidecar API calls to other cluster instances</li>
* <li>{@link #request()} - the original data copy request containing task parameters</li>
* </ul>
*/
interface PreCheckContext
{
/**
* @return the hostname of the source instance from which files are being copied
*/
String source();

/**
* @return the metadata for the destination (local) instance where files will be written
*/
InstanceMetadata destinationInstanceMetadata();

/**
* @return the Sidecar service port, useful for contacting other Sidecar instances in the cluster
*/
int sidecarPort();

/**
* @return the original data copy request containing task parameters such as max iterations,
* success threshold, and max concurrency
*/
LiveMigrationDataCopyRequest request();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class LiveMigrationFileDownloader
private final int port;
private final String logPrefix;
private final ExecutorPools executorPools;
private final LiveMigrationFileDownloadPreCheck preCheck;
private OperationStatus operationStatus;
private AsyncConcurrentTaskExecutor<Void> concurrentTaskExecutor;

Expand All @@ -97,6 +98,7 @@ protected LiveMigrationFileDownloader(Builder builder)
this.source = builder.source;
this.port = builder.port;
this.executorPools = builder.executorPools;
this.preCheck = builder.preCheck;

this.operationStatus = OperationStatus.startingState();
this.logPrefix = String.format("liveMigrationRequest=%s iteration=%s ", id, iteration);
Expand All @@ -115,7 +117,8 @@ public static Builder builder()
*/
public Future<OperationStatus> downloadFiles()
{
return checkLiveMigrationStatusOfSource()
return runPreCheck()
.compose(v -> checkLiveMigrationStatusOfSource())
.compose(v -> fetchSourceFileList())
.compose(this::cleanupUnnecessaryFiles)
.compose(this::prepareDownloadList)
Expand All @@ -124,6 +127,14 @@ public Future<OperationStatus> downloadFiles()
.otherwise(this::handleDownloadFailure);
}

private Future<Void> runPreCheck()
{
LiveMigrationFileDownloadPreCheck.PreCheckContext context
= new PreCheckContextImpl(source, instanceMetadata, port, request);
return preCheck.doCheck(context)
.onSuccess(v -> LOGGER.debug("{} Pre-check completed successfully. Proceeding with data copy.", logPrefix))
.onFailure(throwable -> LOGGER.error("{} Pre-check failed.", logPrefix, throwable));
}

/**
* Checks whether the live migration status at the source is NOT_COMPLETED or COMPLETED.
Expand Down Expand Up @@ -572,6 +583,7 @@ static class Builder implements DataObjectBuilder<Builder, LiveMigrationFileDown
private String id;
private String source;
private int port;
private LiveMigrationFileDownloadPreCheck preCheck;

protected Builder()
{
Expand Down Expand Up @@ -704,6 +716,17 @@ public Builder executorPools(ExecutorPools executorPools)
return update(b -> b.executorPools = executorPools);
}

/**
* Sets the {@code preCheck} instance and return a reference to this Builder enabling method chaining.
*
* @param preCheck the {@code preCheck} to set
* @return a reference to this Builder
*/
public Builder preCheck(LiveMigrationFileDownloadPreCheck preCheck)
{
return update(b -> b.preCheck = preCheck);
}

/**
* Returns a {@code LiveMigrationFileDownloader} built from the parameters previously set.
*
Expand All @@ -721,6 +744,7 @@ public LiveMigrationFileDownloader build()
Objects.requireNonNull(request);
Objects.requireNonNull(source);
Objects.requireNonNull(executorPools);
Objects.requireNonNull(preCheck);

return new LiveMigrationFileDownloader(this);
}
Expand All @@ -744,4 +768,51 @@ public FileAttributes(long size, long lastModifiedTime)
this.lastModifiedTime = lastModifiedTime;
}
}

/**
* Implementation of {@link LiveMigrationFileDownloadPreCheck.PreCheckContext} that provides
* the downloader's context to pre-check implementations.
*/
private static class PreCheckContextImpl implements LiveMigrationFileDownloadPreCheck.PreCheckContext
{
private final String source;
private final InstanceMetadata destinationInstanceMetadata;
private final int sidecarPort;
private final LiveMigrationDataCopyRequest request;

PreCheckContextImpl(String source,
InstanceMetadata destinationInstanceMetadata,
int sidecarPort,
LiveMigrationDataCopyRequest request)
{
this.source = source;
this.destinationInstanceMetadata = destinationInstanceMetadata;
this.sidecarPort = sidecarPort;
this.request = request;
}

@Override
public String source()
{
return source;
}

@Override
public InstanceMetadata destinationInstanceMetadata()
{
return destinationInstanceMetadata;
}

@Override
public int sidecarPort()
{
return sidecarPort;
}

@Override
public LiveMigrationDataCopyRequest request()
{
return request;
}
}
}
Loading