Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,14 @@ public class FlatPostgresCollection extends PostgresCollection {
this.createdTsColumn = createdTs;
this.lastUpdatedTsColumn = lastUpdatedTs;

if (this.createdTsColumn == null) {
if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) {
LOGGER.warn(
"timestampFields config not set properly for collection '{}'. "
+ "Row created timestamp will not be auto-managed. "
+ "Configure via collectionConfigs.{}.timestampFields {{ created = \"<col>\", lastUpdated = \"<col>\" }}",
collectionName,
collectionName);
}

if (this.lastUpdatedTsColumn == null) {
LOGGER.warn(
"timestampFields config not set properly for collection '{}'. "
+ "Row lastUpdated timestamp will not be auto-managed. "
+ "createdTsColumn: {}, lastUpdatedTsColumn: {}. "
+ "Configure via collectionConfigs.{}.timestampFields {{ created = \"<col>\", lastUpdated = \"<col>\" }}",
collectionName,
this.createdTsColumn,
this.lastUpdatedTsColumn,
collectionName);
}
}
Expand Down Expand Up @@ -572,11 +565,20 @@ public Optional<Document> update(

String tableName = tableIdentifier.getTableName();

long startTime = System.currentTimeMillis();
long resolveColumnsTime;
long connectionAcquireTime;
long executeUpdateTime;

// Acquire a transactional connection that can be managed manually
long connStart = System.currentTimeMillis();
try (Connection connection = client.getTransactionalConnection()) {
connectionAcquireTime = System.currentTimeMillis() - connStart;
try {
// 1. Validate all columns exist and operators are supported.
long resolveStart = System.currentTimeMillis();
Map<String, String> resolvedColumns = resolvePathsToColumns(updates, tableName);
resolveColumnsTime = System.currentTimeMillis() - resolveStart;

// 2. Get before-document if needed (only for BEFORE_UPDATE)
Optional<Document> beforeDoc = Optional.empty();
Expand All @@ -589,10 +591,10 @@ public Optional<Document> update(
}
}

// 3. Build and execute UPDATE
long execStart = System.currentTimeMillis();
executeUpdate(connection, query, updates, tableName, resolvedColumns);
executeUpdateTime = System.currentTimeMillis() - execStart;

// 4. Resolve return document based on options
Document returnDoc = null;
if (returnType == BEFORE_UPDATE) {
returnDoc = beforeDoc.orElse(null);
Expand All @@ -601,6 +603,16 @@ public Optional<Document> update(
}

connection.commit();

long totalTime = System.currentTimeMillis() - startTime;
LOGGER.debug(
"update timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
totalTime,
resolveColumnsTime,
connectionAcquireTime,
executeUpdateTime,
tableName);

return Optional.ofNullable(returnDoc);

} catch (Exception e) {
Expand Down Expand Up @@ -628,19 +640,40 @@ public CloseableIterator<Document> bulkUpdate(
String tableName = tableIdentifier.getTableName();
CloseableIterator<Document> beforeIterator = null;

long startTime = System.currentTimeMillis();
long resolveColumnsTime;
long connectionAcquireTime;
long executeUpdateTime;

try {
ReturnDocumentType returnType = updateOptions.getReturnDocumentType();

long resolveStart = System.currentTimeMillis();
Map<String, String> resolvedColumns = resolvePathsToColumns(updates, tableName);
resolveColumnsTime = System.currentTimeMillis() - resolveStart;

if (returnType == BEFORE_UPDATE) {
beforeIterator = find(query);
}

long connStart = System.currentTimeMillis();
try (Connection connection = client.getPooledConnection()) {
connectionAcquireTime = System.currentTimeMillis() - connStart;

long execStart = System.currentTimeMillis();
executeUpdate(connection, query, updates, tableName, resolvedColumns);
executeUpdateTime = System.currentTimeMillis() - execStart;
}

long totalTime = System.currentTimeMillis() - startTime;
LOGGER.debug(
"bulkUpdate timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
totalTime,
resolveColumnsTime,
connectionAcquireTime,
executeUpdateTime,
tableName);

switch (returnType) {
case AFTER_UPDATE:
return find(query);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class PostgresLazyilyLoadedSchemaRegistry implements SchemaRegistry<Postg

private static final Logger LOGGER =
LoggerFactory.getLogger(PostgresLazyilyLoadedSchemaRegistry.class);
private static final int SLOW_LOOKUP_THRESHOLD_MS = 100;

// The cache registry - Key: Table name, value: Map of column name to column metadata
private final LoadingCache<String, Map<String, PostgresColumnMetadata>> cache;
Expand Down Expand Up @@ -84,9 +85,15 @@ public PostgresLazyilyLoadedSchemaRegistry(
@Override
public Map<String, PostgresColumnMetadata> load(String tableName) {
LOGGER.info("Loading schema for table: {}", tableName);
long startTime = System.currentTimeMillis();
Map<String, PostgresColumnMetadata> updatedSchema = fetcher.fetch(tableName);
long duration = System.currentTimeMillis() - startTime;
lastRefreshTimes.put(tableName, Instant.now());
LOGGER.info("Successfully loading schema for table: {}", tableName);
LOGGER.info(
"Successfully loaded schema for table: {}, columns: {}, durationMs: {}",
tableName,
updatedSchema.size(),
duration);
return updatedSchema;
}
});
Expand All @@ -102,7 +109,17 @@ public Map<String, PostgresColumnMetadata> load(String tableName) {
@Override
public Map<String, PostgresColumnMetadata> getSchema(String tableName) {
try {
return cache.get(tableName);
long startTime = System.currentTimeMillis();
Map<String, PostgresColumnMetadata> schema = cache.get(tableName);
long duration = System.currentTimeMillis() - startTime;
// Only log slow lookups (indicates a cache miss that triggered DB fetch)
if (duration > SLOW_LOOKUP_THRESHOLD_MS) {
LOGGER.debug(
"getSchema slow lookup (likely cache miss) - table: {}, durationMs: {}",
tableName,
duration);
}
return schema;
} catch (ExecutionException e) {
LOGGER.error("Could not fetch the schema for table from the cache: {}", tableName, e);
throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause());
Expand Down Expand Up @@ -148,10 +165,25 @@ public void invalidate(String tableName) {
@Override
public Optional<PostgresColumnMetadata> getColumnOrRefresh(String tableName, String colName) {
Map<String, PostgresColumnMetadata> schema = getSchema(tableName);
boolean refreshed = false;

if (!schema.containsKey(colName) && canRefresh(tableName)) {
LOGGER.debug(
"Column '{}' not found in cached schema for table '{}', triggering refresh",
colName,
tableName);
invalidate(tableName);
schema = getSchema(tableName);
refreshed = true;
}

if (refreshed) {
LOGGER.debug(
"getColumnOrRefresh - table: {}, column: {}, found: {}, refreshed: {}",
tableName,
colName,
schema.containsKey(colName),
refreshed);
}

return Optional.ofNullable(schema.get(colName));
Expand Down
Loading