Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
0.4.0
-----
* Adding CDC support for Cassandra 5.0 Commit Logs (CASSANALYTICS-60)
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)

0.3.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class KafkaPublisher implements AutoCloseable
{
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublisher.class);

protected CassandraVersion version;
protected TopicSupplier topicSupplier;
protected int maxRecordSizeBytes;
protected final RecordProducer recordProducer;
Expand All @@ -62,7 +63,8 @@ public class KafkaPublisher implements AutoCloseable
ThreadLocal.withInitial(HashMap::new);
protected final KafkaStats kafkaStats;

public KafkaPublisher(TopicSupplier topicSupplier,
public KafkaPublisher(CassandraVersion version,
TopicSupplier topicSupplier,
KafkaProducer<String, byte[]> producer,
Serializer<CdcEvent> serializer,
int maxRecordSizeBytes,
Expand All @@ -71,6 +73,7 @@ public KafkaPublisher(TopicSupplier topicSupplier,
CdcLogMode logMode)
{
this(
version,
topicSupplier,
producer,
serializer,
Expand All @@ -84,7 +87,8 @@ public KafkaPublisher(TopicSupplier topicSupplier,
);
}

public KafkaPublisher(TopicSupplier topicSupplier,
public KafkaPublisher(CassandraVersion version,
TopicSupplier topicSupplier,
KafkaProducer<String, byte[]> producer,
Serializer<CdcEvent> serializer,
int maxRecordSizeBytes,
Expand All @@ -95,6 +99,7 @@ public KafkaPublisher(TopicSupplier topicSupplier,
RecordProducer recordProducer,
EventHasher eventHasher)
{
this.version = version;
this.topicSupplier = topicSupplier;
this.maxRecordSizeBytes = maxRecordSizeBytes;
this.failOnRecordTooLargeError = failOnRecordTooLargeError;
Expand All @@ -116,7 +121,7 @@ public CqlField.CqlType getType(KeyspaceTypeKey key)

/**
 * @return the Cassandra version this publisher was constructed with (no longer
 *         hard-coded to 4.0; the version is injected via the constructor)
 */
public CassandraVersion version()
{
    return version;
}

public Logger logger()
Expand Down
10 changes: 6 additions & 4 deletions cassandra-analytics-cdc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ dependencies {
testImplementation project(":cassandra-four-zero-types")
testImplementation project(":cassandra-four-zero-bridge")
testImplementation project(path: ':cassandra-four-zero', configuration: 'shadow')
// unit tests are performed only with Cassandra 4
// testImplementation project(":cassandra-five-zero-bridge")
// testImplementation project(":cassandra-five-zero-types")
// testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow')

testImplementation project(":cassandra-five-zero-bridge")
testImplementation project(":cassandra-five-zero-types")
testImplementation project(path: ':cassandra-five-zero', configuration: 'shadow')

testImplementation(group: 'org.quicktheories', name: 'quicktheories', version: "${project.rootProject.quickTheoriesVersion}")
testImplementation(group: 'com.google.guava', name: 'guava', version: '31.1-jre')
Expand Down Expand Up @@ -139,6 +139,8 @@ jar {
}

test {
systemProperty "cassandra.sidecar.versions_to_test", (System.getenv("CASSANDRA_VERSION") ?: "4.0.17,5.0.5")

minHeapSize = '1024m'
maxHeapSize = '3072m'
maxParallelForks = Math.max(Runtime.runtime.availableProcessors() * 2, 8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;

import org.apache.cassandra.cdc.avro.CqlToAvroSchemaConverter;
import org.apache.cassandra.spark.utils.Throwing;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -40,14 +43,17 @@ public static class VersionSpecificBridge
public final CdcBridge cdcBridge;
@Nullable
final CqlToAvroSchemaConverter avroSchemaConverter;
final ClassLoader classLoader;

/**
 * Bundles the version-specific bridge components together with the classloader
 * that loaded them, so callers can later execute code under that classloader.
 *
 * @param cassandraBridge     core bridge implementation for this Cassandra version
 * @param cdcBridge           CDC bridge implementation for this Cassandra version
 * @param avroSchemaConverter CQL-to-Avro schema converter; may be null when Avro
 *                            conversion is not available for this version
 * @param classLoader         the isolated classloader the bridge classes were loaded from
 */
public VersionSpecificBridge(CassandraBridge cassandraBridge,
                             CdcBridge cdcBridge,
                             @Nullable CqlToAvroSchemaConverter avroSchemaConverter,
                             ClassLoader classLoader)
{
    this.cassandraBridge = cassandraBridge;
    this.cdcBridge = cdcBridge;
    this.avroSchemaConverter = avroSchemaConverter;
    this.classLoader = classLoader;
}
}

Expand Down Expand Up @@ -150,12 +156,36 @@ private static VersionSpecificBridge create(@NotNull String label)
{
}

return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter);
return new VersionSpecificBridge(bridgeInstance, cdcBridgeInstance, cqlToAvroSchemaConverter, loader);
}
catch (ClassNotFoundException | NoSuchMethodException | InstantiationException
| IllegalAccessException | InvocationTargetException exception)
{
throw new RuntimeException("Failed to create Cassandra bridge for label " + label, exception);
}
}

@VisibleForTesting
public static <T> T executeActionOnBridgeClassLoader(@NotNull CassandraVersion version, Throwing.Function<ClassLoader, T> action)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add @VisibleForTesting

{
ClassLoader bridgeLoader = getVersionSpecificBridge(version).classLoader;
Thread currentThread = Thread.currentThread();
ClassLoader originalClassLoader = currentThread.getContextClassLoader();
try
{
currentThread.setContextClassLoader(bridgeLoader);
try
{
return action.apply(bridgeLoader);
}
catch (Exception e)
{
throw new RuntimeException("Failed to execute function on bridge classloader", e);
}
}
finally
{
currentThread.setContextClassLoader(originalClassLoader);
}
}
}
Loading