Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -95,6 +96,12 @@ public static void init() throws IOException {
postgresConfig.put("url", postgresConnectionUrl);
postgresConfig.put("user", "postgres");
postgresConfig.put("password", "postgres");
postgresConfig.put(
"postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.created",
"createdTime");
postgresConfig.put(
"postgres.collectionConfigs." + FLAT_COLLECTION_NAME + ".timestampFields.lastUpdated",
"lastUpdateTime");

postgresDatastore =
DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(postgresConfig));
Expand Down Expand Up @@ -125,7 +132,9 @@ private static void createFlatCollectionSchema() {
+ "\"big_number\" BIGINT,"
+ "\"rating\" REAL,"
+ "\"created_date\" DATE,"
+ "\"weight\" DOUBLE PRECISION"
+ "\"weight\" DOUBLE PRECISION,"
+ "\"createdTime\" BIGINT,"
+ "\"lastUpdateTime\" TIMESTAMP WITH TIME ZONE"
+ ");",
FLAT_COLLECTION_NAME);

Expand Down Expand Up @@ -2926,4 +2935,209 @@ void testSetWithJsonDocumentValue() throws Exception {
}
}
}

@Nested
@DisplayName("Timestamp Auto-Population Tests")
class TimestampTests {

@Test
@DisplayName(
"Should auto-populate createdTime (BIGINT) and lastUpdateTime (TIMESTAMPTZ) on create")
void testTimestampsOnCreate() throws Exception {
long beforeCreate = System.currentTimeMillis();

ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put("id", "ts-test-1");
objectNode.put("item", "TimestampTestItem");
objectNode.put("price", 100);
Document document = new JSONDocument(objectNode);
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-1");

CreateResult result = flatCollection.create(key, document);
assertTrue(result.isSucceed());

long afterCreate = System.currentTimeMillis();

PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
FLAT_COLLECTION_NAME, key));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());

long createdTime = rs.getLong("createdTime");
assertFalse(rs.wasNull(), "createdTime should not be NULL");
assertTrue(
createdTime >= beforeCreate && createdTime <= afterCreate,
"createdTime should be within test execution window");

Timestamp lastUpdateTime = rs.getTimestamp("lastUpdateTime");
assertNotNull(lastUpdateTime, "lastUpdateTime should not be NULL");
assertTrue(
lastUpdateTime.getTime() >= beforeCreate && lastUpdateTime.getTime() <= afterCreate,
"lastUpdateTime should be within test execution window");
}
}

@Test
@DisplayName("Should preserve createdTime and update lastUpdateTime on upsert")
void testTimestampsOnUpsert() throws Exception {
// First create
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put("id", "ts-test-2");
objectNode.put("item", "UpsertTimestampTest");
objectNode.put("price", 100);
Document document = new JSONDocument(objectNode);
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-2");

flatCollection.create(key, document);

PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
long originalCreatedTime;
long originalLastUpdateTime;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
FLAT_COLLECTION_NAME, key.toString()));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());
originalCreatedTime = rs.getLong("createdTime");
originalLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime();
}

// Wait a bit to ensure time difference
Thread.sleep(50);

// Upsert (update existing)
long beforeUpsert = System.currentTimeMillis();
objectNode.put("price", 200);
Document updatedDoc = new JSONDocument(objectNode);
flatCollection.createOrReplace(key, updatedDoc);
long afterUpsert = System.currentTimeMillis();

try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
FLAT_COLLECTION_NAME, key.toString()));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());

long newCreatedTime = rs.getLong("createdTime");
assertEquals(
originalCreatedTime, newCreatedTime, "createdTime should be preserved on upsert");

long newLastUpdateTime = rs.getTimestamp("lastUpdateTime").getTime();
assertTrue(newLastUpdateTime > originalLastUpdateTime, "lastUpdateTime should be updated");
assertTrue(
newLastUpdateTime >= beforeUpsert && newLastUpdateTime <= afterUpsert,
"lastUpdateTime should be within upsert execution window");
}
}

@Test
@DisplayName(
"Should not throw exception when timestampFields config is missing - cols remain NULL")
void testNoExceptionWhenTimestampConfigMissing() throws Exception {
// Create a collection WITHOUT timestampFields config
String postgresConnectionUrl =
String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432));

Map<String, String> configWithoutTimestamps = new HashMap<>();
configWithoutTimestamps.put("url", postgresConnectionUrl);
configWithoutTimestamps.put("user", "postgres");
configWithoutTimestamps.put("password", "postgres");
// Note: NO customParams.timestampFields config

