Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/java/org/apache/cassandra/tcm/Startup.java
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ public static void startup(Supplier<Transformation> initialTransformation, boole
if (isReplacing)
ReconfigureCMS.maybeReconfigureCMS(metadata, DatabaseDescriptor.getReplaceAddress());

// if this throws startup is aborted and operator needs to restart, in that case the IPS is resumed if
// it was successfully committed
ClusterMetadataService.instance().commit(initialTransformation.get());
// When Accord starts up it needs to check for any historic epochs that it needs to know about (in order
// to handle pending transactions), in order to know what nodes to check with it needs to know what the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ static void decommission(boolean shutdownNetworking, boolean force)
ClusterMetadataService.instance().commit(new PrepareLeave(self,
force,
ClusterMetadataService.instance().placementProvider(),
LeaveStreams.Kind.UNBOOTSTRAP));
LeaveStreams.Kind.UNBOOTSTRAP),
m -> m,
failureHandler("PrepareLeave", StorageService.instance::markDecommissionFailed));
}
else if (InProgressSequences.isLeave(inProgress))
{
Expand Down Expand Up @@ -182,13 +184,24 @@ static void move(Token newToken)
ClusterMetadataService.instance().commit(new PrepareMove(self,
Collections.singleton(newToken),
ClusterMetadataService.instance().placementProvider(),
true));
true),
m -> m,
failureHandler("PrepareMove", StorageService.instance::markMoveFailed));
InProgressSequences.finishInProgressSequences(self);

if (logger.isDebugEnabled())
logger.debug("Successfully moved to new token {}", StorageService.instance.getLocalTokens().iterator().next());
}

private static ClusterMetadataService.CommitFailureHandler<ClusterMetadata> failureHandler(String type, Runnable markFailed)
{
return (code, msg) -> {
logger.warn("Got failure committing {} transformation: {} {}", type, code, msg);
markFailed.run();
throw new IllegalStateException(String.format("Can not commit transformation: \"%s\"(%s).", code, msg));
};
}

