From 45f79ba7cf8032fd6f04c3eda5f96fd30d17ffc4 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:20:45 +0530 Subject: [PATCH 01/16] WIP --- .../FlatCollectionWriteTest.java | 166 +++++++++++++++++- .../commons/FlatStoreConstants.java | 7 + .../postgres/FlatPostgresCollection.java | 126 ++++++++++++- 3 files changed, 296 insertions(+), 3 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 7bf4e417..05f06e05 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -16,6 +16,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -94,6 +95,10 @@ public static void init() throws IOException { postgresConfig.put("url", postgresConnectionUrl); postgresConfig.put("user", "postgres"); postgresConfig.put("password", "postgres"); + // Configure timestamp fields for auto-managed document timestamps + postgresConfig.put( + "customParams.timestampFields", + "{\"docCreatedTsCol\": \"createdTime\", \"docLastUpdatedTsCol\": \"lastUpdateTime\"}"); postgresDatastore = DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig)); @@ -124,7 +129,9 @@ private static void createFlatCollectionSchema() { + "\"big_number\" BIGINT," + "\"rating\" REAL," + "\"created_date\" DATE," - + "\"weight\" DOUBLE PRECISION" + + "\"weight\" DOUBLE PRECISION," + + "\"createdTime\" BIGINT," + + "\"lastUpdateTime\" TIMESTAMP WITH TIME ZONE" + ");", FLAT_COLLECTION_NAME); @@ -2185,4 +2192,161 @@ void testSetWithJsonDocumentValue() throws Exception { } } } + + @Nested + @DisplayName("Timestamp Auto-Population Tests") + class TimestampTests { + + @Test + @DisplayName( + "Should auto-populate createdTime (BIGINT) and lastUpdateTime (TIMESTAMPTZ) on create") + void testTimestampsOnCreate() throws Exception { + long beforeCreate = System.currentTimeMillis(); + + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-1"); + objectNode.put("item", "TimestampTestItem"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-1"); + + CreateResult result = flatCollection.create(key, document); + assertTrue(result.isSucceed()); + + long afterCreate = System.currentTimeMillis(); + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + long createdTime = rs.getLong("createdTime"); + assertFalse(rs.wasNull(), "createdTime should not be NULL"); + assertTrue( + createdTime >= beforeCreate && createdTime <= afterCreate, + "createdTime should be within test execution window"); + + Timestamp lastUpdateTime = rs.getTimestamp("lastUpdateTime"); + assertNotNull(lastUpdateTime, "lastUpdateTime should not be NULL"); + assertTrue( + lastUpdateTime.getTime() >= beforeCreate && lastUpdateTime.getTime() <= afterCreate, + "lastUpdateTime should be within test execution window"); + } + } + + @Test + @DisplayName("Should preserve createdTime and update lastUpdateTime on upsert") + void testTimestampsOnUpsert() throws Exception { + // First create + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-2"); + objectNode.put("item", "UpsertTimestampTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-2"); + + flatCollection.create(key, document); + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + long originalCreatedTime; + long originalLastUpdateTime; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + originalCreatedTime = rs.getLong("createdTime"); + originalLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime(); + } + + // Wait a bit to ensure time difference + Thread.sleep(50); + + // Upsert (update existing) + long beforeUpsert = System.currentTimeMillis(); + objectNode.put("price", 200); + Document updatedDoc = new JSONDocument(objectNode); + flatCollection.createOrReplace(key, updatedDoc); + long afterUpsert = System.currentTimeMillis(); + + // Verify createdTime preserved, lastUpdateTime updated + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + long newCreatedTime = rs.getLong("createdTime"); + assertEquals( + originalCreatedTime, newCreatedTime, "createdTime should be preserved on upsert"); + + long newLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime(); + assertTrue(newLastUpdateTime > originalLastUpdateTime, "lastUpdateTime should be updated"); + assertTrue( + newLastUpdateTime >= beforeUpsert && newLastUpdateTime <= afterUpsert, + "lastUpdateTime should be within upsert execution window"); + } + } + + @Test + @DisplayName( + "Should not throw exception when timestampFields config is missing - cols remain NULL") + void testNoExceptionWhenTimestampConfigMissing() throws Exception { + // Create a collection WITHOUT timestampFields config + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map configWithoutTimestamps = new HashMap<>(); + configWithoutTimestamps.put("url", postgresConnectionUrl); + configWithoutTimestamps.put("user", "postgres"); + configWithoutTimestamps.put("password", "postgres"); + // Note: NO customParams.timestampFields config + + Datastore datastoreWithoutTimestamps = + DatastoreProvider.getDatastore( + "Postgres", ConfigFactory.parseMap(configWithoutTimestamps)); + Collection collectionWithoutTimestamps = + datastoreWithoutTimestamps.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + + // Create a document - should NOT throw exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-no-config"); + objectNode.put("item", "NoTimestampConfigTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-no-config"); + + CreateResult result = collectionWithoutTimestamps.create(key, document); + assertTrue(result.isSucceed()); + + // Verify timestamp columns are NULL + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + rs.getLong("createdTime"); + assertTrue(rs.wasNull(), "createdTime should be NULL when config is missing"); + + rs.getTimestamp("lastUpdateTime"); + assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config is missing"); + } + } + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java new file mode 100644 index 00000000..8174f7d3 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java @@ -0,0 +1,7 @@ +package org.hypertrace.core.documentstore.commons; + +public class FlatStoreConstants { + public static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; + public static final String DOC_CREATED_TS_COL_KEY = "docCreatedTsCol"; + public static final String DOC_LAST_UPDATED_TS_COL_KEY = "docLastUpdatedTsCol"; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 26cc0e7e..b40ffa94 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -33,6 +33,7 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; +import org.hypertrace.core.documentstore.commons.FlatStoreConstants; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -82,6 +83,9 @@ public class FlatPostgresCollection extends PostgresCollection { */ private final MissingColumnStrategy missingColumnStrategy; + private final String createdTsColumn; + private final String lastUpdatedTsColumn; + FlatPostgresCollection( final PostgresClient client, final String collectionName, @@ -89,6 +93,20 @@ public class FlatPostgresCollection extends PostgresCollection { super(client, collectionName); this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); + this.createdTsColumn = + getTsColFromConfig(FlatStoreConstants.DOC_CREATED_TS_COL_KEY, client.getCustomParameters()) + .orElse(null); + this.lastUpdatedTsColumn = + getTsColFromConfig( + FlatStoreConstants.DOC_LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) + .orElse(null); + if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { + LOGGER.warn( + "timestampFields config not set properly for collection '{}'. " + + "Document timestamps (either createdTime, lastUpdatedTime or both) will not be auto-managed. " + + "Configure via: {{\"timestampFields\": {{\"docCreatedTsCol\": \"\", \"docLastUpdatedTsCol\": \"\"}}}}", + collectionName); + } } private static MissingColumnStrategy parseMissingColumnStrategy(Map params) { @@ -107,6 +125,24 @@ private static MissingColumnStrategy parseMissingColumnStrategy(Map getTsColFromConfig(String key, Map config) { + String jsonValue = config.get(FlatStoreConstants.TIMESTAMP_FIELDS_CONFIG); + if (jsonValue == null || jsonValue.isEmpty()) { + return Optional.empty(); + } + try { + JsonNode node = MAPPER.readTree(jsonValue); + return Optional.ofNullable(node.get(key).asText(null)); + } catch (Exception e) { + LOGGER.warn( + "Failed to parse timestampFields config: '{}'. Expected format: " + + "{{\"docCreatedTsCol\": \"\", \"docLastUpdatedTsCol\": \"\"}}. Error: {}", + jsonValue, + e.getMessage()); + return Optional.empty(); + } + } + @Override public CloseableIterator query( final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { @@ -298,11 +334,16 @@ private String buildBulkUpsertSql(List columns, String pkColumn) { String columnList = String.join(", ", columns); String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); - // Build SET clause for non-PK columns: col = EXCLUDED.col (this ensures that on conflict, the - // new value is picked) + // Build SET clause for non-PK columns: col = EXCLUDED.col + // Exclude createdTsColumn from updates to preserve original creation time + String quotedCreatedTs = + createdTsColumn != null + ? PostgresUtils.wrapFieldNamesWithDoubleQuotes(createdTsColumn) + : null; String setClause = columns.stream() .filter(col -> !col.equals(pkColumn)) + .filter(col -> !col.equals(quotedCreatedTs)) .map(col -> col + " = EXCLUDED." + col) .collect(Collectors.joining(", ")); @@ -747,9 +788,79 @@ private TypedDocument parseDocument( } } + addTimestampFields(typedDocument, tableName); + return typedDocument; } + /** + * Adds timestamp fields if configured and columns exist in the schema. The column type is looked + * up from the schema to determine the appropriate value format: + * + *
    + *
  • BIGINT: epoch milliseconds (long) + *
  • TIMESTAMPTZ: java.sql.Timestamp + *
  • DATE: java.sql.Date + *
  • TEXT: ISO-8601 string + *
+ */ + private void addTimestampFields(TypedDocument typedDocument, String tableName) { + long now = System.currentTimeMillis(); + + if (createdTsColumn != null) { + addTimestampField(typedDocument, tableName, createdTsColumn, now, false); + } + + if (lastUpdatedTsColumn != null) { + addTimestampField(typedDocument, tableName, lastUpdatedTsColumn, now, true); + } + } + + private void addTimestampField( + TypedDocument typedDocument, + String tableName, + String columnName, + long epochMillis, + boolean overwrite) { + if (!overwrite && typedDocument.hasColumn(columnName)) { + return; + } + + Optional colMeta = + schemaRegistry.getColumnOrRefresh(tableName, columnName); + if (colMeta.isEmpty()) { + LOGGER.debug( + "Timestamp column '{}' configured but not found in schema for table '{}'", + columnName, + tableName); + return; + } + + PostgresDataType type = colMeta.get().getPostgresType(); + String quotedCol = PostgresUtils.wrapFieldNamesWithDoubleQuotes(columnName); + Object value = convertTimestampForType(epochMillis, type); + typedDocument.add(quotedCol, value, type, false); + } + + private Object convertTimestampForType(long epochMillis, PostgresDataType type) { + switch (type) { + case BIGINT: + return epochMillis; + case INTEGER: + return (int) (epochMillis / 1000); + case TIMESTAMPTZ: + return Timestamp.from(Instant.ofEpochMilli(epochMillis)); + case DATE: + return new Date(epochMillis); + case TEXT: + return Instant.ofEpochMilli(epochMillis).toString(); // ISO-8601 + default: + LOGGER.warn( + "Unexpected type '{}' for timestamp column, using epoch millis as string", type); + return String.valueOf(epochMillis); + } + } + private int executeUpdate(String sql, TypedDocument parsed) throws SQLException { try (Connection conn = client.getPooledConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { @@ -837,9 +948,15 @@ private String buildUpsertSql(List columns, String pkColumn) { String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); // Build SET clause for non-PK columns: col = EXCLUDED.col + // Exclude createdTsColumn from updates to preserve original creation time + String quotedCreatedTs = + createdTsColumn != null + ? PostgresUtils.wrapFieldNamesWithDoubleQuotes(createdTsColumn) + : null; String setClause = columns.stream() .filter(col -> !col.equals(pkColumn)) + .filter(col -> !col.equals(quotedCreatedTs)) .map(col -> col + " = EXCLUDED." + col) .collect(Collectors.joining(", ")); @@ -953,6 +1070,11 @@ PostgresDataType getType(String column) { boolean isArray(String column) { return arrays.getOrDefault(column, false); } + + boolean hasColumn(String columnName) { + String quotedCol = PostgresUtils.wrapFieldNamesWithDoubleQuotes(columnName); + return values.containsKey(quotedCol) || values.containsKey(columnName); + } } private String buildInsertSql(List columns) { From d6c693776a6fe786b20c6ff2c7d914a8f63092f3 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:27:50 +0530 Subject: [PATCH 02/16] Update config keys --- .../core/documentstore/FlatCollectionWriteTest.java | 2 +- .../core/documentstore/commons/FlatStoreConstants.java | 4 ++-- .../documentstore/postgres/FlatPostgresCollection.java | 7 +++---- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 05f06e05..9e283a01 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -98,7 +98,7 @@ public static void init() throws IOException { // Configure timestamp fields for auto-managed document timestamps postgresConfig.put( "customParams.timestampFields", - "{\"docCreatedTsCol\": \"createdTime\", \"docLastUpdatedTsCol\": \"lastUpdateTime\"}"); + "{\"createdTsCol\": \"createdTime\", \"lastUpdatedTsCol\": \"lastUpdateTime\"}"); postgresDatastore = DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig)); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java index 8174f7d3..489d5987 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java @@ -2,6 +2,6 @@ public class FlatStoreConstants { public static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; - public static final String DOC_CREATED_TS_COL_KEY = "docCreatedTsCol"; - public static final String DOC_LAST_UPDATED_TS_COL_KEY = "docLastUpdatedTsCol"; + public static final String CREATED_TS_COL_KEY = "createdTsCol"; + public static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index b40ffa94..8bef9af1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -94,17 +94,16 @@ public class FlatPostgresCollection extends PostgresCollection { this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); this.createdTsColumn = - getTsColFromConfig(FlatStoreConstants.DOC_CREATED_TS_COL_KEY, client.getCustomParameters()) + getTsColFromConfig(FlatStoreConstants.CREATED_TS_COL_KEY, client.getCustomParameters()) .orElse(null); this.lastUpdatedTsColumn = - getTsColFromConfig( - FlatStoreConstants.DOC_LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) + getTsColFromConfig(FlatStoreConstants.LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) .orElse(null); if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " + "Document timestamps (either createdTime, lastUpdatedTime or both) will not be auto-managed. " - + "Configure via: {{\"timestampFields\": {{\"docCreatedTsCol\": \"\", \"docLastUpdatedTsCol\": \"\"}}}}", + + "Configure via: {{\"timestampFields\": {{\"createdTsCol\": \"\", \"lastUpdatedTsCol\": \"\"}}}}", collectionName); } } From b70864ad9d581868430196fd68775cc5935282e0 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:34:55 +0530 Subject: [PATCH 03/16] Added more coverage --- .../FlatCollectionWriteTest.java | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 9e283a01..75faa31a 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -2348,5 +2348,55 @@ void testNoExceptionWhenTimestampConfigMissing() throws Exception { assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config is missing"); } } + + @Test + @DisplayName( + "Should not throw exception when timestampFields config is invalid JSON - cols remain NULL") + void testNoExceptionWhenTimestampConfigInvalidJson() throws Exception { + // Create a collection with INVALID JSON in timestampFields config + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map configWithInvalidJson = new HashMap<>(); + configWithInvalidJson.put("url", postgresConnectionUrl); + configWithInvalidJson.put("user", "postgres"); + configWithInvalidJson.put("password", "postgres"); + // Invalid JSON - missing quotes, malformed + configWithInvalidJson.put("customParams.timestampFields", "not valid json {{{"); + + Datastore datastoreWithInvalidConfig = + DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(configWithInvalidJson)); + Collection collectionWithInvalidConfig = + datastoreWithInvalidConfig.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + + // Create a document - should NOT throw exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-invalid-json"); + objectNode.put("item", "InvalidJsonConfigTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-invalid-json"); + + CreateResult result = collectionWithInvalidConfig.create(key, document); + assertTrue(result.isSucceed()); + + // Verify timestamp columns are NULL (config parsing failed gracefully) + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + rs.getLong("createdTime"); + assertTrue(rs.wasNull(), "createdTime should be NULL when config JSON is invalid"); + + rs.getTimestamp("lastUpdateTime"); + assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config JSON is invalid"); + } + } } } From e86721a6416569b87213276af38c5cc9b7b85a43 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:43:01 +0530 Subject: [PATCH 04/16] Remove FlatStoreConstants.java --- .../documentstore/commons/FlatStoreConstants.java | 7 ------- .../postgres/FlatPostgresCollection.java | 12 ++++++------ 2 files changed, 6 insertions(+), 13 deletions(-) delete mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java deleted file mode 100644 index 489d5987..00000000 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.hypertrace.core.documentstore.commons; - -public class FlatStoreConstants { - public static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; - public static final String CREATED_TS_COL_KEY = "createdTsCol"; - public static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; -} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 8bef9af1..d664d546 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -33,7 +33,6 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; -import org.hypertrace.core.documentstore.commons.FlatStoreConstants; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -71,6 +70,9 @@ public class FlatPostgresCollection extends PostgresCollection { "Write operations are not supported for flat collections yet!"; private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy"; private static final String DEFAULT_PRIMARY_KEY_COLUMN = "key"; + private static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; + private static final String CREATED_TS_COL_KEY = "createdTsCol"; + private static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; private static final Map SUB_DOC_UPDATE_PARSERS = Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); @@ -94,11 +96,9 @@ public class FlatPostgresCollection extends PostgresCollection { this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); this.createdTsColumn = - getTsColFromConfig(FlatStoreConstants.CREATED_TS_COL_KEY, client.getCustomParameters()) - .orElse(null); + getTsColFromConfig(CREATED_TS_COL_KEY, client.getCustomParameters()).orElse(null); this.lastUpdatedTsColumn = - getTsColFromConfig(FlatStoreConstants.LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) - .orElse(null); + getTsColFromConfig(LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()).orElse(null); if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " @@ -125,7 +125,7 @@ private static MissingColumnStrategy parseMissingColumnStrategy(Map getTsColFromConfig(String key, Map config) { - String jsonValue = config.get(FlatStoreConstants.TIMESTAMP_FIELDS_CONFIG); + String jsonValue = config.get(TIMESTAMP_FIELDS_CONFIG); if (jsonValue == null || jsonValue.isEmpty()) { return Optional.empty(); } From 1e5714fe3a35b90de542e886b03baefec76d7d67 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:46:46 +0530 Subject: [PATCH 05/16] Revert "Remove FlatStoreConstants.java" This reverts commit e86721a6416569b87213276af38c5cc9b7b85a43. --- .../documentstore/commons/FlatStoreConstants.java | 7 +++++++ .../postgres/FlatPostgresCollection.java | 12 ++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java new file mode 100644 index 00000000..489d5987 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java @@ -0,0 +1,7 @@ +package org.hypertrace.core.documentstore.commons; + +public class FlatStoreConstants { + public static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; + public static final String CREATED_TS_COL_KEY = "createdTsCol"; + public static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index d664d546..8bef9af1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -33,6 +33,7 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; +import org.hypertrace.core.documentstore.commons.FlatStoreConstants; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -70,9 +71,6 @@ public class FlatPostgresCollection extends PostgresCollection { "Write operations are not supported for flat collections yet!"; private static final String MISSING_COLUMN_STRATEGY_CONFIG = "missingColumnStrategy"; private static final String DEFAULT_PRIMARY_KEY_COLUMN = "key"; - private static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; - private static final String CREATED_TS_COL_KEY = "createdTsCol"; - private static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; private static final Map SUB_DOC_UPDATE_PARSERS = Map.of(SET, new FlatCollectionSubDocSetOperatorParser()); @@ -96,9 +94,11 @@ public class FlatPostgresCollection extends PostgresCollection { this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); this.createdTsColumn = - getTsColFromConfig(CREATED_TS_COL_KEY, client.getCustomParameters()).orElse(null); + getTsColFromConfig(FlatStoreConstants.CREATED_TS_COL_KEY, client.getCustomParameters()) + .orElse(null); this.lastUpdatedTsColumn = - getTsColFromConfig(LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()).orElse(null); + getTsColFromConfig(FlatStoreConstants.LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) + .orElse(null); if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " @@ -125,7 +125,7 @@ private static MissingColumnStrategy parseMissingColumnStrategy(Map getTsColFromConfig(String key, Map config) { - String jsonValue = config.get(TIMESTAMP_FIELDS_CONFIG); + String jsonValue = config.get(FlatStoreConstants.TIMESTAMP_FIELDS_CONFIG); if (jsonValue == null || jsonValue.isEmpty()) { return Optional.empty(); } From df0f707b78df8f357b2054d891afad39269d762e Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:50:21 +0530 Subject: [PATCH 06/16] Added more coverage --- .../core/documentstore/postgres/FlatPostgresCollection.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 8bef9af1..c26529f3 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.sql.Array; @@ -841,7 +842,8 @@ private void addTimestampField( typedDocument.add(quotedCol, value, type, false); } - private Object convertTimestampForType(long epochMillis, PostgresDataType type) { + @VisibleForTesting + Object convertTimestampForType(long epochMillis, PostgresDataType type) { switch (type) { case BIGINT: return epochMillis; From d96a2284f6b439c6600bb2a66ca4758989c29b97 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 9 Feb 2026 12:53:56 +0530 Subject: [PATCH 07/16] Added FlatPostgresCollectionTest --- .../postgres/FlatPostgresCollectionTest.java | 111 ++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java new file mode 100644 index 00000000..06af91a7 --- /dev/null +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -0,0 +1,111 @@ +package org.hypertrace.core.documentstore.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.sql.Date; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.Collections; +import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +class FlatPostgresCollectionTest { + + private FlatPostgresCollection collection; + + @BeforeEach + void setUp() { + PostgresClient mockClient = mock(PostgresClient.class); + when(mockClient.getCustomParameters()).thenReturn(Collections.emptyMap()); + + PostgresLazyilyLoadedSchemaRegistry mockSchemaRegistry = + mock(PostgresLazyilyLoadedSchemaRegistry.class); + + collection = new FlatPostgresCollection(mockClient, "test_table", mockSchemaRegistry); + } + + @Nested + @DisplayName("convertTimestampForType Tests") + class ConvertTimestampForTypeTests { + + private static final long TEST_EPOCH_MILLIS = 1707465494818L; // 2024-02-09T06:58:14.818Z + + @Test + @DisplayName("BIGINT should return epoch millis as-is") + void testBigintReturnsEpochMillis() { + Object result = + collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.BIGINT); + + assertInstanceOf(Long.class, result); + assertEquals(TEST_EPOCH_MILLIS, result); + } + + @Test + @DisplayName("INTEGER should return epoch seconds (millis / 1000)") + void testIntegerReturnsEpochSeconds() { + Object result = + collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.INTEGER); + + assertInstanceOf(Integer.class, result); + assertEquals((int) (TEST_EPOCH_MILLIS / 1000), result); + } + + @Test + @DisplayName("TIMESTAMPTZ should return java.sql.Timestamp") + void testTimestamptzReturnsSqlTimestamp() { + Object result = + collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.TIMESTAMPTZ); + + assertInstanceOf(Timestamp.class, result); + Timestamp timestamp = (Timestamp) result; + assertEquals(TEST_EPOCH_MILLIS, timestamp.getTime()); + } + + @Test + @DisplayName("DATE should return java.sql.Date") + void testDateReturnsSqlDate() { + Object result = collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.DATE); + + assertInstanceOf(Date.class, result); + Date date = (Date) result; + assertEquals(TEST_EPOCH_MILLIS, date.getTime()); + } + + @Test + @DisplayName("TEXT should return ISO-8601 formatted string") + void testTextReturnsIso8601String() { + Object result = collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.TEXT); + + assertInstanceOf(String.class, result); + String isoString = (String) result; + assertEquals(Instant.ofEpochMilli(TEST_EPOCH_MILLIS).toString(), isoString); + assertTrue(isoString.contains("2024-02-09")); + } + + @Test + @DisplayName("Unsupported type should return epoch millis as string (fallback)") + void testUnsupportedTypeReturnsStringFallback() { + // REAL is not a typical timestamp type, should fall through to default + Object result = collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.REAL); + + assertInstanceOf(String.class, result); + assertEquals(String.valueOf(TEST_EPOCH_MILLIS), result); + } + + @Test + @DisplayName("JSONB type should return epoch millis as string (fallback)") + void testJsonbTypeReturnsStringFallback() { + Object result = collection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.JSONB); + + assertInstanceOf(String.class, result); + assertEquals(String.valueOf(TEST_EPOCH_MILLIS), result); + } + } +} From 2a588987c76082448bb14f301131153d335feb41 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Wed, 11 Feb 2026 12:44:31 +0530 Subject: [PATCH 08/16] WIP --- .../TypesafeDatastoreConfigAdapter.java | 3 +- .../commons/FlatStoreConstants.java | 7 --- .../model/config/ConnectionConfig.java | 12 +++- ...ypesafeConfigDatastoreConfigExtractor.java | 53 ++++++++++++++++- .../postgres/PostgresConnectionConfig.java | 16 ++++- .../postgres/FlatPostgresCollection.java | 45 ++++++-------- .../postgres/PostgresClient.java | 6 ++ ...afeConfigDatastoreConfigExtractorTest.java | 59 +++++++++++++++++++ 8 files changed, 163 insertions(+), 38 deletions(-) delete mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index abc00e88..e033561e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -94,7 +94,8 @@ public DatastoreConfig convert(final Config config) { connectionConfig.applicationName(), connectionConfig.connectionPoolConfig(), connectionConfig.queryTimeout(), - connectionConfig.customParameters()) { + connectionConfig.customParameters(), + connectionConfig.collectionConfigs()) { @Override public String toConnectionString() { return config.hasPath("url") diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java b/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java deleted file mode 100644 index 489d5987..00000000 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/commons/FlatStoreConstants.java +++ /dev/null @@ -1,7 +0,0 @@ -package org.hypertrace.core.documentstore.commons; - -public class FlatStoreConstants { - public static final String TIMESTAMP_FIELDS_CONFIG = "timestampFields"; - public static final String CREATED_TS_COL_KEY = "createdTsCol"; - public static final String LAST_UPDATED_TS_COL_KEY = "lastUpdatedTsCol"; -} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 11127b81..ae8ed5f1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -75,12 +75,21 @@ public static class ConnectionConfigBuilder { String applicationName = DEFAULT_APP_NAME; String replicaSet; Map customParameters = new HashMap<>(); + Map + collectionConfigs = new HashMap<>(); public ConnectionConfigBuilder customParameter(String key, String value) { this.customParameters.put(key, value); return this; } + public ConnectionConfigBuilder collectionConfig( + String collectionName, + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig config) { + this.collectionConfigs.put(collectionName, config); + return this; + } + ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; @@ -127,7 +136,8 @@ public ConnectionConfig build() { applicationName, connectionPoolConfig, queryTimeout, - customParameters); + customParameters, + collectionConfigs); } throw new IllegalArgumentException("Unsupported database type: " + type); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java index f094e5a7..cd8105d6 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java @@ -35,6 +35,7 @@ public class TypesafeConfigDatastoreConfigExtractor { private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout"; private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout"; private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams"; + private static final String DEFAULT_COLLECTION_CONFIGS_PREFIX = "collectionConfigs"; @NonNull Config config; DatastoreConfigBuilder datastoreConfigBuilder; @@ -82,7 +83,8 @@ private TypesafeConfigDatastoreConfigExtractor( .dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY) .queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY) .connectionTimeoutKey(DEFAULT_CONNECTION_TIMEOUT_KEY) - .customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX); + .customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX) + .collectionConfigsKey(dataStoreType + "." + DEFAULT_COLLECTION_CONFIGS_PREFIX); } public static TypesafeConfigDatastoreConfigExtractor from( @@ -253,6 +255,54 @@ public TypesafeConfigDatastoreConfigExtractor connectionTimeoutKey(@NonNull fina return this; } + public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull final String key) { + if (!config.hasPath(key)) { + return this; + } + + try { + Config collectionsConfig = config.getConfig(key); + collectionsConfig + .root() + .keySet() + .forEach( + collectionName -> { + Config collectionConfig = collectionsConfig.getConfig(collectionName); + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig + .CollectionConfigBuilder + builder = + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig + .builder(); + + // Parse timestampFields if present + if (collectionConfig.hasPath("timestampFields")) { + Config timestampConfig = collectionConfig.getConfig("timestampFields"); + org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig + .TimestampFieldsConfigBuilder + tsBuilder = + org.hypertrace.core.documentstore.model.config.postgres + .TimestampFieldsConfig.builder(); + + if (timestampConfig.hasPath("created")) { + tsBuilder.created(timestampConfig.getString("created")); + } + if (timestampConfig.hasPath("lastUpdated")) { + tsBuilder.lastUpdated(timestampConfig.getString("lastUpdated")); + } + + builder.timestampFields(tsBuilder.build()); + } + + connectionConfigBuilder.collectionConfig(collectionName, builder.build()); + }); + } catch (Exception e) { + LOGGER.warn( + "Collection configs key '{}' exists but could not be parsed: {}", key, e.getMessage()); + } + + return this; + } + public DatastoreConfig extract() { if (connectionConfigBuilder.endpoints().isEmpty() && !Endpoint.builder().build().equals(endpointBuilder.build())) { @@ -265,6 +315,7 @@ public DatastoreConfig extract() { .connectionPoolConfig(connectionPoolConfigBuilder.build()) .credentials(connectionCredentialsBuilder.build()) .customParameters(connectionConfigBuilder.customParameters()) + .collectionConfigs(connectionConfigBuilder.collectionConfigs()) .build()) .build(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index 409fa95d..af6b308b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -46,6 +47,7 @@ public class PostgresConnectionConfig extends ConnectionConfig { @NonNull Duration queryTimeout; @NonNull Duration schemaCacheExpiry; @NonNull Duration schemaRefreshCooldown; + @NonNull Map collectionConfigs; public static ConnectionConfigBuilder builder() { return ConnectionConfig.builder().type(DatabaseType.POSTGRES); @@ -58,7 +60,8 @@ public PostgresConnectionConfig( @NonNull final String applicationName, @Nullable final ConnectionPoolConfig connectionPoolConfig, @NonNull final Duration queryTimeout, - @NonNull final Map customParameters) { + @NonNull final Map customParameters, + @Nullable final Map collectionConfigs) { super( ensureSingleEndpoint(endpoints), getDatabaseOrDefault(database), @@ -69,6 +72,7 @@ public PostgresConnectionConfig( this.queryTimeout = queryTimeout; this.schemaCacheExpiry = extractSchemaCacheExpiry(customParameters); this.schemaRefreshCooldown = extractSchemaRefreshCooldown(customParameters); + this.collectionConfigs = collectionConfigs != null ? collectionConfigs : Collections.emptyMap(); } public String toConnectionString() { @@ -157,4 +161,14 @@ private static Duration extractSchemaRefreshCooldown(final Map c .map(Duration::ofMillis) .orElse(DEFAULT_SCHEMA_REFRESH_COOLDOWN); } + + /** + * Gets the collection-specific configuration for the given collection name. + * + * @param collectionName the name of the collection + * @return Optional containing the CollectionConfig, or empty if not configured + */ + public Optional getCollectionConfig(String collectionName) { + return Optional.ofNullable(collectionConfigs.get(collectionName)); + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index c26529f3..b769920a 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -34,7 +34,6 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; -import org.hypertrace.core.documentstore.commons.FlatStoreConstants; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -94,17 +93,27 @@ public class FlatPostgresCollection extends PostgresCollection { super(client, collectionName); this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); - this.createdTsColumn = - getTsColFromConfig(FlatStoreConstants.CREATED_TS_COL_KEY, client.getCustomParameters()) - .orElse(null); - this.lastUpdatedTsColumn = - getTsColFromConfig(FlatStoreConstants.LAST_UPDATED_TS_COL_KEY, client.getCustomParameters()) - .orElse(null); + + // Get timestamp configuration from collectionConfigs + String createdTs = null; + String lastUpdatedTs = null; + + var collectionConfig = client.getCollectionConfig(collectionName); + if (collectionConfig.isPresent() && collectionConfig.get().getTimestampFields().isPresent()) { + var tsConfig = collectionConfig.get().getTimestampFields().get(); + createdTs = tsConfig.getCreated().orElse(null); + lastUpdatedTs = tsConfig.getLastUpdated().orElse(null); + } + + this.createdTsColumn = createdTs; + this.lastUpdatedTsColumn = lastUpdatedTs; + if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " - + "Document timestamps (either createdTime, lastUpdatedTime or both) will not be auto-managed. " - + "Configure via: {{\"timestampFields\": {{\"createdTsCol\": \"\", \"lastUpdatedTsCol\": \"\"}}}}", + + "Document timestamps (either created, lastUpdated or both) will not be auto-managed. " + + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", + collectionName, collectionName); } } @@ -125,24 +134,6 @@ private static MissingColumnStrategy parseMissingColumnStrategy(Map getTsColFromConfig(String key, Map config) { - String jsonValue = config.get(FlatStoreConstants.TIMESTAMP_FIELDS_CONFIG); - if (jsonValue == null || jsonValue.isEmpty()) { - return Optional.empty(); - } - try { - JsonNode node = MAPPER.readTree(jsonValue); - return Optional.ofNullable(node.get(key).asText(null)); - } catch (Exception e) { - LOGGER.warn( - "Failed to parse timestampFields config: '{}'. Expected format: " - + "{{\"docCreatedTsCol\": \"\", \"docLastUpdatedTsCol\": \"\"}}. Error: {}", - jsonValue, - e.getMessage()); - return Optional.empty(); - } - } - @Override public CloseableIterator query( final org.hypertrace.core.documentstore.query.Query query, final QueryOptions queryOptions) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java index d0ced723..9e7d48eb 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -7,7 +7,9 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresDefaults; import org.slf4j.Logger; @@ -69,6 +71,10 @@ public Map getCustomParameters() { return connectionConfig.customParameters(); } + public Optional getCollectionConfig(String collectionName) { + return connectionConfig.getCollectionConfig(collectionName); + } + public int getQueryTimeoutSeconds() { return (int) connectionConfig.queryTimeout().toSeconds(); } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 01062175..3877638b 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -353,6 +353,40 @@ void testBuildPostgresUsingDefaultKeysAndCustomParams() { assertEquals(expected, config); } + @Test + void testBuildPostgresWithCollectionConfigs() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithCollectionConfigsForPostgres(), TYPE_KEY) + .extract() + .connectionConfig(); + + final org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig + postgresConfig = + (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) + config; + + // Verify collection config for entities_api + var entitiesApiConfig = postgresConfig.getCollectionConfig("entities_api"); + assertEquals(true, entitiesApiConfig.isPresent()); + var timestampFields = entitiesApiConfig.get().getTimestampFields(); + assertEquals(true, timestampFields.isPresent()); + assertEquals("created_time", timestampFields.get().getCreated().get()); + assertEquals("last_updated_time", timestampFields.get().getLastUpdated().get()); + + // Verify collection config for entities_domain + var entitiesDomainConfig = postgresConfig.getCollectionConfig("entities_domain"); + assertEquals(true, entitiesDomainConfig.isPresent()); + var domainTimestampFields = entitiesDomainConfig.get().getTimestampFields(); + assertEquals(true, domainTimestampFields.isPresent()); + assertEquals("create_ts", domainTimestampFields.get().getCreated().get()); + assertEquals("update_ts", domainTimestampFields.get().getLastUpdated().get()); + + // Verify non-existent collection config returns empty + var nonExistentConfig = postgresConfig.getCollectionConfig("non_existent"); + assertEquals(false, nonExistentConfig.isPresent()); + } + private Config buildConfigMap() { return ConfigFactory.parseMap( Map.ofEntries( @@ -457,4 +491,29 @@ private Config buildMongoConfigMapUsingEndpointsKey() { entry(CONNECTION_ACCESS_TIMEOUT_KEY, accessTimeout), entry(CONNECTION_SURRENDER_TIMEOUT_KEY, surrenderTimeout))); } + + private Config buildConfigMapWithCollectionConfigsForPostgres() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + entry( + "postgres.collectionConfigs.entities_api.timestampFields.created", "created_time"), + entry( + "postgres.collectionConfigs.entities_api.timestampFields.lastUpdated", + "last_updated_time"), + entry( + "postgres.collectionConfigs.entities_domain.timestampFields.created", "create_ts"), + entry( + "postgres.collectionConfigs.entities_domain.timestampFields.lastUpdated", + "update_ts"))); + } } From 57f5426bba3508f14eb6760be153074e17d8e596 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 14:55:12 +0530 Subject: [PATCH 09/16] Fix failing tests --- .../FlatCollectionWriteTest.java | 23 ++++++++++--------- .../postgres/FlatPostgresCollection.java | 13 +++++++++-- .../postgres/FlatPostgresCollectionTest.java | 2 +- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 3ae3311d..f9f83c7e 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -95,10 +96,12 @@ public static void init() throws IOException { postgresConfig.put("url", postgresConnectionUrl); postgresConfig.put("user", "postgres"); postgresConfig.put("password", "postgres"); - // Configure timestamp fields for auto-managed document timestamps postgresConfig.put( - "customParams.timestampFields", - "{\"createdTsCol\": \"createdTime\", \"lastUpdatedTsCol\": \"lastUpdateTime\"}"); + "postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.created", + "createdTime"); + postgresConfig.put( + "postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.lastUpdated", + "lastUpdateTime"); postgresDatastore = DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig)); @@ -2495,7 +2498,7 @@ void testTimestampsOnCreate() throws Exception { conn.prepareStatement( String.format( "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", - FLAT_COLLECTION_NAME, key.toString())); + FLAT_COLLECTION_NAME, key)); ResultSet rs = ps.executeQuery()) { assertTrue(rs.next()); @@ -2551,7 +2554,6 @@ void testTimestampsOnUpsert() throws Exception { flatCollection.createOrReplace(key, updatedDoc); long afterUpsert = System.currentTimeMillis(); - // Verify createdTime preserved, lastUpdateTime updated try (Connection conn = pgDatastore.getPostgresClient(); PreparedStatement ps = conn.prepareStatement( @@ -2611,15 +2613,14 @@ void testNoExceptionWhenTimestampConfigMissing() throws Exception { conn.prepareStatement( String.format( "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", - FLAT_COLLECTION_NAME, key.toString())); + FLAT_COLLECTION_NAME, key)); ResultSet rs = ps.executeQuery()) { assertTrue(rs.next()); - rs.getLong("createdTime"); - assertTrue(rs.wasNull(), "createdTime should be NULL when config is missing"); - - rs.getTimestamp("lastUpdateTime"); - assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config is missing"); + assertNull( + rs.getObject("createdTime"), "createdTime should be NULL when config is missing"); + assertNull( + rs.getObject("lastUpdateTime"), "lastUpdateTime should be NULL when config is missing"); } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 4e6d82a8..a8fe19d5 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -109,10 +109,19 @@ public class FlatPostgresCollection extends PostgresCollection { this.createdTsColumn = createdTs; this.lastUpdatedTsColumn = lastUpdatedTs; - if (this.createdTsColumn == null || this.lastUpdatedTsColumn == null) { + if (this.createdTsColumn == null) { LOGGER.warn( "timestampFields config not set properly for collection '{}'. " - + "Document timestamps (either created, lastUpdated or both) will not be auto-managed. " + + "Row created timestamp will not be auto-managed. " + + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", + collectionName, + collectionName); + } + + if (this.lastUpdatedTsColumn == null) { + LOGGER.warn( + "timestampFields config not set properly for collection '{}'. " + + "Row lastUpdated timestamp will not be auto-managed. " + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", collectionName, collectionName); diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index 06af91a7..99f04e8d 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -33,7 +33,7 @@ void setUp() { @Nested @DisplayName("convertTimestampForType Tests") - class ConvertTimestampForTypeTests { + class TimestampCoversionTests { private static final long TEST_EPOCH_MILLIS = 1707465494818L; // 2024-02-09T06:58:14.818Z From bbd49a1bc1a6d1d07ccd2e6e12449b4134ddf058 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 15:40:03 +0530 Subject: [PATCH 10/16] Trigger CI --- .../core/documentstore/postgres/FlatPostgresCollection.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 387e7808..6ad0daa7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -35,6 +35,7 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; +import org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -104,7 +105,7 @@ SET, new FlatCollectionSubDocSetOperatorParser(), String createdTs = null; String lastUpdatedTs = null; - var collectionConfig = client.getCollectionConfig(collectionName); + Optional collectionConfig = client.getCollectionConfig(collectionName); if (collectionConfig.isPresent() && collectionConfig.get().getTimestampFields().isPresent()) { var tsConfig = collectionConfig.get().getTimestampFields().get(); createdTs = tsConfig.getCreated().orElse(null); From cf2cef53a21627d6b2fefae53fedb72e37f82ac5 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:07:46 +0530 Subject: [PATCH 11/16] WIP --- ...ypesafeConfigDatastoreConfigExtractor.java | 38 ++++++++++--------- .../config/postgres/CollectionConfig.java | 22 +++++++++++ .../postgres/TimestampFieldsConfig.java | 35 +++++++++++++++++ 3 files changed, 78 insertions(+), 17 deletions(-) create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java create mode 100644 document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java index cd8105d6..90fda99b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java @@ -10,12 +10,14 @@ import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder; import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder; import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder; +import org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig; import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Value public class TypesafeConfigDatastoreConfigExtractor { + private static final Logger LOGGER = LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class); private static final String DEFAULT_HOST_KEY = "host"; @@ -36,6 +38,9 @@ public class TypesafeConfigDatastoreConfigExtractor { private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout"; private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams"; private static final String DEFAULT_COLLECTION_CONFIGS_PREFIX = "collectionConfigs"; + private static final String TIMESTAMP_FIELDS_KEY = "timestampFields"; + private static final String TIMESTAMP_CREATED_KEY = "created"; + private static final String TIMESTAMP_LAST_UPDATED_KEY = "lastUpdated"; @NonNull Config config; DatastoreConfigBuilder datastoreConfigBuilder; @@ -274,23 +279,9 @@ public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull fina org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig .builder(); - // Parse timestampFields if present - if (collectionConfig.hasPath("timestampFields")) { - Config timestampConfig = collectionConfig.getConfig("timestampFields"); - org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig - .TimestampFieldsConfigBuilder - tsBuilder = - org.hypertrace.core.documentstore.model.config.postgres - .TimestampFieldsConfig.builder(); - - if (timestampConfig.hasPath("created")) { - tsBuilder.created(timestampConfig.getString("created")); - } - if (timestampConfig.hasPath("lastUpdated")) { - tsBuilder.lastUpdated(timestampConfig.getString("lastUpdated")); - } - - builder.timestampFields(tsBuilder.build()); + if (collectionConfig.hasPath(TIMESTAMP_FIELDS_KEY)) { + builder.timestampFields( + parseTimestampFieldsConfig(collectionConfig.getConfig(TIMESTAMP_FIELDS_KEY))); } connectionConfigBuilder.collectionConfig(collectionName, builder.build()); @@ -303,6 +294,19 @@ public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull fina return this; } + private TimestampFieldsConfig parseTimestampFieldsConfig(Config timestampConfig) { + TimestampFieldsConfig.TimestampFieldsConfigBuilder tsBuilder = TimestampFieldsConfig.builder(); + + if (timestampConfig.hasPath(TIMESTAMP_CREATED_KEY)) { + tsBuilder.created(timestampConfig.getString(TIMESTAMP_CREATED_KEY)); + } + if (timestampConfig.hasPath(TIMESTAMP_LAST_UPDATED_KEY)) { + tsBuilder.lastUpdated(timestampConfig.getString(TIMESTAMP_LAST_UPDATED_KEY)); + } + + return tsBuilder.build(); + } + public DatastoreConfig extract() { if (connectionConfigBuilder.endpoints().isEmpty() && !Endpoint.builder().build().equals(endpointBuilder.build())) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java new file mode 100644 index 00000000..dc3a0e8a --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java @@ -0,0 +1,22 @@ +package org.hypertrace.core.documentstore.model.config.postgres; + +import java.util.Optional; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class CollectionConfig { + + /** Timestamp field configuration for auto-managed created/updated timestamps */ + @Nullable TimestampFieldsConfig timestampFields; + + public Optional getTimestampFields() { + return Optional.ofNullable(timestampFields); + } + + public static CollectionConfig empty() { + return CollectionConfig.builder().build(); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java new file mode 100644 index 00000000..cb9dd225 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java @@ -0,0 +1,35 @@ +package org.hypertrace.core.documentstore.model.config.postgres; + +import java.util.Optional; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; + +/** + * Configuration for auto-managed timestamp fields in a Postgres collection. Specifies which columns + * should be automatically populated with creation and last update timestamps. + */ +@Value +@Builder +public class TimestampFieldsConfig { + + /** + * Column name for the creation timestamp. This field is set once when a document is created and + * preserved on updates. + */ + @Nullable String created; + + /** + * Column name for the last update timestamp. This field is updated every time a document is + * created or modified. + */ + @Nullable String lastUpdated; + + public Optional getCreated() { + return Optional.ofNullable(created); + } + + public Optional getLastUpdated() { + return Optional.ofNullable(lastUpdated); + } +} From 4f633750539b41b1b7dd50976b24a79b51bf69a7 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:14:16 +0530 Subject: [PATCH 12/16] Fix failing test cases --- .../core/documentstore/postgres/FlatPostgresCollection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index 6ad0daa7..c6b1fa10 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -1066,7 +1066,6 @@ private String buildCreateOrReplaceSql( return col + " = DEFAULT"; } }) - .map(col -> col + " = EXCLUDED." + col) .collect(Collectors.joining(", ")); return String.format( From 951e0dbc24913dca84d840a756e3c76817bfc68d Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:17:13 +0530 Subject: [PATCH 13/16] WIP --- .../model/config/postgres/PostgresConnectionConfig.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index af6b308b..f63ec5e7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -162,12 +162,6 @@ private static Duration extractSchemaRefreshCooldown(final Map c .orElse(DEFAULT_SCHEMA_REFRESH_COOLDOWN); } - /** - * Gets the collection-specific configuration for the given collection name. - * - * @param collectionName the name of the collection - * @return Optional containing the CollectionConfig, or empty if not configured - */ public Optional getCollectionConfig(String collectionName) { return Optional.ofNullable(collectionConfigs.get(collectionName)); } From 9f95ea9966ea175bbe4c947d364aaff61571f5ca Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:36:12 +0530 Subject: [PATCH 14/16] Added more UTs for coverage --- ...afeConfigDatastoreConfigExtractorTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 3877638b..5ef84327 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -4,19 +4,23 @@ import static org.hypertrace.core.documentstore.model.config.AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE; import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.time.Duration; import java.util.List; import java.util.Map; +import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; class TypesafeConfigDatastoreConfigExtractorTest { + private static final String TYPE_KEY = "database_type"; private static final String HOST_KEY = "hostname"; private static final String PORT_KEY = "port_number"; @@ -387,6 +391,21 @@ void testBuildPostgresWithCollectionConfigs() { assertEquals(false, nonExistentConfig.isPresent()); } + @Test + void testBuildPostgresWithCollectionConfigWithoutTimestampFields() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithCollectionConfigWithoutTimestampFields(), TYPE_KEY) + .extract() + .connectionConfig(); + + final PostgresConnectionConfig postgresConfig = (PostgresConnectionConfig) config; + + var collectionWithoutTs = postgresConfig.getCollectionConfig("collection_without_ts"); + assertTrue(collectionWithoutTs.isPresent()); + assertFalse(collectionWithoutTs.get().getTimestampFields().isPresent()); + } + private Config buildConfigMap() { return ConfigFactory.parseMap( Map.ofEntries( @@ -516,4 +535,21 @@ private Config buildConfigMapWithCollectionConfigsForPostgres() { "postgres.collectionConfigs.entities_domain.timestampFields.lastUpdated", "update_ts"))); } + + private Config buildConfigMapWithCollectionConfigWithoutTimestampFields() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + // Collection config exists but has no timestampFields - uses a dummy key + entry("postgres.collectionConfigs.collection_without_ts.someOtherConfig", "value"))); + } } From 3029f3910ba471e04f4f8316cc162bfa16ac86d1 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:42:11 +0530 Subject: [PATCH 15/16] Added more UTs for coverage --- ...afeConfigDatastoreConfigExtractorTest.java | 40 ++++++++++++++++++- 1 file changed, 38 insertions(+), 2 deletions(-) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 5ef84327..9e385b2b 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -367,8 +367,8 @@ void testBuildPostgresWithCollectionConfigs() { final org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig postgresConfig = - (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) - config; + (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) + config; // Verify collection config for entities_api var entitiesApiConfig = postgresConfig.getCollectionConfig("entities_api"); @@ -406,6 +406,23 @@ void testBuildPostgresWithCollectionConfigWithoutTimestampFields() { assertFalse(collectionWithoutTs.get().getTimestampFields().isPresent()); } + @Test + void testBuildPostgresWithEmptyTimestampFields() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithEmptyTimestampFields(), TYPE_KEY) + .extract() + .connectionConfig(); + + final PostgresConnectionConfig postgresConfig = (PostgresConnectionConfig) config; + + var collectionConfig = postgresConfig.getCollectionConfig("collection_empty_ts"); + assertTrue(collectionConfig.isPresent()); + assertTrue(collectionConfig.get().getTimestampFields().isPresent()); + assertFalse(collectionConfig.get().getTimestampFields().get().getCreated().isPresent()); + assertFalse(collectionConfig.get().getTimestampFields().get().getLastUpdated().isPresent()); + } + private Config buildConfigMap() { return ConfigFactory.parseMap( Map.ofEntries( @@ -552,4 +569,23 @@ private Config buildConfigMapWithCollectionConfigWithoutTimestampFields() { // Collection config exists but has no timestampFields - uses a dummy key entry("postgres.collectionConfigs.collection_without_ts.someOtherConfig", "value"))); } + + private Config buildConfigMapWithEmptyTimestampFields() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + // timestampFields exists but has neither created nor lastUpdated + entry( + "postgres.collectionConfigs.collection_empty_ts.timestampFields.unknownKey", + "value"))); + } } From 81470d01f2b072bc66c779ae4239f042021010a1 Mon Sep 17 00:00:00 2001 From: Prashant Pandey Date: Mon, 23 Feb 2026 16:43:41 +0530 Subject: [PATCH 16/16] Spotless --- .../config/TypesafeConfigDatastoreConfigExtractorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 9e385b2b..09a41517 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -367,8 +367,8 @@ void testBuildPostgresWithCollectionConfigs() { final org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig postgresConfig = - (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) - config; + (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) + config; // Verify collection config for entities_api var entitiesApiConfig = postgresConfig.getCollectionConfig("entities_api");