Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[submodule "modules/accord"]
path = modules/accord
url = https://github.com/apache/cassandra-accord.git
branch = trunk
url = https://github.com/alanwang67/cassandra-accord.git
branch = CASSANDRA-19433
2 changes: 1 addition & 1 deletion modules/accord
107 changes: 95 additions & 12 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
Expand Down Expand Up @@ -114,9 +115,11 @@
import org.apache.cassandra.repair.NoSuchRepairSessionException;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.AccordService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tcm.ClusterMetadata;
import org.apache.cassandra.tcm.ownership.DataPlacement;
Expand All @@ -133,6 +136,7 @@
import org.apache.cassandra.utils.concurrent.Promise;
import org.apache.cassandra.utils.concurrent.Refs;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Collections.singleton;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.FutureTask.callable;
Expand Down Expand Up @@ -612,7 +616,14 @@ public Object call() throws Exception
FBUtilities.waitOnFutures(futures);
assert compacting.originals().isEmpty();
logger.info("Finished {} for {}.{} successfully", operationType, keyspace, table);
return AllSSTableOpStatus.SUCCESSFUL;

if (operation.incompleteOperation())
{
logger.info("Operation incomplete for {}.{}", keyspace, table);
return AllSSTableOpStatus.INCOMPLETE;
}
else
return AllSSTableOpStatus.SUCCESSFUL;
}
finally
{
Expand All @@ -636,13 +647,18 @@ private static interface OneSSTableOperation
{
Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction);
void execute(LifecycleTransaction input) throws IOException;
default boolean incompleteOperation()
{
return false;
};
}

public enum AllSSTableOpStatus
{
SUCCESSFUL(0),
ABORTED(1),
UNABLE_TO_CANCEL(2);
UNABLE_TO_CANCEL(2),
INCOMPLETE(3);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nodeprobe doesn't handle this. Cleanup should signal an error if some data wasn't cleaned up since what was requested didn't fully happen.


public final int statusCode;

Expand Down Expand Up @@ -789,12 +805,17 @@ public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jo
if (partitioner.getClass() == LocalPartitioner.class)
localWrites = RangesAtEndpoint.of(Replica.fullReplica(local, new Range<>(partitioner.getMinimumToken(), partitioner.getMinimumToken())));

final Set<Range<Token>> allRanges = new HashSet<>(localWrites.ranges());
InUseRanges inUseRanges = getInUseRanges(cfStore.getTableId(), localWrites);

final List<Range<Token>> noLongerOwnedRangesInUseByAccord = inUseRanges.noLongerOwnedRangesInUseByAccord;
final List<Range<Token>> allRanges = inUseRanges.allRanges;
final Set<Range<Token>> transientRanges = new HashSet<>(localWrites.onlyTransient().ranges());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not something you changed, but I don't get why these are wrapped in new HashSet; it should be ImmutableSet.copyOf.

final Set<Range<Token>> fullRanges = new HashSet<>(localWrites.onlyFull().ranges());

return parallelAllSSTableOperation(cfStore, new OneSSTableOperation()
{
boolean incompleteOperation = false;

@Override
public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
Comment thread
aweisberg marked this conversation as resolved.
{
Expand All @@ -807,6 +828,8 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
SSTableReader sstable = sstableIter.next();
boolean needsCleanupFull = needsCleanup(sstable, fullRanges);
boolean needsCleanupTransient = !transientRanges.isEmpty() && sstable.isRepaired() && needsCleanup(sstable, transientRanges);
boolean needsCleanupAccord = needsCleanup(sstable, noLongerOwnedRangesInUseByAccord);

//If there are no ranges for which the table needs cleanup either due to lack of intersection or lack
//of the table being repaired.
totalSSTables++;
Expand All @@ -823,7 +846,15 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
transaction.cancel(sstable);
skippedSStables++;
}
else if (!needsCleanupAccord)
{
sstableIter.remove();
transaction.cancel(sstable);
skippedSStables++;
this.incompleteOperation = true;
}
}

logger.info("Skipping cleanup for {}/{} sstables for {}.{} since they are fully contained in owned ranges (full ranges: {}, transient ranges: {})",
skippedSStables, totalSSTables, cfStore.getKeyspaceName(), cfStore.getTableName(), fullRanges, transientRanges);
sortedSSTables.sort(SSTableReader.sizeComparator);
Expand All @@ -834,11 +865,50 @@ public Iterable<SSTableReader> filterSSTables(LifecycleTransaction transaction)
public void execute(LifecycleTransaction txn) throws IOException
{
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfStore, allRanges, transientRanges, txn.onlyOne().isRepaired(), FBUtilities.nowInSeconds());
doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes);
this.incompleteOperation |= (doCleanupOne(cfStore, txn, cleanupStrategy, allRanges, hasIndexes, this.incompleteOperation, noLongerOwnedRangesInUseByAccord));
}

@Override
public boolean incompleteOperation()
{
return this.incompleteOperation;
}
}, jobs, OperationType.CLEANUP);
}