static void resumeMove()
{
if (ClusterMetadataService.instance().isMigrating() || ClusterMetadataService.state() == ClusterMetadataService.State.GOSSIP)
Expand All @@ -201,6 +214,11 @@ static void resumeMove()
{
String msg = "No move operation in progress, can't resume";
logger.info(msg);
if (StorageService.instance.operationMode() == MOVE_FAILED)
{
// there is no ongoing move to resume, but operation mode thinks there is
StorageService.instance.clearTransientMode();
}
throw new IllegalStateException(msg);
}
if (StorageService.instance.operationMode() != MOVE_FAILED)
Expand All @@ -221,7 +239,7 @@ static void abortMove(String nodeId)
/**
*
* @param nodeId node id to abort the MSO for, null for local node
* @param kind the expected kind of the multi step operation to abolt
* @param kind the expected kind of the multi step operation to abort
* @param ssMode the legacy mode we want storage service to be in, null for any
*/
private static void abortHelper(@Nullable String nodeId, MultiStepOperation.Kind kind, @Nullable StorageService.Mode ssMode)
Expand All @@ -234,9 +252,19 @@ private static void abortHelper(@Nullable String nodeId, MultiStepOperation.Kind
MultiStepOperation<?> sequence = metadata.inProgressSequences.get(toAbort);
if (sequence == null || sequence.kind() != kind)
{
String msg = String.format("No %s operation in progress for %s, can't abort (%s)", kind, toAbort, sequence);
logger.info(msg);
throw new IllegalStateException(msg);
if (toAbort.equals(metadata.myNodeId()) && ssMode != null && StorageService.instance.operationMode() == ssMode)
{
// there is no ongoing sequence with the given kind, but storage service operation mode is set, clear it
logger.debug("There is no ongoing {} sequence for this node, but operation mode is {} - clearing transient mode", kind, ssMode);
StorageService.instance.clearTransientMode();
return;
}
else
{
String msg = String.format("No %s operation in progress for %s, can't abort (%s)", kind, toAbort, sequence);
logger.info(msg);
throw new IllegalStateException(msg);
}
}
if (toAbort.equals(metadata.myNodeId()))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ public static UnbootstrapAndLeave newSequence(Epoch preparedAt,
*/
@VisibleForTesting
UnbootstrapAndLeave(Epoch latestModification,
LockedRanges.Key lockKey,
Transformation.Kind next,
PrepareLeave.StartLeave startLeave,
PrepareLeave.MidLeave midLeave,
PrepareLeave.FinishLeave finishLeave,
LeaveStreams streams)
LockedRanges.Key lockKey,
Transformation.Kind next,
PrepareLeave.StartLeave startLeave,
PrepareLeave.MidLeave midLeave,
PrepareLeave.FinishLeave finishLeave,
LeaveStreams streams)
{
super(nextToIndex(next), latestModification);
this.lockKey = lockKey;
Expand Down Expand Up @@ -198,7 +198,8 @@ public SequenceState executeNext()
}
catch (ExecutionException e)
{
StorageService.instance.markDecommissionFailed();
if (startLeave.nodeId().equals(ClusterMetadata.current().myNodeId()))
StorageService.instance.markDecommissionFailed();
JVMStabilityInspector.inspectThrowable(e);
logger.error("Error while decommissioning node: {}", e.getCause().getMessage());
throw new RuntimeException("Error while decommissioning node: " + e.getCause().getMessage());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test.tcm;

import java.io.IOException;

import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.service.StorageService;

import static org.junit.Assert.assertEquals;

public class LostCommitReqResTest extends TestBaseImpl
{
@Test
public void lostMoveCommitResponseTest() throws IOException
{
try (Cluster cluster = init(builder().withNodes(2)
.withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5"))
.start()))
{
// no commit responses
cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop();
// lost response when committing PrepareMove, fails the nodetool command and halts progress
cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
assertMoveFailed(cluster.get(2)); // we should be in MOVE_FAILED state to allow abortmove
// still no responses, committing CancelInProgressSequence response is lost, but is actually committed
cluster.get(2).nodetoolResult("abortmove").asserts().failure();
assertNormal(cluster.get(2)); // and we should be back to normal
cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
assertMoveFailed(cluster.get(2));
// finishing the MSO does not depend on any commit responses, just that ClusterMetadata.current() is up to date, so this is successful;
cluster.get(2).nodetoolResult("move", "--resume").asserts().success();
assertNormal(cluster.get(2));
}
}

@Test
public void lostMoveCommitRequestTest() throws IOException
{
try (Cluster cluster = init(builder().withNodes(2)
.withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5"))
.start()))
{
// no commit requests
cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop();
cluster.get(2).nodetoolResult("move", "1234").asserts().failure();
// state should be "move failed" since we don't know if the request or response went missing:
assertMoveFailed(cluster.get(2));
cluster.filters().reset();
// abort move should be successful, it only clears the transient state in this case though
cluster.get(2).nodetoolResult("abortmove").asserts().success();
assertNormal(cluster.get(2));
cluster.get(2).nodetoolResult("move", "1234").asserts().success();
assertNormal(cluster.get(2));
}
}

@Test
public void lostDecomCommitResponseTest() throws IOException
{
try (Cluster cluster = init(builder().withNodes(2)
.withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5"))
.start()))
{
cluster.filters().verbs(Verb.TCM_COMMIT_RSP.id).from(1).to(2).drop();
cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure();
assertDecomFailed(cluster.get(2));
cluster.get(2).nodetoolResult("abortdecommission").asserts().failure();
assertNormal(cluster.get(2));
cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure();
assertDecomFailed(cluster.get(2));
cluster.get(2).nodetoolResult("decommission").asserts().success();
}
}

@Test
public void lostDecomCommitRequestTest() throws IOException
{
try (Cluster cluster = init(builder().withNodes(2)
.withConfig(c -> c.with(Feature.NETWORK, Feature.GOSSIP).set("cms_await_timeout", "1s").set("cms_default_max_retries", "5"))
.start()))
{
cluster.filters().verbs(Verb.TCM_COMMIT_REQ.id).from(2).to(1).drop();
cluster.get(2).nodetoolResult("decommission", "--force").asserts().failure();
assertDecomFailed(cluster.get(2));
cluster.filters().reset();
cluster.get(2).nodetoolResult("abortdecommission").asserts().success();
assertNormal(cluster.get(2));
cluster.get(2).nodetoolResult("decommission", "--force").asserts().success();
}
}

private static void assertNormal(IInvokableInstance i)
{
assertOperationMode(i, StorageService.Mode.NORMAL);
}

private static void assertMoveFailed(IInvokableInstance i)
{
assertOperationMode(i, StorageService.Mode.MOVE_FAILED);
}

private static void assertDecomFailed(IInvokableInstance i)
{
assertOperationMode(i, StorageService.Mode.DECOMMISSION_FAILED);
}

private static void assertOperationMode(IInvokableInstance i, StorageService.Mode expectedMode)
{
String mode = i.callOnInstance(() -> StorageService.instance.operationMode().toString());
assertEquals(expectedMode.toString(), mode);
}
}