Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* Fixing CdcTests.testMockedCdc broken due to incorrect position update in BufferingCommitLogReader (CASSANALYTICS-127)
* Commitlog reading not progressing in CDC due to incorrect CommitLogReader.isFullyRead (CASSANALYTICS-124)
* Incorrect hash code calculation in PartitionUpdateWrapper.Digest (CASSANALYTICS-125)
* Add flag to allow bulk write to indexed tables (CASSANALYTICS-128)
* Assign data file start offset based on BTI index (CASSANALYTICS-121)
* Quote identifiers option must be set to true if ttl has mixed case column name (CASSANALYTICS-120)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,8 @@ protected TableSchema initializeTableSchema(@NotNull BulkSparkConf conf,
conf.getTTLOptions(),
conf.getTimestampOptions(),
lowestCassandraVersion,
job().qualifiedTableName().quoteIdentifiers());
job().qualifiedTableName().quoteIdentifiers(),
conf.skipSecondaryIndexCheck);
}

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ public class BulkSparkConf implements Serializable
public final int commitThreadsPerInstance;
public final double importCoordinatorTimeoutMultiplier;
public boolean quoteIdentifiers;
public final boolean skipSecondaryIndexCheck;
protected final String keystorePassword;
protected final String keystorePath;
protected final String keystoreBase64Encoded;
Expand Down Expand Up @@ -207,6 +208,7 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options, @Nullable Logg
this.ttl = MapUtils.getOrDefault(options, WriterOptions.TTL.name(), null);
this.timestamp = MapUtils.getOrDefault(options, WriterOptions.TIMESTAMP.name(), null);
this.quoteIdentifiers = MapUtils.getBoolean(options, WriterOptions.QUOTE_IDENTIFIERS.name(), false, "quote identifiers");
this.skipSecondaryIndexCheck = MapUtils.getBoolean(options, WriterOptions.SKIP_SECONDARY_INDEX_CHECK.name(), false, "skip secondary index check");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default value be true?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the default value shouldn't be true, because the user needs to understand that a rebuild needs to happen after the job.
@jmckenzie-dev can we print a note/warning to the user mentioning that a rebuild needs to happen after the job, so they are aware of the penalty of using this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call; added:

        if (!skipSecondaryIndexCheck)
        {
            validateNoSecondaryIndexes(tableInfo);
        }
        else if (tableInfo.hasSecondaryIndex())
        {
            LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild "
                      + "take place automatically after writing. Reads against the index during this time "
                      + "window will produce inconsistent or stale results until index rebuild is complete.");
        }

