Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
Expand Down Expand Up @@ -154,12 +155,14 @@ public DistributedProcess(
try {
IgniteInternalFuture<R> fut;

if (ctx.rollingUpgrade().enabled()) {
fut = new GridFinishedFuture<>(new IgniteException("Failed to start distributed process "
+ type + ": rolling upgrade is enabled"));
}
I req = (I)msg.request();

String rejectMsg = ctx.rollingUpgrade().enabled() ? type.checkRollingUpgrade(req) : null;

if (rejectMsg != null)
fut = new GridFinishedFuture<>(new IgniteException(rejectMsg));
else
fut = exec.apply((I)msg.request());
fut = exec.apply(req);

fut.listen(() -> {
if (fut.error() != null)
Expand Down Expand Up @@ -446,14 +449,27 @@ public enum DistributedProcessType {
*
* @see IgniteSnapshotManager
*/
START_SNAPSHOT,
START_SNAPSHOT {
/** {@inheritDoc} */
@Override public @Nullable String checkRollingUpgrade(Message req) {
if (((SnapshotOperationRequest)req).incremental())
return "Incremental snapshot creation is not allowed when rolling upgrade is enabled.";

return null;
}
},

/**
* End snapshot procedure.
*
* @see IgniteSnapshotManager
*/
END_SNAPSHOT,
END_SNAPSHOT {
/** {@inheritDoc} */
@Override public @Nullable String checkRollingUpgrade(Message req) {
return null;
}
},

/**
* Cache group encyption key change prepare phase.
Expand Down Expand Up @@ -508,6 +524,14 @@ public enum DistributedProcessType {
/**
* Snapshot partitions validation.
*/
CHECK_SNAPSHOT_PARTS
CHECK_SNAPSHOT_PARTS;

/**
* @param req Process request.
* @return Reject reason for a request denied during rolling upgrade or {@code null} if the request is allowed.
*/
public @Nullable String checkRollingUpgrade(Message req) {
return "Failed to start distributed process " + this + ": rolling upgrade is enabled";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause;

/** */
public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
Expand All @@ -36,7 +36,8 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)));
.setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true))
.setWalCompactionEnabled(true));

return cfg;
}
Expand All @@ -48,28 +49,55 @@ public class IgniteSnapshotRollingUpgradeTest extends GridCommonAbstractTest {
cleanPersistenceDir();
}

/** Tests that snapshot creation fails when rolling upgrade is enabled. */
/** Tests that full snapshot creation is allowed when rolling upgrade is enabled. */
@Test
public void testSnapshotCreationFailsDuringRollingUpgrade() throws Exception {
public void testSnapshotCreationSucceedsDuringRollingUpgrade() throws Exception {
IgniteEx srv = startGrid(0);

srv.cluster().state(ClusterState.ACTIVE);

IgniteProductVersion curVer = srv.context().discovery().localNode().version();
srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);

IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major()
+ "." + curVer.minor()
+ "." + curVer.maintenance() + 1);
enableRollingUpgrade(srv);

srv.context().rollingUpgrade().enable(targetVer, false);
assertTrue(srv.context().rollingUpgrade().enabled());

srv.snapshot().createSnapshot("test").get(getTestTimeout());

assertTrue("Full snapshot was not created",
srv.context().cache().context().snapshotMgr().localSnapshotNames(null).contains("test"));
}

/** Tests that incremental snapshot creation is still blocked during rolling upgrade. */
@Test
public void testIncrementalSnapshotCreationFailsDuringRollingUpgrade() throws Exception {
IgniteEx srv = startGrid(0);

srv.cluster().state(ClusterState.ACTIVE);

srv.getOrCreateCache(DEFAULT_CACHE_NAME).put(0, 0);

srv.snapshot().createSnapshot("test").get(getTestTimeout());

enableRollingUpgrade(srv);

assertTrue(srv.context().rollingUpgrade().enabled());

Throwable ex = assertThrowsWithCause(
() -> srv.snapshot().createSnapshot("test").get(getTestTimeout()),
IgniteException.class
assertThrowsAnyCause(log,
() -> srv.snapshot().createIncrementalSnapshot("test").get(getTestTimeout()),
IgniteException.class,
"Incremental snapshot creation is not allowed when rolling upgrade is enabled."
);
}

/** Enables rolling upgrade. */
private void enableRollingUpgrade(IgniteEx srv) throws Exception {
IgniteProductVersion curVer = srv.context().discovery().localNode().version();

IgniteProductVersion targetVer = IgniteProductVersion.fromString(curVer.major()
+ "." + curVer.minor()
+ "." + curVer.maintenance() + 1);

assertTrue(ex.getMessage().contains("Failed to start distributed process START_SNAPSHOT: rolling upgrade is enabled"));
srv.context().rollingUpgrade().enable(targetVer, false);
}
}
Loading