Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions deploy/config/hoptimator-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,3 @@ data:
flink.config: |
flink.app.name=hoptimator-flink-runner
flink.app.type=SQL

venice.config: |
router.url=http://localhost:7777
clusters=venice-cluster0
5 changes: 2 additions & 3 deletions deploy/docker/mysql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ INSERT INTO daily_metrics (metric_date, metric_name, metric_value) VALUES
(DATE_SUB(CURDATE(), INTERVAL 1 DAY), 'revenue', 4800.00),
(DATE_SUB(CURDATE(), INTERVAL 1 DAY), 'orders', 145.00);

-- Grant permissions to hoptimator user on both databases
GRANT ALL PRIVILEGES ON testdb.* TO 'hoptimator'@'%';
GRANT ALL PRIVILEGES ON analytics.* TO 'hoptimator'@'%';
-- Grant permissions to hoptimator user on all databases
-- NOTE(review): ALL PRIVILEGES ON *.* is very broad (it includes the mysql
-- system schema). Acceptable for a throwaway dev/docker container, but scope
-- grants per database (e.g. testdb.*, analytics.*) for anything shared or
-- long-lived.
GRANT ALL PRIVILEGES ON *.* TO 'hoptimator'@'%';
FLUSH PRIVILEGES;
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ commons-cli = "commons-cli:commons-cli:1.4"
commons-dbcp2 = "org.apache.commons:commons-dbcp2:2.11.0"
cron-utils = "com.cronutils:cron-utils:9.2.1"
findbugs = "com.google.code.findbugs:jsr305:3.0.0"
findbugs-annotations = "com.google.code.findbugs:annotations:3.0.0"
flink-clients = "org.apache.flink:flink-clients:1.18.1"
mysql-connector = "mysql:mysql-connector-java:8.0.33"
flink-connector-base = "org.apache.flink:flink-connector-base:1.18.1"
flink-connector-kafka = "org.apache.flink:flink-sql-connector-kafka:3.2.0-1.18"
flink-connector-mysql-cdc = "com.ververica:flink-sql-connector-mysql-cdc:2.3.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
package com.linkedin.hoptimator.jdbc;

import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
import org.apache.calcite.avatica.ConnectStringParser;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.commons.dbcp2.BasicDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import java.sql.Connection;
import java.util.Map;
import java.util.Properties;
import java.util.function.Function;

/**
* Utility methods for Deployer implementations.
*
* <p>Provides common functionality for:
* <ul>
* <li>Parsing configuration options from maps</li>
* <li>Extracting connection properties from JDBC schemas</li>
* </ul>
*/
public final class DeployerUtils {
private static final Logger log = LoggerFactory.getLogger(DeployerUtils.class);

private DeployerUtils() {
// Utility class - prevent instantiation
}

/**
* Parses an integer option from a map of options.
*
* @param options the options map to parse from
* @param key the option key to look up
* @param defaultValue the default value to return if the option is not set or invalid
* @return the parsed integer value, or defaultValue if not set or invalid
*/
public static Integer parseIntOption(Map<String, String> options, String key, Integer defaultValue) {
String value = options.get(key);
if (value == null) {
return defaultValue;
}
try {
return Integer.parseInt(value);
} catch (NumberFormatException e) {
log.warn("Invalid integer value for option '{}': '{}'. Using default: {}", key, value, defaultValue);
return defaultValue;
}
}

/**
* Parses a long option from a map of options.
*
* @param options the options map to parse from
* @param key the option key to look up
* @param defaultValue the default value to return if the option is not set or invalid
* @return the parsed long value, or defaultValue if not set or invalid
*/
public static Long parseLongOption(Map<String, String> options, String key, Long defaultValue) {
String value = options.get(key);
if (value == null) {
return defaultValue;
}
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
log.warn("Invalid long value for option '{}': '{}'. Using default: {}", key, value, defaultValue);
return defaultValue;
}
}

/**
* Parses a boolean option from a map of options.
*
* @param options the options map to parse from
* @param key the option key to look up
* @param defaultValue the default value to return if the option is not set
* @return the parsed boolean value, or defaultValue if not set
*/
public static Boolean parseBooleanOption(Map<String, String> options, String key, Boolean defaultValue) {
String value = options.get(key);
if (value == null) {
return defaultValue;
}
return Boolean.parseBoolean(value);
}

/**
* Parses a double option from a map of options.
*
* @param options the options map to parse from
* @param key the option key to look up
* @param defaultValue the default value to return if the option is not set or invalid
* @return the parsed double value, or defaultValue if not set or invalid
*/
public static Double parseDoubleOption(Map<String, String> options, String key, Double defaultValue) {
String value = options.get(key);
if (value == null) {
return defaultValue;
}
try {
return Double.parseDouble(value);
} catch (NumberFormatException e) {
log.warn("Invalid double value for option '{}': '{}'. Using default: {}", key, value, defaultValue);
return defaultValue;
}
}