/**
 * Holds the two range collections cleanup needs to consider: the ranges this node
 * no longer owns locally but that Accord still reports as in use, and the combined
 * set of all ranges to retain (locally owned plus Accord in-use).
 *
 * Fields are made final: both are assigned exactly once in the constructor and the
 * visible call sites only read them, so exposing them as mutable public fields was
 * an accident waiting to happen.
 */
public static class InUseRanges
{
    /** Ranges no longer locally owned but still in use by Accord; cleanup must not purge data in them yet. */
    public final List<Range<Token>> noLongerOwnedRangesInUseByAccord;
    /** All ranges whose data must be retained: local write ranges unioned with the Accord in-use ranges above. */
    public final List<Range<Token>> allRanges;

    public InUseRanges(List<Range<Token>> noLongerOwnedRangesInUseByAccord, List<Range<Token>> allRanges)
    {
        this.noLongerOwnedRangesInUseByAccord = noLongerOwnedRangesInUseByAccord;
        this.allRanges = allRanges;
    }
}

/**
 * Computes, for the given table, the token ranges cleanup must retain.
 *
 * When Accord is enabled for the table, any Accord in-use ranges that fall outside
 * this node's locally owned write ranges are collected separately: cleanup must keep
 * that data around even though the node no longer owns it.
 *
 * @param tableId     table whose in-use ranges are being computed
 * @param localWrites the replica ranges this node currently writes locally
 * @return normalized Accord-only ranges plus the normalized union of all retained ranges
 */
private InUseRanges getInUseRanges(TableId tableId, RangesAtEndpoint localWrites)
{
    // Portions of Accord's in-use ranges that this node no longer owns locally.
    List<Range<Token>> accordOnlyRanges = new ArrayList<>();
    TableMetadata metadata = Schema.instance.getTableMetadata(tableId);

    if (metadata != null && metadata.isAccordEnabled())
    {
        checkState(localWrites.onlyTransient().ranges().isEmpty(), "Transient Replication is not supported with Accord");
        Set<Range<Token>> accordTableRanges = AccordService.instance().getInUseRanges().get(tableId);
        if (accordTableRanges != null)
        {
            // Keep only the parts of each Accord range that are NOT locally owned.
            for (Range<Token> inUse : accordTableRanges)
                accordOnlyRanges.addAll(inUse.subtractAll(localWrites.ranges()));
        }
    }

    // Union of locally owned ranges and the Accord-only ranges computed above.
    Set<Range<Token>> combined = new HashSet<>(localWrites.ranges());
    combined.addAll(accordOnlyRanges);
    return new InUseRanges(Range.normalize(accordOnlyRanges), Range.normalize(combined));
}

public AllSSTableOpStatus performGarbageCollection(final ColumnFamilyStore cfStore, TombstoneOption tombstoneOption, int jobs) throws InterruptedException, ExecutionException
{
assert !cfStore.isIndex();
Expand Down Expand Up @@ -1391,8 +1461,13 @@ public void forceUserDefinedCleanup(String dataFiles)
ColumnFamilyStore cfs = entry.getKey();
Keyspace keyspace = cfs.keyspace;
final RangesAtEndpoint replicas = StorageService.instance.getLocalReplicas(keyspace.getName());
final Set<Range<Token>> allRanges = replicas.ranges();

InUseRanges inUseRanges = getInUseRanges(cfs.getTableId(), replicas);

final List<Range<Token>> noLongerOwnedRangesInUseByAccord = inUseRanges.noLongerOwnedRangesInUseByAccord;
final List<Range<Token>> allRanges = inUseRanges.allRanges;
final Set<Range<Token>> transientRanges = replicas.onlyTransient().ranges();

boolean hasIndexes = cfs.indexManager.hasIndexes();
SSTableReader sstable = lookupSSTable(cfs, entry.getValue());

Expand All @@ -1405,7 +1480,7 @@ public void forceUserDefinedCleanup(String dataFiles)
CleanupStrategy cleanupStrategy = CleanupStrategy.get(cfs, allRanges, transientRanges, sstable.isRepaired(), FBUtilities.nowInSeconds());
try (LifecycleTransaction txn = cfs.getTracker().tryModify(sstable, OperationType.CLEANUP))
{
doCleanupOne(cfs, txn, cleanupStrategy, allRanges, hasIndexes);
doCleanupOne(cfs, txn, cleanupStrategy, allRanges, hasIndexes, false, noLongerOwnedRangesInUseByAccord);
}
catch (IOException e)
{
Expand Down Expand Up @@ -1605,13 +1680,16 @@ public static boolean needsCleanup(SSTableReader sstable, Collection<Range<Token
*
* @throws IOException
*/
private void doCleanupOne(final ColumnFamilyStore cfs,
LifecycleTransaction txn,
CleanupStrategy cleanupStrategy,
Collection<Range<Token>> allRanges,
boolean hasIndexes) throws IOException
private boolean doCleanupOne(final ColumnFamilyStore cfs,
LifecycleTransaction txn,
CleanupStrategy cleanupStrategy,
Collection<Range<Token>> allRanges,
boolean hasIndexes,
boolean alreadyIncomplete,
Collection<Range<Token>> noLongerOwnedRangesInUseByAccord) throws IOException
{
assert !cfs.isIndex();
boolean incompleteOperation = alreadyIncomplete;

SSTableReader sstable = txn.onlyOne();

Expand All @@ -1621,7 +1699,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs,
txn.obsoleteOriginals();
txn.finish();
logger.info("SSTable {} ([{}, {}]) does not intersect the owned ranges ({}), dropping it", sstable, sstable.getFirst().getToken(), sstable.getLast().getToken(), allRanges);
return;
return incompleteOperation;
}

long start = nanoTime();
Expand Down Expand Up @@ -1662,6 +1740,10 @@ private void doCleanupOne(final ColumnFamilyStore cfs,
if (notCleaned == null)
continue;

if (!noLongerOwnedRangesInUseByAccord.isEmpty() && !incompleteOperation
&& Range.isInRanges(partition.partitionKey().getToken(), noLongerOwnedRangesInUseByAccord))
incompleteOperation = true;

if (writer.append(notCleaned) != null)
totalkeysWritten++;

Expand Down Expand Up @@ -1692,6 +1774,7 @@ private void doCleanupOne(final ColumnFamilyStore cfs,
FBUtilities.prettyPrintMemory(endsize), (int) (ratio * 100), totalkeysWritten, dTime));
}

return incompleteOperation;
}

protected void compactionRateLimiterAcquire(RateLimiter limiter, long bytesScanned, long lastBytesScanned, double compressionRatio)
Expand Down
22 changes: 22 additions & 0 deletions src/java/org/apache/cassandra/service/accord/AccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.journal.Params;
import org.apache.cassandra.locator.InetAddressAndPort;
Expand Down Expand Up @@ -1175,6 +1178,25 @@ public void awaitDone(TableId id, long epoch)
getBlocking(node.durability().sync("Drop Keyspace/Table (Epoch " + epoch + ')', ExclusiveSyncPoint, TxnId.minForEpoch(epoch), ranges, Self, All, DatabaseDescriptor.getAccordRangeSyncPointTimeoutNanos(), NANOSECONDS), ranges, new LatencyRequestBookkeeping(null), startedAt, deadline, false);
}

