diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index e15deaef..6a04b694 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -131,21 +131,14 @@ public class FlatPostgresCollection extends PostgresCollection { this.createdTsColumn = createdTs; this.lastUpdatedTsColumn = lastUpdatedTs; - if (this.createdTsColumn == null) { + if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " - + "Row created timestamp will not be auto-managed. " - + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", - collectionName, - collectionName); - } - - if (this.lastUpdatedTsColumn == null) { - LOGGER.warn( - "timestampFields config not set properly for collection '{}'. " - + "Row lastUpdated timestamp will not be auto-managed. " + + "createdTsColumn: {}, lastUpdatedTsColumn: {}. " + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", collectionName, + this.createdTsColumn, + this.lastUpdatedTsColumn, collectionName); } } @@ -572,11 +565,20 @@ public Optional update( String tableName = tableIdentifier.getTableName(); + long startTime = System.currentTimeMillis(); + long resolveColumnsTime; + long connectionAcquireTime; + long executeUpdateTime; + // Acquire a transactional connection that can be managed manually + long connStart = System.currentTimeMillis(); try (Connection connection = client.getTransactionalConnection()) { + connectionAcquireTime = System.currentTimeMillis() - connStart; try { // 1. Validate all columns exist and operators are supported. + long resolveStart = System.currentTimeMillis(); Map resolvedColumns = resolvePathsToColumns(updates, tableName); + resolveColumnsTime = System.currentTimeMillis() - resolveStart; // 2. Get before-document if needed (only for BEFORE_UPDATE) Optional beforeDoc = Optional.empty(); @@ -589,10 +591,10 @@ public Optional update( } } - // 3. Build and execute UPDATE + long execStart = System.currentTimeMillis(); executeUpdate(connection, query, updates, tableName, resolvedColumns); + executeUpdateTime = System.currentTimeMillis() - execStart; - // 4. Resolve return document based on options Document returnDoc = null; if (returnType == BEFORE_UPDATE) { returnDoc = beforeDoc.orElse(null); @@ -601,6 +603,16 @@ public Optional update( } connection.commit(); + + long totalTime = System.currentTimeMillis() - startTime; + LOGGER.debug( + "update timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}", + totalTime, + resolveColumnsTime, + connectionAcquireTime, + executeUpdateTime, + tableName); + return Optional.ofNullable(returnDoc); } catch (Exception e) { @@ -628,19 +640,40 @@ public CloseableIterator bulkUpdate( String tableName = tableIdentifier.getTableName(); CloseableIterator beforeIterator = null; + long startTime = System.currentTimeMillis(); + long resolveColumnsTime; + long connectionAcquireTime; + long executeUpdateTime; + try { ReturnDocumentType returnType = updateOptions.getReturnDocumentType(); + long resolveStart = System.currentTimeMillis(); Map resolvedColumns = resolvePathsToColumns(updates, tableName); + resolveColumnsTime = System.currentTimeMillis() - resolveStart; if (returnType == BEFORE_UPDATE) { beforeIterator = find(query); } + long connStart = System.currentTimeMillis(); try (Connection connection = client.getPooledConnection()) { + connectionAcquireTime = System.currentTimeMillis() - connStart; + + long execStart = System.currentTimeMillis(); executeUpdate(connection, query, updates, tableName, resolvedColumns); + executeUpdateTime = System.currentTimeMillis() - execStart; } + long totalTime = System.currentTimeMillis() - startTime; + LOGGER.debug( + "bulkUpdate timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}", + totalTime, + resolveColumnsTime, + connectionAcquireTime, + executeUpdateTime, + tableName); + switch (returnType) { case AFTER_UPDATE: return find(query); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java index 53e1a53f..f8a28361 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java @@ -55,6 +55,7 @@ public class PostgresLazyilyLoadedSchemaRegistry implements SchemaRegistry> cache; @@ -84,9 +85,15 @@ public PostgresLazyilyLoadedSchemaRegistry( @Override public Map load(String tableName) { LOGGER.info("Loading schema for table: {}", tableName); + long startTime = System.currentTimeMillis(); Map updatedSchema = fetcher.fetch(tableName); + long duration = System.currentTimeMillis() - startTime; lastRefreshTimes.put(tableName, Instant.now()); - LOGGER.info("Successfully loading schema for table: {}", tableName); + LOGGER.info( + "Successfully loaded schema for table: {}, columns: {}, durationMs: {}", + tableName, + updatedSchema.size(), + duration); return updatedSchema; } }); @@ -102,7 +109,17 @@ public Map load(String tableName) { @Override public Map getSchema(String tableName) { try { - return cache.get(tableName); + long startTime = System.currentTimeMillis(); + Map schema = cache.get(tableName); + long duration = System.currentTimeMillis() - startTime; + // Only log slow lookups (indicates a cache miss that triggered DB fetch) + if (duration > SLOW_LOOKUP_THRESHOLD_MS) { + LOGGER.debug( + "getSchema slow lookup (likely cache miss) - table: {}, durationMs: {}", + tableName, + duration); + } + return schema; } catch (ExecutionException e) { LOGGER.error("Could not fetch the schema for table from the cache: {}", tableName, e); throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause()); @@ -148,10 +165,25 @@ public void invalidate(String tableName) { @Override public Optional getColumnOrRefresh(String tableName, String colName) { Map schema = getSchema(tableName); + boolean refreshed = false; if (!schema.containsKey(colName) && canRefresh(tableName)) { + LOGGER.debug( + "Column '{}' not found in cached schema for table '{}', triggering refresh", + colName, + tableName); invalidate(tableName); schema = getSchema(tableName); + refreshed = true; + } + + if (refreshed) { + LOGGER.debug( + "getColumnOrRefresh - table: {}, column: {}, found: {}, refreshed: {}", + tableName, + colName, + schema.containsKey(colName), + refreshed); } return Optional.ofNullable(schema.get(colName));