/**
* Extracts connection properties from a JDBC-backed schema.
*
* <p>This method:
* <ol>
* <li>Looks up the schema by name in the connection's root schema</li>
* <li>Unwraps it to get the HoptimatorJdbcSchema</li>
* <li>Extracts the JDBC URL from the BasicDataSource</li>
* <li>Parses the URL (after removing the connection prefix) into Properties</li>
* </ol>
*
* @param catalogName Optional catalog name to look up
* @param schemaName the name of the schema to look up
* @param connection the connection to search in
* @param connectionPrefix the JDBC connection prefix to strip (e.g., "jdbc:kafka://")
* @param logger optional logger for debug messages
* @return Properties extracted from the JDBC URL, or null if schema not found or not JDBC-backed
*/
public static Properties extractPropertiesFromJdbcSchema(@Nullable String catalogName, String schemaName,
Connection connection, String connectionPrefix, @Nullable Logger logger) {

if (schemaName == null) {
return null;
}

try {
if (!(connection instanceof HoptimatorConnection)) {
return null;
}

HoptimatorConnection hoptimatorConnection =
(HoptimatorConnection) connection;
SchemaPlus rootSchema = hoptimatorConnection.calciteConnection().getRootSchema();
if (catalogName != null) {
rootSchema = rootSchema.subSchemas().get(catalogName);
if (rootSchema == null) {
return null;
}
}
SchemaPlus subSchemaPlus = rootSchema.subSchemas().get(schemaName);

if (subSchemaPlus == null) {
return null;
}

HoptimatorJdbcSchema schema = subSchemaPlus.unwrap(HoptimatorJdbcSchema.class);
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should add a properties field to HoptimatorJdbcSchema at some point.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea we could, doesn't really save us much besides maybe some weird DataSource URL unwrapping. Ideally we'd make HoptimatorJdbcSchema aware of which underlying Driver a table came from (e.g. KafkaDriver) then we'd be able to simplify a lot of this logic to just check table types and fetch properties associated with a table instead of reconstructing a lot of it like we need to do here in each Deployer.

if (schema == null) {
return null;
}

String jdbcUrl = ((BasicDataSource) schema.getDataSource()).getUrl();

Properties properties = new Properties();
properties.putAll(ConnectStringParser.parse(jdbcUrl.substring(connectionPrefix.length())));
return properties;
} catch (Exception e) {
if (logger != null) {
logger.debug("Could not extract properties from schema '{}': {}", schemaName, e.getMessage());
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.linkedin.hoptimator.jdbc.ddl.SqlPauseTrigger;
import com.linkedin.hoptimator.jdbc.ddl.SqlResumeTrigger;
import com.linkedin.hoptimator.util.DeploymentService;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcCatalogSchema;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema;
import com.linkedin.hoptimator.util.planner.HoptimatorJdbcTable;
import com.linkedin.hoptimator.util.planner.PipelineRel;
Expand Down Expand Up @@ -376,9 +377,19 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
throw new DdlException(create, e.getMessage(), e);
}

final Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
boolean isNewSchema = false;
Pair<CalciteSchema, String> pair = HoptimatorDdlUtils.schema(context, true, create.name);
if (pair.left == null) {
throw new DdlException(create, "Schema for " + create.name + " not found.");
// If the schema is not found, it might be because it's a 3-level path (CATALOG.SCHEMA.TABLE)
if (create.name.names.size() > 2) {
pair = HoptimatorDdlUtils.catalog(context, true, create.name);
isNewSchema = true;
if (pair.left == null) {
throw new DdlException(create, "Catalog for " + create.name + " not found.");
}
} else {
throw new DdlException(create, "Schema for " + create.name + " not found.");
}
}

// TODO: Add support for populating new tables from a query as a one-time operation.
Expand All @@ -390,8 +401,17 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
}

final SchemaPlus schemaPlus = pair.left.plus();
final String tableName = pair.right;
if (schemaPlus.tables().get(tableName) != null) {
String database = null;
String tableName;
if (isNewSchema) {
int idx = pair.right.indexOf(".");
database = pair.right.substring(0, idx);
tableName = pair.right.substring(idx + 1);
} else {
tableName = pair.right;
}

if (!isNewSchema && schemaPlus.tables().get(tableName) != null) {
if (!create.ifNotExists && !create.getReplace()) {
// They did not specify IF NOT EXISTS, so give error.
throw new DdlException(create,
Expand All @@ -402,11 +422,12 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
Collection<Deployer> deployers = null;
Pair<SchemaPlus, Table> schemaSnapshot = null;
try {
String database;
if (pair.left.schema instanceof Database) {
database = ((Database) pair.left.schema).databaseName();
} else {
database = connection.getSchema();
if (database == null) {
if (pair.left.schema instanceof Database) {
database = ((Database) pair.left.schema).databaseName();
} else {
database = connection.getSchema();
}
}

final JavaTypeFactory typeFactory = context.getTypeFactory();
Expand Down Expand Up @@ -447,18 +468,37 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
return super.newColumnDefaultValue(table, iColumn, context);
}
};
// Table does not exist. Create it.
Table currentViewTable = schemaPlus.tables().get(tableName);
schemaSnapshot = Pair.of(schemaPlus, currentViewTable);
// Snapshot current state for rollback
if (!isNewSchema) {
Table currentTable = schemaPlus.tables().get(tableName);
schemaSnapshot = Pair.of(schemaPlus, currentTable);
}

// Table does not exist. Create it.
// Add a temporary table with the correct row type so deployers can resolve the schema
// TODO: This may cause problems if we reuse connections, only the next connection will load this as a HoptimatorJdbcTable.
Table tempTable = new TemporaryTable(rowType, ief, database);
schemaPlus.add(tableName, tempTable);
logger.info("Added table {} to schema {}", tableName, schemaPlus.getName());
if (isNewSchema) {
HoptimatorJdbcCatalogSchema catalogSchema = schemaPlus.unwrap(HoptimatorJdbcCatalogSchema.class);
if (catalogSchema == null) {
throw new DdlException(create, "Catalog for " + schemaPlus.getName() + " not found.");
}
SchemaPlus databaseSchema = schemaPlus.add(database, catalogSchema.createSchema(database));
logger.info("Added schema {} to catalog {}", database, schemaPlus.getName());

Table tempTable = new TemporaryTable(rowType, ief, database);
databaseSchema.add(tableName, tempTable);
logger.info("Added table {} to schema {}", tableName, databaseSchema.getName());
} else {
Table tempTable = new TemporaryTable(rowType, ief, database);
schemaPlus.add(tableName, tempTable);
logger.info("Added table {} to schema {}", tableName, schemaPlus.getName());
}

final List<String> schemaPath = pair.left.path(null);
List<String> tablePath = new ArrayList<>(schemaPath);
if (isNewSchema) {
tablePath.add(database);
}
tablePath.add(tableName);

Map<String, String> tableOptions = HoptimatorDdlUtils.options(create.options);
Expand Down Expand Up @@ -491,6 +531,9 @@ public void execute(SqlCreateTable create, CalcitePrepare.Context context) {
schemaPlus.add(tableName, schemaSnapshot.right);
logger.info("Restored schema for table {}", tableName);
}
} else {
pair.left.removeSubSchema(database);
logger.info("Removed schema {} from catalog", database);
}
throw new DdlException(create, e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ public static Pair<CalciteSchema, String> schema(CalcitePrepare.Context context,
return Pair.of(schema, name);
}

/** Returns the catalog in which to create an object;
 * the left part is null if the catalog does not exist.
 *
 * <p>Splits a 3+ part identifier (CATALOG.SCHEMA.TABLE) into the catalog schema
 * and the remaining "SCHEMA.TABLE" path joined with dots.
 *
 * <p>NOTE(review): for a catalog path deeper than one element, a missing
 * intermediate sub-schema triggers an NPE from {@code Objects.requireNonNull}
 * rather than a null left part — confirm whether that is intended.
 *
 * @param context the prepare context supplying the root schema
 * @param mutable whether to resolve against the mutable root schema
 * @param id a compound identifier with at least three parts
 * @throws IllegalArgumentException if {@code id} has fewer than three parts */
public static Pair<CalciteSchema, String> catalog(CalcitePrepare.Context context, boolean mutable, SqlIdentifier id) {
  if (id.names.size() < 3) {
    // Fixed typo: "identified" -> "identifier".
    throw new IllegalArgumentException("CATALOG.SCHEMA.TABLE identifier expected but found: " + id);
  }
  // Last two parts are SCHEMA.TABLE; everything before is the catalog path.
  final List<String> schemaTablePath = Util.last(id.names, 2);
  final List<String> catalogPath = Util.skipLast(id.names, 2);
  CalciteSchema schema = mutable ? context.getMutableRootSchema() : context.getRootSchema();
  for (String p : catalogPath) {
    schema = Objects.requireNonNull(schema).getSubSchema(p, true);
  }
  return Pair.of(schema, String.join(".", schemaTablePath));
}

// N.B. copy-pasted from Apache Calcite
/** Wraps a query to rename its columns. Used by CREATE VIEW and CREATE
* MATERIALIZED VIEW. */
Expand Down
Loading