Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,61 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import lombok.AllArgsConstructor;
import org.hypertrace.core.documentstore.expression.impl.DataType;
import org.hypertrace.core.documentstore.postgres.model.PostgresColumnMetadata;
import org.hypertrace.core.documentstore.postgres.query.v1.parser.filter.nonjson.field.PostgresDataType;
import org.hypertrace.core.documentstore.postgres.utils.PostgresUtils;

/**
* Fetches schema metadata directly from Postgres system catalogs. Hardcoded to query
* information_schema.columns.
*/
/** Fetches schema metadata directly from Postgres system catalogs (pg_catalog). */
@AllArgsConstructor
public class PostgresMetadataFetcher {

private final PostgresClient client;

private static final String DISCOVERY_SQL =
"SELECT column_name, udt_name, is_nullable "
+ "FROM information_schema.columns "
+ "WHERE table_schema = 'public' AND table_name = ?";

private static final String PRIMARY_KEY_SQL =
"SELECT a.attname as column_name "
+ "FROM pg_index i "
+ "JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) "
+ "WHERE i.indrelid = ?::regclass AND i.indisprimary";
"SELECT a.attname AS column_name, "
+ "t.typname AS udt_name, "
+ "NOT a.attnotnull AS is_nullable, "
+ "COALESCE(pk.is_pk, false) AS is_primary_key "
+ "FROM pg_attribute a "
+ "JOIN pg_class c ON a.attrelid = c.oid "
+ "JOIN pg_namespace n ON c.relnamespace = n.oid "
+ "JOIN pg_type t ON a.atttypid = t.oid "
+ "LEFT JOIN ( "
+ " SELECT a2.attname, true AS is_pk "
+ " FROM pg_index i "
+ " JOIN pg_attribute a2 ON a2.attrelid = i.indrelid AND a2.attnum = ANY(i.indkey) "
+ " WHERE i.indisprimary "
+ ") pk ON pk.attname = a.attname "
+ "WHERE c.relname = ? "
+ "AND n.nspname = 'public' "
+ "AND a.attnum > 0 "
+ "AND NOT a.attisdropped";

public Map<String, PostgresColumnMetadata> fetch(String tableName) {
Map<String, PostgresColumnMetadata> metadataMap = new HashMap<>();

try (Connection conn = client.getPooledConnection()) {
Set<String> primaryKeyColumns = fetchPrimaryKeyColumns(conn, tableName);

try (PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) {
ps.setString(1, tableName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnName = rs.getString("column_name");
String udtName = rs.getString("udt_name");
boolean isNullable = "YES".equalsIgnoreCase(rs.getString("is_nullable"));
boolean isArray = udtName != null && udtName.startsWith("_");
String baseType = isArray ? udtName.substring(1) : udtName;
boolean isPrimaryKey = primaryKeyColumns.contains(columnName);
metadataMap.put(
columnName,
new PostgresColumnMetadata(
columnName,
mapToCanonicalType(baseType),
mapToPostgresType(baseType),
isNullable,
isArray,
isPrimaryKey));
}
try (Connection conn = client.getPooledConnection();
PreparedStatement ps = conn.prepareStatement(DISCOVERY_SQL)) {
ps.setString(1, tableName);
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
String columnName = rs.getString("column_name");
String udtName = rs.getString("udt_name");
boolean isNullable = rs.getBoolean("is_nullable");
boolean isArray = udtName != null && udtName.startsWith("_");
String baseType = isArray ? udtName.substring(1) : udtName;
boolean isPrimaryKey = rs.getBoolean("is_primary_key");
metadataMap.put(
columnName,
new PostgresColumnMetadata(
columnName,
mapToCanonicalType(baseType),
mapToPostgresType(baseType),
isNullable,
isArray,
isPrimaryKey));
}
}
return metadataMap;
Expand All @@ -68,20 +68,6 @@ public Map<String, PostgresColumnMetadata> fetch(String tableName) {
}
}

private Set<String> fetchPrimaryKeyColumns(Connection conn, String tableName)
throws SQLException {
Set<String> pkColumns = new HashSet<>();
try (PreparedStatement ps = conn.prepareStatement(PRIMARY_KEY_SQL)) {
ps.setString(1, PostgresUtils.wrapFieldNamesWithDoubleQuotes(tableName));
try (ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
pkColumns.add(rs.getString("column_name"));
}
}
}
return pkColumns;
}

/** Maps Postgres udt_name to canonical DataType. */
private DataType mapToCanonicalType(String udtName) {
if (udtName == null) {
Expand Down
Loading
Loading