int storageClientConcurrency = MapUtils.getInt(options, WriterOptions.STORAGE_CLIENT_CONCURRENCY.name(),
DEFAULT_STORAGE_CLIENT_CONCURRENCY, "storage client concurrency");
long storageClientKeepAliveSeconds = MapUtils.getLong(options, WriterOptions.STORAGE_CLIENT_THREAD_KEEP_ALIVE_SECONDS.name(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ public TableSchema(StructType dfSchema,
TTLOption ttlOption,
TimestampOption timestampOption,
String lowestCassandraVersion,
boolean quoteIdentifiers)
boolean quoteIdentifiers,
boolean skipSecondaryIndexCheck)
{
this.writeMode = writeMode;
this.ttlOption = ttlOption;
Expand All @@ -79,7 +80,21 @@ public TableSchema(StructType dfSchema,
this.quoteIdentifiers = quoteIdentifiers;

validateDataFrameCompatibility(dfSchema, tableInfo);
validateNoSecondaryIndexes(tableInfo);
// If a table has indexes on it, some external process (application, DB, etc.) is responsible for rebuilding
// indexes on the table after the bulk write completes; cassandra does this as part of the SSTable import
// process today. 2i and SAI have different ergonomics here regarding if stale data is served during index build;
// ultimately we want the bulk writer to also write native SAI index files alongside sstables but until
// then, this is allowable and fine for users who Know What They're Doing.
if (!skipSecondaryIndexCheck)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a test if possible that check this behavior, basically the if block

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not explicitly tested with the following?

    @ParameterizedTest
    @MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
    public void testSecondaryIndexAllowedWithSkipCheck(String cassandraVersion)
    {
        TableSchema schema = getValidSchemaBuilder(cassandraVersion)
                .withHasSecondaryIndex()
                .withSkipSecondaryIndexCheck()
                .build();
        assertThat(schema).isNotNull();
    }

As TableSchemaTestCommon.MockTableSchemaBuilder#build calls the TableSchema constructor with 2i enabled + the flag to allow, and testSecondaryIndexIsUnsupported tests the other case.

Or am I misunderstanding the request?

{
validateNoSecondaryIndexes(tableInfo);
}
else if (tableInfo.hasSecondaryIndex())
{
LOGGER.warn("Bulk writing to tables with SecondaryIndexes will have an asynchronous index rebuild "
+ "take place automatically after writing. Reads against the index during this time "
+ "window will produce inconsistent or stale results until index rebuild is complete.");
}
validateUserAddedColumns(lowestCassandraVersion, quoteIdentifiers, ttlOption, timestampOption);

this.createStatement = getCreateStatement(tableInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,12 @@ public enum WriterOptions implements WriterOption
* - a failure otherwise
*/
JOB_TIMEOUT_SECONDS,
/**
* Option to bypass the secondary index validation check during bulk write job setup.
* By default, bulk writes to tables with secondary indexes are rejected.
 * Setting this option to {@code true} allows bulk writes to proceed on tables that have secondary indexes,
 * with the understanding that the secondary indexes will NOT be updated by the bulk write itself; Cassandra
 * triggers an asynchronous index rebuild as part of the SSTable import, and reads against the index may
 * return stale or inconsistent results until that rebuild completes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does C* detect and rebuild them, or does the user need to run a command to trigger the rebuild?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C* automatically rebuilds them on import. Part of the SSTableImporter logic paths.

*/
SKIP_SECONDARY_INDEX_CHECK,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about defining it in the spark conf, instead of write options?

The rationale is that the toggle is for advanced use cases and is not directly related to the write behavior. There is another existing spark conf, org.apache.cassandra.spark.bulkwriter.BulkSparkConf#SKIP_CLEAN, that skips cleaning up SSTables when a job fails.

Admittedly, there are some existing inconsistency of where to have conf and where to have write options in the project.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would putting this in BulkSparkConf force it session-wide? i.e. a user would lose the ability to reason about and easily configure this setting on a per-table / per operation basis vs. the public exposure of it via WriterOptions?

My intuition right now is that this is something that probably should have been enabled by default all this time w/a configurable guardrail to turn it off, so while I'm sympathetic to the idea of taking a small step from "don't allow it" to "allow it but make it hard to use", the risk of this being easily accessible for users is that they'll bulk write to a table that will then have a long running index building operation happen in the background. Which, other than "load on node" and "application might read a partial index if you don't have automation that clearly delineates when a bulk insert and reindex finish from application accessing it", doesn't represent a structural or correctness risk to the data.

Copy link
Contributor

@yifan-c yifan-c Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Josh explained to me offline too. I think it makes sense to have it as part of writer option. Especially in the scenario when the spark job has multiple write sessions, writer option allows finer control to toggle the flag on specific tables / write sessions.

}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,17 @@ public void testSecondaryIndexIsUnsupported() throws Exception
.hasMessage("Bulkwriter doesn't support secondary indexes");
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
public void testSecondaryIndexAllowedWithSkipCheck(String cassandraVersion)
{
TableSchema schema = getValidSchemaBuilder(cassandraVersion)
.withHasSecondaryIndex()
.withSkipSecondaryIndexCheck()
.build();
assertThat(schema).isNotNull();
}

@ParameterizedTest
@MethodSource("org.apache.cassandra.bridge.VersionRunner#supportedVersions")
public void testMixedCaseTTLColumnNameWithoutQuoteIdentifiersFails(String cassandraVersion)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ public static class MockTableSchemaBuilder
private TTLOption ttlOption = TTLOption.forever();
private TimestampOption timestampOption = TimestampOption.now();
private boolean quoteIdentifiers = false;
private boolean skipSecondaryIndexCheck = false;
private boolean hasSecondaryIndex = false;

public MockTableSchemaBuilder(CassandraBridge bridge)
{
Expand Down Expand Up @@ -270,6 +272,18 @@ public MockTableSchemaBuilder withQuotedIdentifiers()
return this;
}

public MockTableSchemaBuilder withSkipSecondaryIndexCheck()
{
this.skipSecondaryIndexCheck = true;
return this;
}

public MockTableSchemaBuilder withHasSecondaryIndex()
{
this.hasSecondaryIndex = true;
return this;
}

private ImmutableMap<String, CqlField.CqlType> addColumnToCqlColumns(ImmutableMap<String, CqlField.CqlType> currentColumns,
String columnName,
String cqlType)
Expand Down Expand Up @@ -315,8 +329,16 @@ public TableSchema build()
partitionKeyColumnTypes,
primaryKeyColumnNames,
cassandraVersion,
quoteIdentifiers);
return new TableSchema(dataFrameSchema, tableInfoProvider, writeMode, ttlOption, timestampOption, cassandraVersion, quoteIdentifiers);
quoteIdentifiers,
hasSecondaryIndex);
return new TableSchema(dataFrameSchema,
tableInfoProvider,
writeMode,
ttlOption,
timestampOption,
cassandraVersion,
quoteIdentifiers,
skipSecondaryIndexCheck);
}
}

Expand All @@ -333,6 +355,7 @@ public static class MockTableInfoProvider implements TableInfoProvider
Map<String, CqlField.CqlType> columns;
private final String cassandraVersion;
private final boolean quoteIdentifiers;
private final boolean hasSecondaryIndex;

public MockTableInfoProvider(CassandraBridge bridge,
ImmutableMap<String, CqlField.CqlType> cqlColumns,
Expand All @@ -341,6 +364,18 @@ public MockTableInfoProvider(CassandraBridge bridge,
String[] primaryKeyColumnNames,
String cassandraVersion,
boolean quoteIdentifiers)
{
this(bridge, cqlColumns, partitionKeyColumns, partitionKeyColumnTypes, primaryKeyColumnNames, cassandraVersion, quoteIdentifiers, false);
}

public MockTableInfoProvider(CassandraBridge bridge,
ImmutableMap<String, CqlField.CqlType> cqlColumns,
String[] partitionKeyColumns,
ColumnType[] partitionKeyColumnTypes,
String[] primaryKeyColumnNames,
String cassandraVersion,
boolean quoteIdentifiers,
boolean hasSecondaryIndex)
{
this.bridge = bridge;
this.cqlColumns = cqlColumns;
Expand All @@ -350,6 +385,7 @@ public MockTableInfoProvider(CassandraBridge bridge,
columns = cqlColumns;
this.cassandraVersion = cassandraVersion.replaceAll("(\\w+-)*cassandra-", "");
this.quoteIdentifiers = quoteIdentifiers;
this.hasSecondaryIndex = hasSecondaryIndex;
this.uniqueTableName = TEST_TABLE_PREFIX + TEST_TABLE_ID.getAndIncrement();
}

Expand Down Expand Up @@ -439,7 +475,7 @@ public String getKeyspaceName()
@Override
public boolean hasSecondaryIndex()
{
return false;
return hasSecondaryIndex;
}

@Override
Expand Down