diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
index 5e8a26d0..7bf4e417 100644
--- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
+++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java
@@ -16,8 +16,10 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -804,28 +806,622 @@ void testCreateOrReplaceWithJsonbField() throws Exception {
   class BulkOperationTests {
 
     @Test
-    @DisplayName("Should throw UnsupportedOperationException for bulkUpsert")
-    void testBulkUpsert() {
-      Map<Key, Document> bulkMap = new HashMap<>();
-      ObjectNode node = OBJECT_MAPPER.createObjectNode();
-      node.put("_id", 101);
-      node.put("item", "BulkItem101");
-      bulkMap.put(new SingleValueKey("default", "101"), new JSONDocument(node));
+    @DisplayName("Should bulk upsert multiple new documents")
+    void testBulkUpsertNewDocuments() throws Exception {
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode node1 = OBJECT_MAPPER.createObjectNode();
+      node1.put("item", "BulkItem101");
+      node1.put("price", 101);
+      node1.put("quantity", 10);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-101"), new JSONDocument(node1));
+
+      ObjectNode node2 = OBJECT_MAPPER.createObjectNode();
+      node2.put("item", "BulkItem102");
+      node2.put("price", 102);
+      node2.put("quantity", 20);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-102"), new JSONDocument(node2));
+
+      ObjectNode node3 = OBJECT_MAPPER.createObjectNode();
+      node3.put("item", "BulkItem103");
+      node3.put("price", 103);
+      node3.put("in_stock", true);
+      ObjectNode props = OBJECT_MAPPER.createObjectNode();
+      props.put("color", "red");
+      props.put("size", "large");
+      node3.set("props", props);
+      node3.putArray("tags").add("electronics").add("sale");
+      node3.putArray("numbers").add(1).add(2).add(3);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-103"), new JSONDocument(node3));
+
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "bulk-101"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("BulkItem101", rs.getString("item"));
+            assertEquals(101, rs.getInt("price"));
+            assertEquals(10, rs.getInt("quantity"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "bulk-102"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("BulkItem102", rs.getString("item"));
+            assertEquals(102, rs.getInt("price"));
+            assertEquals(20, rs.getInt("quantity"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "bulk-103"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("BulkItem103", rs.getString("item"));
+            assertEquals(103, rs.getInt("price"));
+            assertTrue(rs.getBoolean("in_stock"));
+
+            // Verify JSONB column
+            JsonNode propsResult = OBJECT_MAPPER.readTree(rs.getString("props"));
+            assertEquals("red", propsResult.get("color").asText());
+            assertEquals("large", propsResult.get("size").asText());
+
+            // Verify array columns
+            String[] tags = (String[]) rs.getArray("tags").getArray();
+            assertEquals(2, tags.length);
+            assertEquals("electronics", tags[0]);
 
-      assertThrows(UnsupportedOperationException.class, () -> flatCollection.bulkUpsert(bulkMap));
+            Integer[] numbers = (Integer[]) rs.getArray("numbers").getArray();
+            assertEquals(3, numbers.length);
+            assertEquals(1, numbers[0]);
+          });
     }
 
     @Test
-    @DisplayName("Should throw UnsupportedOperationException for bulkUpsertAndReturnOlderDocuments")
-    void testBulkUpsertAndReturnOlderDocuments() {
-      Map<Key, Document> bulkMap = new HashMap<>();
+    @DisplayName("Should bulk upsert updating existing documents")
+    void testBulkUpsertUpdatesExistingDocuments() throws Exception {
+      // First create some documents
+      String docId1 = "bulk-update-1";
+      String docId2 = "bulk-update-2";
+
+      ObjectNode initial1 = OBJECT_MAPPER.createObjectNode();
+      initial1.put("item", "Original1");
+      initial1.put("price", 100);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1));
+
+      ObjectNode initial2 = OBJECT_MAPPER.createObjectNode();
+      initial2.put("item", "Original2");
+      initial2.put("price", 200);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2));
+
+      // Now bulk upsert with updates
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode updated1 = OBJECT_MAPPER.createObjectNode();
+      updated1.put("item", "Updated1");
+      updated1.put("price", 999);
+      updated1.put("quantity", 50);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1));
+
+      ObjectNode updated2 = OBJECT_MAPPER.createObjectNode();
+      updated2.put("item", "Updated2");
+      updated2.put("price", 888);
+      updated2.put("in_stock", true);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2));
+
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, docId1),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("Updated1", rs.getString("item"));
+            assertEquals(999, rs.getInt("price"));
+            assertEquals(50, rs.getInt("quantity"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, docId2),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("Updated2", rs.getString("item"));
+            assertEquals(888, rs.getInt("price"));
+            assertTrue(rs.getBoolean("in_stock"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should bulk upsert with mixed inserts and updates")
+    void testBulkUpsertMixedInsertAndUpdate() throws Exception {
+      // Create one existing document
+      String existingId = "bulk-mixed-existing";
+      ObjectNode existing = OBJECT_MAPPER.createObjectNode();
+      existing.put("item", "ExistingItem");
+      existing.put("price", 100);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(existing));
+
+      // Bulk upsert: update existing + insert new
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode updatedExisting = OBJECT_MAPPER.createObjectNode();
+      updatedExisting.put("item", "UpdatedExisting");
+      updatedExisting.put("price", 555);
+      bulkMap.put(
+          new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(updatedExisting));
+
+      ObjectNode newDoc = OBJECT_MAPPER.createObjectNode();
+      newDoc.put("item", "NewItem");
+      newDoc.put("price", 777);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"), new JSONDocument(newDoc));
+
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, existingId),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("UpdatedExisting", rs.getString("item"));
+            assertEquals(555, rs.getInt("price"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "bulk-mixed-new"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("NewItem", rs.getString("item"));
+            assertEquals(777, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should handle empty document map")
+    void testBulkUpsertEmptyMap() {
+      Map<Key, Document> emptyMap = Collections.emptyMap();
+      boolean result = flatCollection.bulkUpsert(emptyMap);
+      assertTrue(result);
+    }
+
+    @Test
+    @DisplayName("Should handle null document map")
+    void testBulkUpsertNullMap() {
+      boolean result = flatCollection.bulkUpsert(null);
+      assertTrue(result);
+    }
+
+    @Test
+    @DisplayName("Should bulk upsert documents with different column sets")
+    void testBulkUpsertDocumentsWithDifferentColumns() throws Exception {
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode node1 = OBJECT_MAPPER.createObjectNode();
+      node1.put("item", "ItemWithPrice");
+      node1.put("price", 100);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"), new JSONDocument(node1));
+
+      ObjectNode node2 = OBJECT_MAPPER.createObjectNode();
+      node2.put("item", "ItemWithQuantity");
+      node2.put("quantity", 50);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"), new JSONDocument(node2));
+
+      ObjectNode node3 = OBJECT_MAPPER.createObjectNode();
+      node3.put("item", "ItemWithAll");
+      node3.put("price", 200);
+      node3.put("quantity", 30);
+      node3.put("in_stock", true);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"), new JSONDocument(node3));
+
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "diff-cols-1"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("ItemWithPrice", rs.getString("item"));
+            assertEquals(100, rs.getInt("price"));
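+            // JDBC note: getInt() returns 0 for SQL NULL; wasNull() is what
+            // distinguishes a stored 0 from a column that was never set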
+            assertEquals(0, rs.getInt("quantity"));
+            assertTrue(rs.wasNull());
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "diff-cols-2"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("ItemWithQuantity", rs.getString("item"));
+            assertEquals(0, rs.getInt("price"));
+            assertTrue(rs.wasNull());
+            assertEquals(50, rs.getInt("quantity"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "diff-cols-3"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("ItemWithAll", rs.getString("item"));
+            assertEquals(200, rs.getInt("price"));
+            assertEquals(30, rs.getInt("quantity"));
+            assertTrue(rs.getBoolean("in_stock"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should skip unknown fields in bulk upsert")
+    void testBulkUpsertSkipsUnknownFields() throws Exception {
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
       ObjectNode node = OBJECT_MAPPER.createObjectNode();
-      node.put("_id", 101);
-      bulkMap.put(new SingleValueKey("default", "101"), new JSONDocument(node));
+      node.put("item", "ItemWithUnknown");
+      node.put("price", 100);
+      node.put("unknown_field", "should be skipped");
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"), new JSONDocument(node));
 
-      assertThrows(
-          UnsupportedOperationException.class,
-          () -> flatCollection.bulkUpsertAndReturnOlderDocuments(bulkMap));
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "bulk-unknown-field"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("ItemWithUnknown", rs.getString("item"));
+            assertEquals(100, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should ignore documents with unknown fields when IGNORE_DOCUMENT strategy")
+    void testBulkUpsertIgnoreDocumentStrategy() throws Exception {
+      Collection collectionWithIgnoreStrategy =
+          getFlatCollectionWithStrategy(MissingColumnStrategy.IGNORE_DOCUMENT.toString());
+
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      // Doc with unknown field - should be ignored
+      ObjectNode nodeWithUnknown = OBJECT_MAPPER.createObjectNode();
+      nodeWithUnknown.put("item", "ShouldBeIgnored");
+      nodeWithUnknown.put("price", 100);
+      nodeWithUnknown.put("unknown_field", "causes ignore");
+      bulkMap.put(
+          new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"), new JSONDocument(nodeWithUnknown));
+
+      // Doc without unknown field - should be upserted
+      ObjectNode validNode = OBJECT_MAPPER.createObjectNode();
+      validNode.put("item", "ValidItem");
+      validNode.put("price", 200);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"), new JSONDocument(validNode));
+
+      boolean result = collectionWithIgnoreStrategy.bulkUpsert(bulkMap);
+
+      assertTrue(result);
+
+      // First doc should NOT exist (was ignored)
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "ignore-doc-1"),
+          rs -> {
+            assertFalse(rs.next());
+          });
+
+      // Second doc should exist
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "ignore-doc-2"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("ValidItem", rs.getString("item"));
+            assertEquals(200, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should return false when document has invalid JSON (IOException)")
+    void testBulkUpsertWithInvalidJsonDocument() {
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      // Create a Document that returns invalid JSON
+      Document invalidJsonDoc =
+          new Document() {
+            @Override
+            public String toJson() {
+              return "{ invalid json without closing brace";
+            }
+          };
+
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "invalid-json-doc"), invalidJsonDoc);
+
+      // Should return false due to IOException during parsing
+      boolean result = flatCollection.bulkUpsert(bulkMap);
+
+      assertFalse(result);
+    }
+
+    @Test
+    @DisplayName("Should return false when batch execution fails (BatchUpdateException)")
+    void testBulkUpsertBatchUpdateException() throws Exception {
+      PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
+
+      String addConstraintSQL =
+          String.format(
+              "ALTER TABLE \"%s\" ADD CONSTRAINT price_positive CHECK (\"price\" > 0)",
+              FLAT_COLLECTION_NAME);
+      try (Connection conn = pgDatastore.getPostgresClient();
+          PreparedStatement ps = conn.prepareStatement(addConstraintSQL)) {
+        ps.execute();
+        LOGGER.info("Added CHECK constraint: price must be positive");
+      }
+
+      try {
+        Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+        ObjectNode node = OBJECT_MAPPER.createObjectNode();
+        node.put("item", "NegativePriceItem");
+        node.put("price", -100); // Violates CHECK constraint
+        bulkMap.put(
+            new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"), new JSONDocument(node));
+
+        // Should return false due to BatchUpdateException
+        boolean result = flatCollection.bulkUpsert(bulkMap);
+
+        assertFalse(result);
+
+        queryAndAssert(
+            new SingleValueKey(DEFAULT_TENANT, "negative-price-doc"),
+            rs -> {
+              assertFalse(rs.next());
+            });
+
+      } finally {
+        // Clean up: remove the CHECK constraint
+        String dropConstraintSQL =
+            String.format(
+                "ALTER TABLE \"%s\" DROP CONSTRAINT price_positive", FLAT_COLLECTION_NAME);
+        try (Connection conn = pgDatastore.getPostgresClient();
+            PreparedStatement ps = conn.prepareStatement(dropConstraintSQL)) {
+          ps.execute();
+          LOGGER.info("Removed CHECK constraint");
+        }
+      }
+    }
+
+    @Test
+    @DisplayName("Should return empty iterator for null document map")
+    void testBulkUpsertAndReturnOlderDocumentsNullMap() throws Exception {
+      CloseableIterator<Document> result = flatCollection.bulkUpsertAndReturnOlderDocuments(null);
+      assertFalse(result.hasNext());
+      result.close();
+    }
+
+    @Test
+    @DisplayName("Should return empty iterator for empty document map")
+    void testBulkUpsertAndReturnOlderDocumentsEmptyMap() throws Exception {
+      CloseableIterator<Document> result =
+          flatCollection.bulkUpsertAndReturnOlderDocuments(Collections.emptyMap());
+      assertFalse(result.hasNext());
+      result.close();
+    }
+
+    @Test
+    @DisplayName("Should return empty iterator when upserting new documents (no old docs exist)")
+    void testBulkUpsertAndReturnOlderDocumentsNewDocs() throws Exception {
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode node1 = OBJECT_MAPPER.createObjectNode();
+      node1.put("item", "NewItem1");
+      node1.put("price", 100);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "return-old-new-1"), new JSONDocument(node1));
+
+      ObjectNode node2 = OBJECT_MAPPER.createObjectNode();
+      node2.put("item", "NewItem2");
+      node2.put("price", 200);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "return-old-new-2"), new JSONDocument(node2));
+
+      CloseableIterator<Document> result =
+          flatCollection.bulkUpsertAndReturnOlderDocuments(bulkMap);
+
+      // No old documents should be returned since these are new inserts
+      assertFalse(result.hasNext());
+      result.close();
+
+      // Verify documents were inserted
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "return-old-new-1"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("NewItem1", rs.getString("item"));
+            assertEquals(100, rs.getInt("price"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "return-old-new-2"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("NewItem2", rs.getString("item"));
+            assertEquals(200, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should return old documents when updating existing documents")
+    void testBulkUpsertAndReturnOlderDocumentsExistingDocs() throws Exception {
+      // First create some documents
+      String docId1 = "return-old-existing-1";
+      String docId2 = "return-old-existing-2";
+
+      ObjectNode initial1 = OBJECT_MAPPER.createObjectNode();
+      initial1.put("item", "OldItem1");
+      initial1.put("price", 100);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(initial1));
+
+      ObjectNode initial2 = OBJECT_MAPPER.createObjectNode();
+      initial2.put("item", "OldItem2");
+      initial2.put("price", 200);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(initial2));
+
+      // Now bulk upsert with updates
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode updated1 = OBJECT_MAPPER.createObjectNode();
+      updated1.put("item", "UpdatedItem1");
+      updated1.put("price", 999);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId1), new JSONDocument(updated1));
+
+      ObjectNode updated2 = OBJECT_MAPPER.createObjectNode();
+      updated2.put("item", "UpdatedItem2");
+      updated2.put("price", 888);
+      bulkMap.put(new SingleValueKey(DEFAULT_TENANT, docId2), new JSONDocument(updated2));
+
+      CloseableIterator<Document> result =
+          flatCollection.bulkUpsertAndReturnOlderDocuments(bulkMap);
+
+      // Collect old documents
+      List<Document> oldDocs = new ArrayList<>();
+      while (result.hasNext()) {
+        oldDocs.add(result.next());
+      }
+      result.close();
+
+      // Should have 2 old documents
+      assertEquals(2, oldDocs.size());
+
+      // Verify old documents contain original values
+      Map<String, JsonNode> oldDocsByItem = new HashMap<>();
+      for (Document doc : oldDocs) {
+        JsonNode json = OBJECT_MAPPER.readTree(doc.toJson());
+        oldDocsByItem.put(json.get("item").asText(), json);
+      }
+
+      assertTrue(oldDocsByItem.containsKey("OldItem1"));
+      assertEquals(100, oldDocsByItem.get("OldItem1").get("price").asInt());
+
+      assertTrue(oldDocsByItem.containsKey("OldItem2"));
+      assertEquals(200, oldDocsByItem.get("OldItem2").get("price").asInt());
+
+      // Verify documents were updated in DB
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, docId1),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("UpdatedItem1", rs.getString("item"));
+            assertEquals(999, rs.getInt("price"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, docId2),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("UpdatedItem2", rs.getString("item"));
+            assertEquals(888, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should return only existing old documents in mixed insert/update scenario")
+    void testBulkUpsertAndReturnOlderDocumentsMixed() throws Exception {
+      // Create one existing document
+      String existingId = "return-old-mixed-existing";
+
+      ObjectNode existing = OBJECT_MAPPER.createObjectNode();
+      existing.put("item", "ExistingItem");
+      existing.put("price", 500);
+      flatCollection.createOrReplace(
+          new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(existing));
+
+      // Bulk upsert: update existing + insert new
+      Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+      ObjectNode updatedExisting = OBJECT_MAPPER.createObjectNode();
+      updatedExisting.put("item", "UpdatedExisting");
+      updatedExisting.put("price", 555);
+      bulkMap.put(
+          new SingleValueKey(DEFAULT_TENANT, existingId), new JSONDocument(updatedExisting));
+
+      ObjectNode newDoc = OBJECT_MAPPER.createObjectNode();
+      newDoc.put("item", "NewItem");
+      newDoc.put("price", 777);
+      bulkMap.put(
+          new SingleValueKey(DEFAULT_TENANT, "return-old-mixed-new"), new JSONDocument(newDoc));
+
+      CloseableIterator<Document> result =
+          flatCollection.bulkUpsertAndReturnOlderDocuments(bulkMap);
+
+      // Should only return the one existing document (not the new one)
+      List<Document> oldDocs = new ArrayList<>();
+      while (result.hasNext()) {
+        oldDocs.add(result.next());
+      }
+      result.close();
+
+      assertEquals(1, oldDocs.size());
+
+      JsonNode oldDoc = OBJECT_MAPPER.readTree(oldDocs.get(0).toJson());
+      assertEquals("ExistingItem", oldDoc.get("item").asText());
+      assertEquals(500, oldDoc.get("price").asInt());
+
+      // Verify both documents exist in DB with new values
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, existingId),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("UpdatedExisting", rs.getString("item"));
+            assertEquals(555, rs.getInt("price"));
+          });
+
+      queryAndAssert(
+          new SingleValueKey(DEFAULT_TENANT, "return-old-mixed-new"),
+          rs -> {
+            assertTrue(rs.next());
+            assertEquals("NewItem", rs.getString("item"));
+            assertEquals(777, rs.getInt("price"));
+          });
+    }
+
+    @Test
+    @DisplayName("Should throw IOException when bulkUpsert fails")
+    void testBulkUpsertAndReturnOlderDocumentsUpsertFailure() throws Exception {
+      PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
+
+      // Add a CHECK constraint to force upsert failure
+      String addConstraintSQL =
+          String.format(
+              "ALTER TABLE \"%s\" ADD CONSTRAINT price_positive_return CHECK (\"price\" > 0)",
+              FLAT_COLLECTION_NAME);
+      try (Connection conn = pgDatastore.getPostgresClient();
+          PreparedStatement ps = conn.prepareStatement(addConstraintSQL)) {
+        ps.execute();
+      }
+
+      try {
+        Map<Key, Document> bulkMap = new LinkedHashMap<>();
+
+        ObjectNode node = OBJECT_MAPPER.createObjectNode();
+        node.put("item", "NegativePriceItem");
+        node.put("price", -100); // Violates CHECK constraint
+        bulkMap.put(new SingleValueKey(DEFAULT_TENANT, "return-old-fail"), new JSONDocument(node));
+
+        assertThrows(
+            IOException.class, () -> flatCollection.bulkUpsertAndReturnOlderDocuments(bulkMap));
+
+      } finally {
+        // Clean up: remove the CHECK constraint
+        String dropConstraintSQL =
+            String.format(
+                "ALTER TABLE \"%s\" DROP CONSTRAINT price_positive_return", FLAT_COLLECTION_NAME);
+        try (Connection conn = pgDatastore.getPostgresClient();
+            PreparedStatement ps = conn.prepareStatement(dropConstraintSQL)) {
+          ps.execute();
+        }
+      }
     }
   }
diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
index 430dd36b..26cc0e7e 100644
--- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
+++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java
@@ -9,6 +9,7 @@
 import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.sql.Array;
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.Date;
 import java.sql.PreparedStatement;
@@ -191,13 +192,187 @@ public boolean deleteSubDoc(Key key, String subDocPath) {
 
   @Override
   public boolean bulkUpsert(Map<Key, Document> documents) {
-    throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
+    if (documents == null || documents.isEmpty()) {
+      return true;
+    }
+
+    String tableName = tableIdentifier.getTableName();
+    String pkColumn = getPKForTable(tableName);
+    String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
+    PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
+
+    try {
+      // Parse all documents and collect the union of all columns. Different documents may
+      // carry different column sets, and taking the union lets us build a single upsert
+      // statement that covers every document in the batch.
+      Map<Key, TypedDocument> parsedDocuments = new LinkedHashMap<>();
+      Set<String> allColumns = new LinkedHashSet<>();
+      allColumns.add(quotedPkColumn);
+
+      List<Key> ignoredDocuments = new ArrayList<>();
+      for (Map.Entry<Key, Document> entry : documents.entrySet()) {
+        List<String> skippedFields = new ArrayList<>();
+        TypedDocument parsed = parseDocument(entry.getValue(), tableName, skippedFields);
+
+        // Handle IGNORE_DOCUMENT strategy: skip docs with unknown fields
+        if (missingColumnStrategy == MissingColumnStrategy.IGNORE_DOCUMENT
+            && !skippedFields.isEmpty()) {
+          ignoredDocuments.add(entry.getKey());
+          continue;
+        }
+
+        parsed.add(quotedPkColumn, entry.getKey().toString(), pkType, false);
+        parsedDocuments.put(entry.getKey(), parsed);
+        allColumns.addAll(parsed.getColumns());
+      }
+
+      if (!ignoredDocuments.isEmpty()) {
+        LOGGER.info(
+            "bulkUpsert: Ignored {} documents due to IGNORE_DOCUMENT strategy. Keys: {}",
+            ignoredDocuments.size(),
+            ignoredDocuments);
+      }
+
+      // If all documents were ignored, return true (nothing to do)
+      if (parsedDocuments.isEmpty()) {
+        return true;
+      }
+
+      // Build the bulk upsert SQL with all columns
+      List<String> columnList = new ArrayList<>(allColumns);
+      String sql = buildBulkUpsertSql(columnList, quotedPkColumn);
+      LOGGER.debug("Bulk upsert SQL: {}", sql);
+
+      try (Connection conn = client.getPooledConnection();
+          PreparedStatement ps = conn.prepareStatement(sql)) {
+
+        for (Map.Entry<Key, TypedDocument> entry : parsedDocuments.entrySet()) {
+          TypedDocument parsed = entry.getValue();
+          int index = 1;
+
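+          // Bind a value for every column in the union list. Columns this document does
+          // not contain are bound as NULL, so on conflict they are overwritten with NULL
+          // rather than retaining their previous values.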
+          for (String column : columnList) {
+            if (parsed.getColumns().contains(column)) {
+              setParameter(
+                  conn,
+                  ps,
+                  index++,
+                  parsed.getValue(column),
+                  parsed.getType(column),
+                  parsed.isArray(column));
+            } else {
+              ps.setObject(index++, null);
+            }
+          }
+          ps.addBatch();
+        }
+
+        int[] results = ps.executeBatch();
+        if (LOGGER.isDebugEnabled()) {
+          LOGGER.debug("Bulk upsert results: {}", Arrays.toString(results));
+        }
+        return true;
+      }
+
+    } catch (BatchUpdateException e) {
+      LOGGER.error("BatchUpdateException in bulkUpsert", e);
+    } catch (SQLException e) {
+      LOGGER.error(
+          "SQLException in bulkUpsert. SQLState: {} Error Code: {}",
+          e.getSQLState(),
+          e.getErrorCode(),
+          e);
+    } catch (IOException e) {
+      LOGGER.error("IOException in bulkUpsert. documents: {}", documents, e);
+    }
+
+    return false;
+  }
+
+  /**
+   * Builds a PostgreSQL bulk upsert SQL statement for batch execution.
+   *
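+   * <p>For an illustrative column list of "id", "item", "price" with primary key "id", the
+   * generated statement has the shape: {@code INSERT INTO tbl (id, item, price) VALUES (?, ?, ?)
+   * ON CONFLICT (id) DO UPDATE SET item = EXCLUDED.item, price = EXCLUDED.price}.
+   *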
+   * @param columns List of quoted column names (PK should be first)
+   * @param pkColumn The quoted primary key column name
+   * @return The upsert SQL statement
+   */
+  private String buildBulkUpsertSql(List<String> columns, String pkColumn) {
+    String columnList = String.join(", ", columns);
+    String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new));
+
+    // Build SET clause for non-PK columns: col = EXCLUDED.col (this ensures that on conflict,
+    // the new value is picked)
+    String setClause =
+        columns.stream()
+            .filter(col -> !col.equals(pkColumn))
+            .map(col -> col + " = EXCLUDED." + col)
+            .collect(Collectors.joining(", "));
+
+    return String.format(
+        "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO UPDATE SET %s",
+        tableIdentifier, columnList, placeholders, pkColumn, setClause);
   }
 
   @Override
   public CloseableIterator<Document> bulkUpsertAndReturnOlderDocuments(
       Map<Key, Document> documents) throws IOException {
-    throw new UnsupportedOperationException(WRITE_NOT_SUPPORTED);
+    if (documents == null || documents.isEmpty()) {
+      return CloseableIterator.emptyIterator();
+    }
+
+    String tableName = tableIdentifier.getTableName();
+    String pkColumn = getPKForTable(tableName);
+    String quotedPkColumn = PostgresUtils.wrapFieldNamesWithDoubleQuotes(pkColumn);
+    PostgresDataType pkType = getPrimaryKeyType(tableName, pkColumn);
+
+    Connection connection = null;
+    try {
+      connection = client.getPooledConnection();
+
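+      // Fetch the existing rows first so the iterator below reflects the pre-upsert
+      // state of the documents; the upsert only runs if this query succeeds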
+      PreparedStatement preparedStatement =
+          getPreparedStatementForQuery(documents, quotedPkColumn, connection, pkType);
+
+      ResultSet resultSet = preparedStatement.executeQuery();
+
+      boolean upsertResult = bulkUpsert(documents);
+      if (!upsertResult) {
+        closeConnection(connection);
+        throw new IOException("Bulk upsert failed");
+      }
+
+      // note that connection will be closed after the iterator is used by the client
+      return new PostgresCollection.PostgresResultIteratorWithBasicTypes(
+          resultSet, connection, DocumentType.FLAT);
+
+    } catch (SQLException e) {
+      LOGGER.error("SQLException in bulkUpsertAndReturnOlderDocuments", e);
+      closeConnection(connection);
+      throw new IOException("Could not bulk upsert the documents.", e);
+    }
+  }
+
+  private static void closeConnection(Connection connection) {
+    if (connection != null) {
+      try {
+        connection.close();
+      } catch (SQLException closeEx) {
+        LOGGER.warn("Error closing connection after exception", closeEx);
+      }
+    }
+  }
+
+  private PreparedStatement getPreparedStatementForQuery(
+      Map<Key, Document> documents,
+      String quotedPkColumn,
+      Connection connection,
+      PostgresDataType pkType)
+      throws SQLException {
+    String selectQuery =
+        String.format("SELECT * FROM %s WHERE %s = ANY(?)", tableIdentifier, quotedPkColumn);
+    PreparedStatement preparedStatement = connection.prepareStatement(selectQuery);
+
+    String[] keyArray = documents.keySet().stream().map(Key::toString).toArray(String[]::new);
+    Array sqlArray = connection.createArrayOf(pkType.getSqlType(), keyArray);
+    preparedStatement.setArray(1, sqlArray);
+    return preparedStatement;
+  }
 
   @Override