@Override
public Map<TableId, Set<org.apache.cassandra.dht.Range<Token>>> getInUseRanges()
{
Map<TableId, Set<org.apache.cassandra.dht.Range<Token>>> tableToRangeMap = new HashMap<>();
List<Ranges> globalRanges = getBlocking(node.commandStores().getInUseRanges());

for (Ranges ranges : globalRanges)
{
for (accord.primitives.Range r : ranges)
{
TokenRange tokenRange = (TokenRange) r;
tableToRangeMap.computeIfAbsent(tokenRange.table(), tableId -> new HashSet<>())
.add(tokenRange.toKeyspaceRange());
}
}
Comment thread
alanwang67 marked this conversation as resolved.

return tableToRangeMap;
}

public Params journalConfiguration()
{
return journal.configuration();
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/service/accord/IAccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.cassandra.service.accord;

import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -55,6 +58,8 @@

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RequestExecutionException;
import org.apache.cassandra.journal.Params;
import org.apache.cassandra.net.IVerbHandler;
Expand Down Expand Up @@ -183,6 +188,8 @@ public AccordCompactionInfos(DurableBefore durableBefore, long minEpoch, AccordC

void awaitDone(TableId id, long epoch);

Map<TableId, Set<Range<Token>>> getInUseRanges();

AccordEndpointMapper endpointMapper();

AccordTopologyService topologyService();
Expand Down Expand Up @@ -352,6 +359,12 @@ public void awaitDone(TableId id, long epoch)

}

@Override
public Map<TableId, Set<org.apache.cassandra.dht.Range<Token>>> getInUseRanges()
{
    // No-op implementation: reports no Accord in-use ranges.
    return Collections.emptyMap();
}

@Override
public AccordEndpointMapper endpointMapper()
{
Expand Down Expand Up @@ -562,6 +575,12 @@ public void awaitDone(TableId id, long epoch)
delegate.awaitDone(id, epoch);
}

@Override
public Map<TableId, Set<org.apache.cassandra.dht.Range<Token>>> getInUseRanges()
{
    // Forwards directly to the wrapped service.
    return delegate.getInUseRanges();
}

@Override
public Params journalConfiguration()
{
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/tools/NodeProbe.java
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,9 @@ private void perform(PrintStream out, String ks, Job job, String jobName) throws
case 2:
out.printf("Failed marking some sstables compacting in keyspace %s, check server logs for more information.\n", ks);
throw new RuntimeException(String.format("Failed marking some sstables compacting in keyspace %s, check server logs for more information.\n", ks));
case 3:
out.printf("Partially cleaned up SSTables for ranges that are no longer owned in keyspace %s, check server logs for more information.\n", ks);
throw new RuntimeException(String.format("Partially cleaned up SSTables for ranges that are no longer owned in keyspace %s, check server logs for more information.\n", ks));
}
}

Expand Down
Loading