diff --git a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java index 138229ee..7830bb2f 100644 --- a/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java +++ b/document-store/src/integrationTest/java/org/hypertrace/core/documentstore/FlatCollectionWriteTest.java @@ -17,6 +17,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -95,6 +96,12 @@ public static void init() throws IOException { postgresConfig.put("url", postgresConnectionUrl); postgresConfig.put("user", "postgres"); postgresConfig.put("password", "postgres"); + postgresConfig.put( + "postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.created", + "createdTime"); + postgresConfig.put( + "postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.lastUpdated", + "lastUpdateTime"); postgresDatastore = DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig)); @@ -125,7 +132,9 @@ private static void createFlatCollectionSchema() { + "\"big_number\" BIGINT," + "\"rating\" REAL," + "\"created_date\" DATE," - + "\"weight\" DOUBLE PRECISION" + + "\"weight\" DOUBLE PRECISION," + + "\"createdTime\" BIGINT," + + "\"lastUpdateTime\" TIMESTAMP WITH TIME ZONE" + ");", FLAT_COLLECTION_NAME); @@ -3386,4 +3395,209 @@ void testSetWithJsonDocumentValue() throws Exception { } } } + + @Nested + @DisplayName("Timestamp Auto-Population Tests") + class TimestampTests { + + @Test + @DisplayName( + "Should auto-populate createdTime (BIGINT) and lastUpdateTime (TIMESTAMPTZ) on create") + void testTimestampsOnCreate() throws Exception { + long beforeCreate = System.currentTimeMillis(); + + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-1"); + objectNode.put("item", "TimestampTestItem"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-1"); + + CreateResult result = flatCollection.create(key, document); + assertTrue(result.isSucceed()); + + long afterCreate = System.currentTimeMillis(); + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + long createdTime = rs.getLong("createdTime"); + assertFalse(rs.wasNull(), "createdTime should not be NULL"); + assertTrue( + createdTime >= beforeCreate && createdTime <= afterCreate, + "createdTime should be within test execution window"); + + Timestamp lastUpdateTime = rs.getTimestamp("lastUpdateTime"); + assertNotNull(lastUpdateTime, "lastUpdateTime should not be NULL"); + assertTrue( + lastUpdateTime.getTime() >= beforeCreate && lastUpdateTime.getTime() <= afterCreate, + "lastUpdateTime should be within test execution window"); + } + } + + @Test + @DisplayName("Should preserve createdTime and update lastUpdateTime on upsert") + void testTimestampsOnUpsert() throws Exception { + // First create + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-2"); + objectNode.put("item", "UpsertTimestampTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-2"); + + flatCollection.create(key, document); + + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + long originalCreatedTime; + long originalLastUpdateTime; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + originalCreatedTime = rs.getLong("createdTime"); + originalLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime(); + } + + // Wait a bit to ensure time difference + Thread.sleep(50); + + // Upsert (update existing) + long beforeUpsert = System.currentTimeMillis(); + objectNode.put("price", 200); + Document updatedDoc = new JSONDocument(objectNode); + flatCollection.createOrReplace(key, updatedDoc); + long afterUpsert = System.currentTimeMillis(); + + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + long newCreatedTime = rs.getLong("createdTime"); + assertEquals( + originalCreatedTime, newCreatedTime, "createdTime should be preserved on upsert"); + + long newLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime(); + assertTrue(newLastUpdateTime > originalLastUpdateTime, "lastUpdateTime should be updated"); + assertTrue( + newLastUpdateTime >= beforeUpsert && newLastUpdateTime <= afterUpsert, + "lastUpdateTime should be within upsert execution window"); + } + } + + @Test + @DisplayName( + "Should not throw exception when timestampFields config is missing - cols remain NULL") + void testNoExceptionWhenTimestampConfigMissing() throws Exception { + // Create a collection WITHOUT timestampFields config + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map configWithoutTimestamps = new HashMap<>(); + configWithoutTimestamps.put("url", postgresConnectionUrl); + configWithoutTimestamps.put("user", "postgres"); + configWithoutTimestamps.put("password", "postgres"); + // Note: NO customParams.timestampFields config + + Datastore datastoreWithoutTimestamps = + DatastoreProvider.getDatastore( + "Postgres", ConfigFactory.parseMap(configWithoutTimestamps)); + Collection collectionWithoutTimestamps = + datastoreWithoutTimestamps.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + + // Create a document - should NOT throw exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-no-config"); + objectNode.put("item", "NoTimestampConfigTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-no-config"); + + CreateResult result = collectionWithoutTimestamps.create(key, document); + assertTrue(result.isSucceed()); + + // Verify timestamp columns are NULL + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key)); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + assertNull( + rs.getObject("createdTime"), "createdTime should be NULL when config is missing"); + assertNull( + rs.getObject("lastUpdateTime"), "lastUpdateTime should be NULL when config is missing"); + } + } + + @Test + @DisplayName( + "Should not throw exception when timestampFields config is invalid JSON - cols remain NULL") + void testNoExceptionWhenTimestampConfigInvalidJson() throws Exception { + // Create a collection with INVALID JSON in timestampFields config + String postgresConnectionUrl = + String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432)); + + Map configWithInvalidJson = new HashMap<>(); + configWithInvalidJson.put("url", postgresConnectionUrl); + configWithInvalidJson.put("user", "postgres"); + configWithInvalidJson.put("password", "postgres"); + // Invalid JSON - missing quotes, malformed + configWithInvalidJson.put("customParams.timestampFields", "not valid json {{{"); + + Datastore datastoreWithInvalidConfig = + DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(configWithInvalidJson)); + Collection collectionWithInvalidConfig = + datastoreWithInvalidConfig.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT); + + // Create a document - should NOT throw exception + ObjectNode objectNode = OBJECT_MAPPER.createObjectNode(); + objectNode.put("id", "ts-test-invalid-json"); + objectNode.put("item", "InvalidJsonConfigTest"); + objectNode.put("price", 100); + Document document = new JSONDocument(objectNode); + Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-invalid-json"); + + CreateResult result = collectionWithInvalidConfig.create(key, document); + assertTrue(result.isSucceed()); + + // Verify timestamp columns are NULL (config parsing failed gracefully) + PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore; + try (Connection conn = pgDatastore.getPostgresClient(); + PreparedStatement ps = + conn.prepareStatement( + String.format( + "SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'", + FLAT_COLLECTION_NAME, key.toString())); + ResultSet rs = ps.executeQuery()) { + assertTrue(rs.next()); + + rs.getLong("createdTime"); + assertTrue(rs.wasNull(), "createdTime should be NULL when config JSON is invalid"); + + rs.getTimestamp("lastUpdateTime"); + assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config JSON is invalid"); + } + } + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java index abc00e88..e033561e 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/TypesafeDatastoreConfigAdapter.java @@ -94,7 +94,8 @@ public DatastoreConfig convert(final Config config) { connectionConfig.applicationName(), connectionConfig.connectionPoolConfig(), connectionConfig.queryTimeout(), - connectionConfig.customParameters()) { + connectionConfig.customParameters(), + connectionConfig.collectionConfigs()) { @Override public String toConnectionString() { return config.hasPath("url") diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java index 11127b81..ae8ed5f1 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/ConnectionConfig.java @@ -75,12 +75,21 @@ public static class ConnectionConfigBuilder { String applicationName = DEFAULT_APP_NAME; String replicaSet; Map customParameters = new HashMap<>(); + Map + collectionConfigs = new HashMap<>(); public ConnectionConfigBuilder customParameter(String key, String value) { this.customParameters.put(key, value); return this; } + public ConnectionConfigBuilder collectionConfig( + String collectionName, + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig config) { + this.collectionConfigs.put(collectionName, config); + return this; + } + ConnectionPoolConfig connectionPoolConfig; AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS; DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT; @@ -127,7 +136,8 @@ public ConnectionConfig build() { applicationName, connectionPoolConfig, queryTimeout, - customParameters); + customParameters, + collectionConfigs); } throw new IllegalArgumentException("Unsupported database type: " + type); diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java index f094e5a7..90fda99b 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractor.java @@ -10,12 +10,14 @@ import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder; import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder; import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder; +import org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig; import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @Value public class TypesafeConfigDatastoreConfigExtractor { + private static final Logger LOGGER = LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class); private static final String DEFAULT_HOST_KEY = "host"; @@ -35,6 +37,10 @@ public class TypesafeConfigDatastoreConfigExtractor { private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout"; private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout"; private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams"; + private static final String DEFAULT_COLLECTION_CONFIGS_PREFIX = "collectionConfigs"; + private static final String TIMESTAMP_FIELDS_KEY = "timestampFields"; + private static final String TIMESTAMP_CREATED_KEY = "created"; + private static final String TIMESTAMP_LAST_UPDATED_KEY = "lastUpdated"; @NonNull Config config; DatastoreConfigBuilder datastoreConfigBuilder; @@ -82,7 +88,8 @@ private TypesafeConfigDatastoreConfigExtractor( .dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY) .queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY) .connectionTimeoutKey(DEFAULT_CONNECTION_TIMEOUT_KEY) - .customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX); + .customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX) + .collectionConfigsKey(dataStoreType + "." + DEFAULT_COLLECTION_CONFIGS_PREFIX); } public static TypesafeConfigDatastoreConfigExtractor from( @@ -253,6 +260,53 @@ public TypesafeConfigDatastoreConfigExtractor connectionTimeoutKey(@NonNull fina return this; } + public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull final String key) { + if (!config.hasPath(key)) { + return this; + } + + try { + Config collectionsConfig = config.getConfig(key); + collectionsConfig + .root() + .keySet() + .forEach( + collectionName -> { + Config collectionConfig = collectionsConfig.getConfig(collectionName); + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig + .CollectionConfigBuilder + builder = + org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig + .builder(); + + if (collectionConfig.hasPath(TIMESTAMP_FIELDS_KEY)) { + builder.timestampFields( + parseTimestampFieldsConfig(collectionConfig.getConfig(TIMESTAMP_FIELDS_KEY))); + } + + connectionConfigBuilder.collectionConfig(collectionName, builder.build()); + }); + } catch (Exception e) { + LOGGER.warn( + "Collection configs key '{}' exists but could not be parsed: {}", key, e.getMessage()); + } + + return this; + } + + private TimestampFieldsConfig parseTimestampFieldsConfig(Config timestampConfig) { + TimestampFieldsConfig.TimestampFieldsConfigBuilder tsBuilder = TimestampFieldsConfig.builder(); + + if (timestampConfig.hasPath(TIMESTAMP_CREATED_KEY)) { + tsBuilder.created(timestampConfig.getString(TIMESTAMP_CREATED_KEY)); + } + if (timestampConfig.hasPath(TIMESTAMP_LAST_UPDATED_KEY)) { + tsBuilder.lastUpdated(timestampConfig.getString(TIMESTAMP_LAST_UPDATED_KEY)); + } + + return tsBuilder.build(); + } + public DatastoreConfig extract() { if (connectionConfigBuilder.endpoints().isEmpty() && !Endpoint.builder().build().equals(endpointBuilder.build())) { @@ -265,6 +319,7 @@ public DatastoreConfig extract() { .connectionPoolConfig(connectionPoolConfigBuilder.build()) .credentials(connectionCredentialsBuilder.build()) .customParameters(connectionConfigBuilder.customParameters()) + .collectionConfigs(connectionConfigBuilder.collectionConfigs()) .build()) .build(); } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java new file mode 100644 index 00000000..dc3a0e8a --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/CollectionConfig.java @@ -0,0 +1,22 @@ +package org.hypertrace.core.documentstore.model.config.postgres; + +import java.util.Optional; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; + +@Value +@Builder +public class CollectionConfig { + + /** Timestamp field configuration for auto-managed created/updated timestamps */ + @Nullable TimestampFieldsConfig timestampFields; + + public Optional getTimestampFields() { + return Optional.ofNullable(timestampFields); + } + + public static CollectionConfig empty() { + return CollectionConfig.builder().build(); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java index 409fa95d..f63ec5e7 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/PostgresConnectionConfig.java @@ -5,6 +5,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -46,6 +47,7 @@ public class PostgresConnectionConfig extends ConnectionConfig { @NonNull Duration queryTimeout; @NonNull Duration schemaCacheExpiry; @NonNull Duration schemaRefreshCooldown; + @NonNull Map collectionConfigs; public static ConnectionConfigBuilder builder() { return ConnectionConfig.builder().type(DatabaseType.POSTGRES); @@ -58,7 +60,8 @@ public PostgresConnectionConfig( @NonNull final String applicationName, @Nullable final ConnectionPoolConfig connectionPoolConfig, @NonNull final Duration queryTimeout, - @NonNull final Map customParameters) { + @NonNull final Map customParameters, + @Nullable final Map collectionConfigs) { super( ensureSingleEndpoint(endpoints), getDatabaseOrDefault(database), @@ -69,6 +72,7 @@ public PostgresConnectionConfig( this.queryTimeout = queryTimeout; this.schemaCacheExpiry = extractSchemaCacheExpiry(customParameters); this.schemaRefreshCooldown = extractSchemaRefreshCooldown(customParameters); + this.collectionConfigs = collectionConfigs != null ? collectionConfigs : Collections.emptyMap(); } public String toConnectionString() { @@ -157,4 +161,8 @@ private static Duration extractSchemaRefreshCooldown(final Map c .map(Duration::ofMillis) .orElse(DEFAULT_SCHEMA_REFRESH_COOLDOWN); } + + public Optional getCollectionConfig(String collectionName) { + return Optional.ofNullable(collectionConfigs.get(collectionName)); + } } diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java new file mode 100644 index 00000000..cb9dd225 --- /dev/null +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/model/config/postgres/TimestampFieldsConfig.java @@ -0,0 +1,35 @@ +package org.hypertrace.core.documentstore.model.config.postgres; + +import java.util.Optional; +import javax.annotation.Nullable; +import lombok.Builder; +import lombok.Value; + +/** + * Configuration for auto-managed timestamp fields in a Postgres collection. Specifies which columns + * should be automatically populated with creation and last update timestamps. + */ +@Value +@Builder +public class TimestampFieldsConfig { + + /** + * Column name for the creation timestamp. This field is set once when a document is created and + * preserved on updates. + */ + @Nullable String created; + + /** + * Column name for the last update timestamp. This field is updated every time a document is + * created or modified. + */ + @Nullable String lastUpdated; + + public Optional getCreated() { + return Optional.ofNullable(created); + } + + public Optional getLastUpdated() { + return Optional.ofNullable(lastUpdated); + } +} diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java index cf4835a5..5f868525 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollection.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.io.IOException; import java.sql.Array; @@ -34,6 +35,7 @@ import org.hypertrace.core.documentstore.Filter; import org.hypertrace.core.documentstore.Key; import org.hypertrace.core.documentstore.UpdateResult; +import org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig; import org.hypertrace.core.documentstore.model.exception.DuplicateDocumentException; import org.hypertrace.core.documentstore.model.exception.SchemaMismatchException; import org.hypertrace.core.documentstore.model.options.MissingColumnStrategy; @@ -88,6 +90,9 @@ SET, new FlatCollectionSubDocSetOperatorParser(), */ private final MissingColumnStrategy missingColumnStrategy; + private final String createdTsColumn; + private final String lastUpdatedTsColumn; + FlatPostgresCollection( final PostgresClient client, final String collectionName, @@ -95,6 +100,38 @@ SET, new FlatCollectionSubDocSetOperatorParser(), super(client, collectionName); this.schemaRegistry = schemaRegistry; this.missingColumnStrategy = parseMissingColumnStrategy(client.getCustomParameters()); + + // Get timestamp configuration from collectionConfigs + String createdTs = null; + String lastUpdatedTs = null; + + Optional collectionConfig = client.getCollectionConfig(collectionName); + if (collectionConfig.isPresent() && collectionConfig.get().getTimestampFields().isPresent()) { + var tsConfig = collectionConfig.get().getTimestampFields().get(); + createdTs = tsConfig.getCreated().orElse(null); + lastUpdatedTs = tsConfig.getLastUpdated().orElse(null); + } + + this.createdTsColumn = createdTs; + this.lastUpdatedTsColumn = lastUpdatedTs; + + if (this.createdTsColumn == null) { + LOGGER.warn( + "timestampFields config not set properly for collection '{}'. " + + "Row created timestamp will not be auto-managed. " + + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", + collectionName, + collectionName); + } + + if (this.lastUpdatedTsColumn == null) { + LOGGER.warn( + "timestampFields config not set properly for collection '{}'. " + + "Row lastUpdated timestamp will not be auto-managed. " + + "Configure via collectionConfigs.{}.timestampFields {{ created = \"\", lastUpdated = \"\" }}", + collectionName, + collectionName); + } } private static MissingColumnStrategy parseMissingColumnStrategy(Map params) { @@ -389,10 +426,17 @@ private String buildMergeUpsertSql( String columnList = String.join(", ", columns); String placeholders = String.join(", ", columns.stream().map(c -> "?").toArray(String[]::new)); - // Build SET clause for non-PK columns: col = EXCLUDED.col + String quotedCreatedTs = + createdTsColumn != null + ? PostgresUtils.wrapFieldNamesWithDoubleQuotes(createdTsColumn) + : null; + + // Build SET clause for non-PK columns: col = EXCLUDED.col. Exclude createdTsColumn from updates + // to preserve original creation time String setClause = columns.stream() .filter(col -> !col.equals(pkColumn)) + .filter(col -> !col.equals(quotedCreatedTs)) .map(col -> col + " = EXCLUDED." + col) .collect(Collectors.joining(", ")); @@ -890,9 +934,80 @@ private TypedDocument parseDocument( } } + addTimestampFields(typedDocument, tableName); + return typedDocument; } + /** + * Adds timestamp fields if configured and columns exist in the schema. The column type is looked + * up from the schema to determine the appropriate value format: + * + *
    + *
  • BIGINT: epoch milliseconds (long) + *
  • TIMESTAMPTZ: java.sql.Timestamp + *
  • DATE: java.sql.Date + *
  • TEXT: ISO-8601 string + *
+ */ + private void addTimestampFields(TypedDocument typedDocument, String tableName) { + long now = System.currentTimeMillis(); + + if (createdTsColumn != null) { + addTimestampField(typedDocument, tableName, createdTsColumn, now, false); + } + + if (lastUpdatedTsColumn != null) { + addTimestampField(typedDocument, tableName, lastUpdatedTsColumn, now, true); + } + } + + private void addTimestampField( + TypedDocument typedDocument, + String tableName, + String columnName, + long epochMillis, + boolean overwrite) { + if (!overwrite && typedDocument.hasColumn(columnName)) { + return; + } + + Optional colMeta = + schemaRegistry.getColumnOrRefresh(tableName, columnName); + if (colMeta.isEmpty()) { + LOGGER.debug( + "Timestamp column '{}' configured but not found in schema for table '{}'", + columnName, + tableName); + return; + } + + PostgresDataType type = colMeta.get().getPostgresType(); + String quotedCol = PostgresUtils.wrapFieldNamesWithDoubleQuotes(columnName); + Object value = convertTimestampForType(epochMillis, type); + typedDocument.add(quotedCol, value, type, false); + } + + @VisibleForTesting + Object convertTimestampForType(long epochMillis, PostgresDataType type) { + switch (type) { + case BIGINT: + return epochMillis; + case INTEGER: + return (int) (epochMillis / 1000); + case TIMESTAMPTZ: + return Timestamp.from(Instant.ofEpochMilli(epochMillis)); + case DATE: + return new Date(epochMillis); + case TEXT: + return Instant.ofEpochMilli(epochMillis).toString(); // ISO-8601 + default: + LOGGER.warn( + "Unexpected type '{}' for timestamp column, using epoch millis as string", type); + return String.valueOf(epochMillis); + } + } + private int executeUpdate(String sql, TypedDocument parsed) throws SQLException { try (Connection conn = client.getPooledConnection(); PreparedStatement ps = conn.prepareStatement(sql)) { @@ -937,7 +1052,7 @@ private boolean createOrReplaceWithRetry(Key key, Document document, boolean isR return executeUpsert(sql, parsed); } catch (PSQLException e) { - return handlePSQLExceptionForCreateOrReplace(e, key, document, tableName, isRetry); + return handlePSQLExceptionForUpsert(e, key, document, tableName, isRetry); } catch (SQLException e) { LOGGER.error("SQLException in createOrReplace. key: {} content: {}", key, document, e); throw new IOException(e); @@ -1059,10 +1174,17 @@ private String buildCreateOrReplaceSql( String.join(", ", docColumns.stream().map(c -> "?").toArray(String[]::new)); Set docColumnsSet = new HashSet<>(docColumns); + // Build SET clause for non-PK columns: col = EXCLUDED.col + // Exclude createdTsColumn from updates to preserve original creation time + String quotedCreatedTs = + createdTsColumn != null + ? PostgresUtils.wrapFieldNamesWithDoubleQuotes(createdTsColumn) + : null; // Build SET clause for non-PK columns. String setClause = allTableColumns.stream() .filter(col -> !col.equals(pkColumn)) + .filter(col -> !col.equals(quotedCreatedTs)) .map( col -> { if (docColumnsSet.contains(col)) { @@ -1198,6 +1320,11 @@ PostgresDataType getType(String column) { boolean isArray(String column) { return arrays.getOrDefault(column, false); } + + boolean hasColumn(String columnName) { + String quotedCol = PostgresUtils.wrapFieldNamesWithDoubleQuotes(columnName); + return values.containsKey(quotedCol) || values.containsKey(columnName); + } } private String buildInsertSql(List columns) { diff --git a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java index d0ced723..9e7d48eb 100644 --- a/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java +++ b/document-store/src/main/java/org/hypertrace/core/documentstore/postgres/PostgresClient.java @@ -7,7 +7,9 @@ import java.sql.SQLException; import java.time.Duration; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; import org.hypertrace.core.documentstore.model.config.postgres.PostgresDefaults; import org.slf4j.Logger; @@ -69,6 +71,10 @@ public Map getCustomParameters() { return connectionConfig.customParameters(); } + public Optional getCollectionConfig(String collectionName) { + return connectionConfig.getCollectionConfig(collectionName); + } + public int getQueryTimeoutSeconds() { return (int) connectionConfig.queryTimeout().toSeconds(); } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java index 01062175..09a41517 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/model/config/TypesafeConfigDatastoreConfigExtractorTest.java @@ -4,19 +4,23 @@ import static org.hypertrace.core.documentstore.model.config.AggregatePipelineMode.SORT_OPTIMIZED_IF_POSSIBLE; import static org.hypertrace.core.documentstore.model.options.DataFreshness.NEAR_REALTIME_FRESHNESS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import java.time.Duration; import java.util.List; import java.util.Map; +import org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig; import org.hypertrace.core.documentstore.model.options.DataFreshness; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; class TypesafeConfigDatastoreConfigExtractorTest { + private static final String TYPE_KEY = "database_type"; private static final String HOST_KEY = "hostname"; private static final String PORT_KEY = "port_number"; @@ -353,6 +357,72 @@ void testBuildPostgresUsingDefaultKeysAndCustomParams() { assertEquals(expected, config); } + @Test + void testBuildPostgresWithCollectionConfigs() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithCollectionConfigsForPostgres(), TYPE_KEY) + .extract() + .connectionConfig(); + + final org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig + postgresConfig = + (org.hypertrace.core.documentstore.model.config.postgres.PostgresConnectionConfig) + config; + + // Verify collection config for entities_api + var entitiesApiConfig = postgresConfig.getCollectionConfig("entities_api"); + assertEquals(true, entitiesApiConfig.isPresent()); + var timestampFields = entitiesApiConfig.get().getTimestampFields(); + assertEquals(true, timestampFields.isPresent()); + assertEquals("created_time", timestampFields.get().getCreated().get()); + assertEquals("last_updated_time", timestampFields.get().getLastUpdated().get()); + + // Verify collection config for entities_domain + var entitiesDomainConfig = postgresConfig.getCollectionConfig("entities_domain"); + assertEquals(true, entitiesDomainConfig.isPresent()); + var domainTimestampFields = entitiesDomainConfig.get().getTimestampFields(); + assertEquals(true, domainTimestampFields.isPresent()); + assertEquals("create_ts", domainTimestampFields.get().getCreated().get()); + assertEquals("update_ts", domainTimestampFields.get().getLastUpdated().get()); + + // Verify non-existent collection config returns empty + var nonExistentConfig = postgresConfig.getCollectionConfig("non_existent"); + assertEquals(false, nonExistentConfig.isPresent()); + } + + @Test + void testBuildPostgresWithCollectionConfigWithoutTimestampFields() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithCollectionConfigWithoutTimestampFields(), TYPE_KEY) + .extract() + .connectionConfig(); + + final PostgresConnectionConfig postgresConfig = (PostgresConnectionConfig) config; + + var collectionWithoutTs = postgresConfig.getCollectionConfig("collection_without_ts"); + assertTrue(collectionWithoutTs.isPresent()); + assertFalse(collectionWithoutTs.get().getTimestampFields().isPresent()); + } + + @Test + void testBuildPostgresWithEmptyTimestampFields() { + final ConnectionConfig config = + TypesafeConfigDatastoreConfigExtractor.from( + buildConfigMapWithEmptyTimestampFields(), TYPE_KEY) + .extract() + .connectionConfig(); + + final PostgresConnectionConfig postgresConfig = (PostgresConnectionConfig) config; + + var collectionConfig = postgresConfig.getCollectionConfig("collection_empty_ts"); + assertTrue(collectionConfig.isPresent()); + assertTrue(collectionConfig.get().getTimestampFields().isPresent()); + assertFalse(collectionConfig.get().getTimestampFields().get().getCreated().isPresent()); + assertFalse(collectionConfig.get().getTimestampFields().get().getLastUpdated().isPresent()); + } + private Config buildConfigMap() { return ConfigFactory.parseMap( Map.ofEntries( @@ -457,4 +527,65 @@ private Config buildMongoConfigMapUsingEndpointsKey() { entry(CONNECTION_ACCESS_TIMEOUT_KEY, accessTimeout), entry(CONNECTION_SURRENDER_TIMEOUT_KEY, surrenderTimeout))); } + + private Config buildConfigMapWithCollectionConfigsForPostgres() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + entry( + "postgres.collectionConfigs.entities_api.timestampFields.created", "created_time"), + entry( + "postgres.collectionConfigs.entities_api.timestampFields.lastUpdated", + "last_updated_time"), + entry( + "postgres.collectionConfigs.entities_domain.timestampFields.created", "create_ts"), + entry( + "postgres.collectionConfigs.entities_domain.timestampFields.lastUpdated", + "update_ts"))); + } + + private Config buildConfigMapWithCollectionConfigWithoutTimestampFields() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + // Collection config exists but has no timestampFields - uses a dummy key + entry("postgres.collectionConfigs.collection_without_ts.someOtherConfig", "value"))); + } + + private Config buildConfigMapWithEmptyTimestampFields() { + return ConfigFactory.parseMap( + Map.ofEntries( + entry(TYPE_KEY, "postgres"), + entry("postgres.host", host), + entry("postgres.port", port), + entry("postgres.database", database), + entry("postgres.user", user), + entry("postgres.password", password), + entry("appName", appName), + entry("maxPoolSize", maxConnections), + entry("connectionAccessTimeout", accessTimeout), + entry("connectionIdleTime", surrenderTimeout), + // timestampFields exists but has neither created nor lastUpdated + entry( + "postgres.collectionConfigs.collection_empty_ts.timestampFields.unknownKey", + "value"))); + } } diff --git a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java index fcd94112..c59b2afd 100644 --- a/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java +++ b/document-store/src/test/java/org/hypertrace/core/documentstore/postgres/FlatPostgresCollectionTest.java @@ -2,6 +2,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -17,9 +18,12 @@ import java.io.IOException; import java.sql.Connection; +import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Timestamp; +import java.time.Instant; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -464,4 +468,89 @@ void testBulkUpdateClosesIteratorOnException() throws Exception { verify(mockIterator).close(); } } + + @Nested + @DisplayName("convertTimestampForType Tests") + class TimestampCoversionTests { + + private static final long TEST_EPOCH_MILLIS = 1707465494818L; // 2024-02-09T06:58:14.818Z + + @Test + @DisplayName("BIGINT should return epoch millis as-is") + void testBigintReturnsEpochMillis() { + Object result = + flatPostgresCollection.convertTimestampForType( + TEST_EPOCH_MILLIS, PostgresDataType.BIGINT); + + assertInstanceOf(Long.class, result); + assertEquals(TEST_EPOCH_MILLIS, result); + } + + @Test + @DisplayName("INTEGER should return epoch seconds (millis / 1000)") + void testIntegerReturnsEpochSeconds() { + Object result = + flatPostgresCollection.convertTimestampForType( + TEST_EPOCH_MILLIS, PostgresDataType.INTEGER); + + assertInstanceOf(Integer.class, result); + assertEquals((int) (TEST_EPOCH_MILLIS / 1000), result); + } + + @Test + @DisplayName("TIMESTAMPTZ should return java.sql.Timestamp") + void testTimestamptzReturnsSqlTimestamp() { + Object result = + flatPostgresCollection.convertTimestampForType( + TEST_EPOCH_MILLIS, PostgresDataType.TIMESTAMPTZ); + + assertInstanceOf(Timestamp.class, result); + Timestamp timestamp = (Timestamp) result; + assertEquals(TEST_EPOCH_MILLIS, timestamp.getTime()); + } + + @Test + @DisplayName("DATE should return java.sql.Date") + void testDateReturnsSqlDate() { + Object result = + flatPostgresCollection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.DATE); + + assertInstanceOf(Date.class, result); + Date date = (Date) result; + assertEquals(TEST_EPOCH_MILLIS, date.getTime()); + } + + @Test + @DisplayName("TEXT should return ISO-8601 formatted string") + void testTextReturnsIso8601String() { + Object result = + flatPostgresCollection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.TEXT); + + assertInstanceOf(String.class, result); + String isoString = (String) result; + assertEquals(Instant.ofEpochMilli(TEST_EPOCH_MILLIS).toString(), isoString); + assertTrue(isoString.contains("2024-02-09")); + } + + @Test + @DisplayName("Unsupported type should return epoch millis as string (fallback)") + void testUnsupportedTypeReturnsStringFallback() { + // REAL is not a typical timestamp type, should fall through to default + Object result = + flatPostgresCollection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.REAL); + + assertInstanceOf(String.class, result); + assertEquals(String.valueOf(TEST_EPOCH_MILLIS), result); + } + + @Test + @DisplayName("JSONB type should return epoch millis as string (fallback)") + void testJsonbTypeReturnsStringFallback() { + Object result = + flatPostgresCollection.convertTimestampForType(TEST_EPOCH_MILLIS, PostgresDataType.JSONB); + + assertInstanceOf(String.class, result); + assertEquals(String.valueOf(TEST_EPOCH_MILLIS), result); + } + } }