diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
index e15deaef..6a04b694 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
@@ -131,21 +131,14 @@ public class FlatPostgresCollection extends PostgresCollection {
this.createdTsColumn = createdTs;
this.lastUpdatedTsColumn = lastUpdatedTs;
- if (this.createdTsColumn == null) {
+ if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) {
LOGGER.warn(
"timestampFields config not set properly for collection '{}'. "
- + "Row created timestamp will not be auto-managed. "
- + "Configure via collectionConfigs.{}.timestampFields {{ created = \"
\", lastUpdated = \"\" }}",
- collectionName,
- collectionName);
- }
-
- if (this.lastUpdatedTsColumn == null) {
- LOGGER.warn(
- "timestampFields config not set properly for collection '{}'. "
- + "Row lastUpdated timestamp will not be auto-managed. "
+ + "createdTsColumn: {}, lastUpdatedTsColumn: {}. "
+ "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}",
collectionName,
+ this.createdTsColumn,
+ this.lastUpdatedTsColumn,
collectionName);
}
}
@@ -572,11 +565,20 @@ public Optional update(
String tableName = tableIdentifier.getTableName();
+ long startTime = System.currentTimeMillis();
+ long resolveColumnsTime;
+ long connectionAcquireTime;
+ long executeUpdateTime;
+
// Acquire a transactional connection that can be managed manually
+ long connStart = System.currentTimeMillis();
try (Connection connection = client.getTransactionalConnection()) {
+ connectionAcquireTime = System.currentTimeMillis() - connStart;
try {
// 1. Validate all columns exist and operators are supported.
+ long resolveStart = System.currentTimeMillis();
Map resolvedColumns = resolvePathsToColumns(updates, tableName);
+ resolveColumnsTime = System.currentTimeMillis() - resolveStart;
// 2. Get before-document if needed (only for BEFORE_UPDATE)
Optional beforeDoc = Optional.empty();
@@ -589,10 +591,10 @@ public Optional update(
}
}
- // 3. Build and execute UPDATE
+ long execStart = System.currentTimeMillis();
executeUpdate(connection, query, updates, tableName, resolvedColumns);
+ executeUpdateTime = System.currentTimeMillis() - execStart;
- // 4. Resolve return document based on options
Document returnDoc = null;
if (returnType == BEFORE_UPDATE) {
returnDoc = beforeDoc.orElse(null);
@@ -601,6 +603,16 @@ public Optional update(
}
connection.commit();
+
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.debug(
+ "update timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
+ totalTime,
+ resolveColumnsTime,
+ connectionAcquireTime,
+ executeUpdateTime,
+ tableName);
+
return Optional.ofNullable(returnDoc);
} catch (Exception e) {
@@ -628,19 +640,40 @@ public CloseableIterator bulkUpdate(
String tableName = tableIdentifier.getTableName();
CloseableIterator beforeIterator = null;
+ long startTime = System.currentTimeMillis();
+ long resolveColumnsTime;
+ long connectionAcquireTime;
+ long executeUpdateTime;
+
try {
ReturnDocumentType returnType = updateOptions.getReturnDocumentType();
+ long resolveStart = System.currentTimeMillis();
Map resolvedColumns = resolvePathsToColumns(updates, tableName);
+ resolveColumnsTime = System.currentTimeMillis() - resolveStart;
if (returnType == BEFORE_UPDATE) {
beforeIterator = find(query);
}
+ long connStart = System.currentTimeMillis();
try (Connection connection = client.getPooledConnection()) {
+ connectionAcquireTime = System.currentTimeMillis() - connStart;
+
+ long execStart = System.currentTimeMillis();
executeUpdate(connection, query, updates, tableName, resolvedColumns);
+ executeUpdateTime = System.currentTimeMillis() - execStart;
}
+ long totalTime = System.currentTimeMillis() - startTime;
+ LOGGER.debug(
+ "bulkUpdate timing - totalMs: {}, resolveColumnsMs: {}, connectionAcquireMs: {}, executeUpdateMs: {}, table: {}",
+ totalTime,
+ resolveColumnsTime,
+ connectionAcquireTime,
+ executeUpdateTime,
+ tableName);
+
switch (returnType) {
case AFTER_UPDATE:
return find(query);
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java
index 53e1a53f..f8a28361 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresLazyilyLoadedSchemaRegistry.java
@@ -55,6 +55,7 @@ public class PostgresLazyilyLoadedSchemaRegistry implements SchemaRegistry> cache;
@@ -84,9 +85,15 @@ public PostgresLazyilyLoadedSchemaRegistry(
@Override
public Map load(String tableName) {
LOGGER.info("Loading schema for table: {}", tableName);
+ long startTime = System.currentTimeMillis();
Map updatedSchema = fetcher.fetch(tableName);
+ long duration = System.currentTimeMillis() - startTime;
lastRefreshTimes.put(tableName, Instant.now());
- LOGGER.info("Successfully loading schema for table: {}", tableName);
+ LOGGER.info(
+ "Successfully loaded schema for table: {}, columns: {}, durationMs: {}",
+ tableName,
+ updatedSchema.size(),
+ duration);
return updatedSchema;
}
});
@@ -102,7 +109,17 @@ public Map load(String tableName) {
@Override
public Map getSchema(String tableName) {
try {
- return cache.get(tableName);
+ long startTime = System.currentTimeMillis();
+ Map schema = cache.get(tableName);
+ long duration = System.currentTimeMillis() - startTime;
+ // Only log slow lookups (indicates a cache miss that triggered DB fetch)
+ if (duration > SLOW_LOOKUP_THRESHOLD_MS) {
+ LOGGER.debug(
+ "getSchema slow lookup (likely cache miss) - table: {}, durationMs: {}",
+ tableName,
+ duration);
+ }
+ return schema;
} catch (ExecutionException e) {
LOGGER.error("Could not fetch the schema for table from the cache: {}", tableName, e);
throw new RuntimeException("Failed to fetch schema for " + tableName, e.getCause());
@@ -148,10 +165,25 @@ public void invalidate(String tableName) {
@Override
public Optional getColumnOrRefresh(String tableName, String colName) {
Map schema = getSchema(tableName);
+ boolean refreshed = false;
if (!schema.containsKey(colName) && canRefresh(tableName)) {
+ LOGGER.debug(
+ "Column '{}' not found in cached schema for table '{}', triggering refresh",
+ colName,
+ tableName);
invalidate(tableName);
schema = getSchema(tableName);
+ refreshed = true;
+ }
+
+ if (refreshed) {
+ LOGGER.debug(
+ "getColumnOrRefresh - table: {}, column: {}, found: {}, refreshed: {}",
+ tableName,
+ colName,
+ schema.containsKey(colName),
+ refreshed);
}
return Optional.ofNullable(schema.get(colName));