Datastore datastoreWithoutTimestamps =
DatastoreProvider.getDatastore(
"Postgres", ConfigFactory.parseMap(configWithoutTimestamps));
Collection collectionWithoutTimestamps =
datastoreWithoutTimestamps.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT);

// Create a document - should NOT throw exception
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put("id", "ts-test-no-config");
objectNode.put("item", "NoTimestampConfigTest");
objectNode.put("price", 100);
Document document = new JSONDocument(objectNode);
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-no-config");

CreateResult result = collectionWithoutTimestamps.create(key, document);
assertTrue(result.isSucceed());

// Verify timestamp columns are NULL
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
FLAT_COLLECTION_NAME, key));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());

assertNull(
rs.getObject("createdTime"), "createdTime should be NULL when config is missing");
assertNull(
rs.getObject("lastUpdateTime"), "lastUpdateTime should be NULL when config is missing");
}
}

@Test
@DisplayName(
"Should not throw exception when timestampFields config is invalid JSON - cols remain NULL")
void testNoExceptionWhenTimestampConfigInvalidJson() throws Exception {
// Create a collection with INVALID JSON in timestampFields config
String postgresConnectionUrl =
String.format("jdbc:postgresql://localhost:%s/", postgres.getMappedPort(5432));

Map<String, String> configWithInvalidJson = new HashMap<>();
configWithInvalidJson.put("url", postgresConnectionUrl);
configWithInvalidJson.put("user", "postgres");
configWithInvalidJson.put("password", "postgres");
// Invalid JSON - missing quotes, malformed
configWithInvalidJson.put("customParams.timestampFields", "not valid json {{{");

Datastore datastoreWithInvalidConfig =
DatastoreProvider.getDatastore("Postgres", ConfigFactory.parseMap(configWithInvalidJson));
Collection collectionWithInvalidConfig =
datastoreWithInvalidConfig.getCollectionForType(FLAT_COLLECTION_NAME, DocumentType.FLAT);

// Create a document - should NOT throw exception
ObjectNode objectNode = OBJECT_MAPPER.createObjectNode();
objectNode.put("id", "ts-test-invalid-json");
objectNode.put("item", "InvalidJsonConfigTest");
objectNode.put("price", 100);
Document document = new JSONDocument(objectNode);
Key key = new SingleValueKey(DEFAULT_TENANT, "ts-test-invalid-json");

CreateResult result = collectionWithInvalidConfig.create(key, document);
assertTrue(result.isSucceed());

// Verify timestamp columns are NULL (config parsing failed gracefully)
PostgresDatastore pgDatastore = (PostgresDatastore) postgresDatastore;
try (Connection conn = pgDatastore.getPostgresClient();
PreparedStatement ps =
conn.prepareStatement(
String.format(
"SELECT \"createdTime\", \"lastUpdateTime\" FROM \"%s\" WHERE \"id\" = '%s'",
FLAT_COLLECTION_NAME, key.toString()));
ResultSet rs = ps.executeQuery()) {
assertTrue(rs.next());

rs.getLong("createdTime");
assertTrue(rs.wasNull(), "createdTime should be NULL when config JSON is invalid");

rs.getTimestamp("lastUpdateTime");
assertTrue(rs.wasNull(), "lastUpdateTime should be NULL when config JSON is invalid");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ public DatastoreConfig convert(final Config config) {
connectionConfig.applicationName(),
connectionConfig.connectionPoolConfig(),
connectionConfig.queryTimeout(),
connectionConfig.customParameters()) {
connectionConfig.customParameters(),
connectionConfig.collectionConfigs()) {
@Override
public String toConnectionString() {
return config.hasPath("url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,21 @@ public static class ConnectionConfigBuilder {
String applicationName = DEFAULT_APP_NAME;
String replicaSet;
Map<String, String> customParameters = new HashMap<>();
Map<String, org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig>
collectionConfigs = new HashMap<>();

public ConnectionConfigBuilder customParameter(String key, String value) {
this.customParameters.put(key, value);
return this;
}

public ConnectionConfigBuilder collectionConfig(
String collectionName,
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig config) {
this.collectionConfigs.put(collectionName, config);
return this;
}

ConnectionPoolConfig connectionPoolConfig;
AggregatePipelineMode aggregationPipelineMode = AggregatePipelineMode.DEFAULT_ALWAYS;
DataFreshness dataFreshness = DataFreshness.SYSTEM_DEFAULT;
Expand Down Expand Up @@ -127,7 +136,8 @@ public ConnectionConfig build() {
applicationName,
connectionPoolConfig,
queryTimeout,
customParameters);
customParameters,
collectionConfigs);
}

throw new IllegalArgumentException("Unsupported database type: " + type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@
import org.hypertrace.core.documentstore.model.config.ConnectionPoolConfig.ConnectionPoolConfigBuilder;
import org.hypertrace.core.documentstore.model.config.DatastoreConfig.DatastoreConfigBuilder;
import org.hypertrace.core.documentstore.model.config.Endpoint.EndpointBuilder;
import org.hypertrace.core.documentstore.model.config.postgres.TimestampFieldsConfig;
import org.hypertrace.core.documentstore.model.options.DataFreshness;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Value
public class TypesafeConfigDatastoreConfigExtractor {

private static final Logger LOGGER =
LoggerFactory.getLogger(TypesafeConfigDatastoreConfigExtractor.class);
private static final String DEFAULT_HOST_KEY = "host";
Expand All @@ -35,6 +37,10 @@ public class TypesafeConfigDatastoreConfigExtractor {
private static final String DEFAULT_QUERY_TIMEOUT_KEY = "queryTimeout";
private static final String DEFAULT_CONNECTION_TIMEOUT_KEY = "connectionTimeout";
private static final String DEFAULT_CUSTOM_PARAMETERS_PREFIX = "customParams";
private static final String DEFAULT_COLLECTION_CONFIGS_PREFIX = "collectionConfigs";
private static final String TIMESTAMP_FIELDS_KEY = "timestampFields";
private static final String TIMESTAMP_CREATED_KEY = "created";
private static final String TIMESTAMP_LAST_UPDATED_KEY = "lastUpdated";

@NonNull Config config;
DatastoreConfigBuilder datastoreConfigBuilder;
Expand Down Expand Up @@ -82,7 +88,8 @@ private TypesafeConfigDatastoreConfigExtractor(
.dataFreshnessKey(DEFAULT_DATA_FRESHNESS_KEY)
.queryTimeoutKey(DEFAULT_QUERY_TIMEOUT_KEY)
.connectionTimeoutKey(DEFAULT_CONNECTION_TIMEOUT_KEY)
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX);
.customParametersKey(DEFAULT_CUSTOM_PARAMETERS_PREFIX)
.collectionConfigsKey(dataStoreType + "." + DEFAULT_COLLECTION_CONFIGS_PREFIX);
}

public static TypesafeConfigDatastoreConfigExtractor from(
Expand Down Expand Up @@ -253,6 +260,53 @@ public TypesafeConfigDatastoreConfigExtractor connectionTimeoutKey(@NonNull fina
return this;
}

public TypesafeConfigDatastoreConfigExtractor collectionConfigsKey(@NonNull final String key) {
if (!config.hasPath(key)) {
return this;
}

try {
Config collectionsConfig = config.getConfig(key);
collectionsConfig
.root()
.keySet()
.forEach(
collectionName -> {
Config collectionConfig = collectionsConfig.getConfig(collectionName);
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig
.CollectionConfigBuilder
builder =
org.hypertrace.core.documentstore.model.config.postgres.CollectionConfig
.builder();

if (collectionConfig.hasPath(TIMESTAMP_FIELDS_KEY)) {
builder.timestampFields(
parseTimestampFieldsConfig(collectionConfig.getConfig(TIMESTAMP_FIELDS_KEY)));
}

connectionConfigBuilder.collectionConfig(collectionName, builder.build());
});
} catch (Exception e) {
LOGGER.warn(
"Collection configs key '{}' exists but could not be parsed: {}", key, e.getMessage());
}

return this;
}

private TimestampFieldsConfig parseTimestampFieldsConfig(Config timestampConfig) {
TimestampFieldsConfig.TimestampFieldsConfigBuilder tsBuilder = TimestampFieldsConfig.builder();

if (timestampConfig.hasPath(TIMESTAMP_CREATED_KEY)) {
tsBuilder.created(timestampConfig.getString(TIMESTAMP_CREATED_KEY));
}
if (timestampConfig.hasPath(TIMESTAMP_LAST_UPDATED_KEY)) {
tsBuilder.lastUpdated(timestampConfig.getString(TIMESTAMP_LAST_UPDATED_KEY));
}

return tsBuilder.build();
}

public DatastoreConfig extract() {
if (connectionConfigBuilder.endpoints().isEmpty()
&& !Endpoint.builder().build().equals(endpointBuilder.build())) {
Expand All @@ -265,6 +319,7 @@ public DatastoreConfig extract() {
.connectionPoolConfig(connectionPoolConfigBuilder.build())
.credentials(connectionCredentialsBuilder.build())
.customParameters(connectionConfigBuilder.customParameters())
.collectionConfigs(connectionConfigBuilder.collectionConfigs())
.build())
.build();
}
Expand Down
Loading
Loading