-
Notifications
You must be signed in to change notification settings - Fork 14
CREATE TABLE support for existing drivers #195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
fc1e108
Support Quidem describe command
jogrogan 5c4eaac
Support CREATE TABLE for Kafka topics - Intentionally disabled in favor
jogrogan 63bca1c
Refactor Venice deployer, add pre-validation and tests
jogrogan fb87857
Support CREATE TABLE for MySQL databases and tables
jogrogan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
172 changes: 172 additions & 0 deletions
172
hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DeployerUtils.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,172 @@ | ||
| package com.linkedin.hoptimator.jdbc; | ||
|
|
||
| import com.linkedin.hoptimator.util.planner.HoptimatorJdbcSchema; | ||
| import org.apache.calcite.avatica.ConnectStringParser; | ||
| import org.apache.calcite.schema.SchemaPlus; | ||
| import org.apache.commons.dbcp2.BasicDataSource; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
| import javax.annotation.Nullable; | ||
| import java.sql.Connection; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
|
|
||
| /** | ||
| * Utility methods for Deployer implementations. | ||
| * | ||
| * <p>Provides common functionality for: | ||
| * <ul> | ||
| * <li>Parsing configuration options from maps</li> | ||
| * <li>Extracting connection properties from JDBC schemas</li> | ||
| * </ul> | ||
| */ | ||
| public final class DeployerUtils { | ||
| private static final Logger log = LoggerFactory.getLogger(DeployerUtils.class); | ||
|
|
||
| private DeployerUtils() { | ||
| // Utility class - prevent instantiation | ||
| } | ||
|
|
||
| /** | ||
| * Parses an integer option from a map of options. | ||
| * | ||
| * @param options the options map to parse from | ||
| * @param key the option key to look up | ||
| * @param defaultValue the default value to return if the option is not set or invalid | ||
| * @return the parsed integer value, or defaultValue if not set or invalid | ||
| */ | ||
| public static Integer parseIntOption(Map<String, String> options, String key, Integer defaultValue) { | ||
| String value = options.get(key); | ||
| if (value == null) { | ||
| return defaultValue; | ||
| } | ||
| try { | ||
| return Integer.parseInt(value); | ||
| } catch (NumberFormatException e) { | ||
| log.warn("Invalid integer value for option '{}': '{}'. Using default: {}", key, value, defaultValue); | ||
| return defaultValue; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Parses a long option from a map of options. | ||
| * | ||
| * @param options the options map to parse from | ||
| * @param key the option key to look up | ||
| * @param defaultValue the default value to return if the option is not set or invalid | ||
| * @return the parsed long value, or defaultValue if not set or invalid | ||
| */ | ||
| public static Long parseLongOption(Map<String, String> options, String key, Long defaultValue) { | ||
| String value = options.get(key); | ||
| if (value == null) { | ||
| return defaultValue; | ||
| } | ||
| try { | ||
| return Long.parseLong(value); | ||
| } catch (NumberFormatException e) { | ||
| log.warn("Invalid long value for option '{}': '{}'. Using default: {}", key, value, defaultValue); | ||
| return defaultValue; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Parses a boolean option from a map of options. | ||
| * | ||
| * @param options the options map to parse from | ||
| * @param key the option key to look up | ||
| * @param defaultValue the default value to return if the option is not set | ||
| * @return the parsed boolean value, or defaultValue if not set | ||
| */ | ||
| public static Boolean parseBooleanOption(Map<String, String> options, String key, Boolean defaultValue) { | ||
| String value = options.get(key); | ||
| if (value == null) { | ||
| return defaultValue; | ||
| } | ||
| return Boolean.parseBoolean(value); | ||
| } | ||
|
|
||
| /** | ||
| * Parses a double option from a map of options. | ||
| * | ||
| * @param options the options map to parse from | ||
| * @param key the option key to look up | ||
| * @param defaultValue the default value to return if the option is not set or invalid | ||
| * @return the parsed double value, or defaultValue if not set or invalid | ||
| */ | ||
| public static Double parseDoubleOption(Map<String, String> options, String key, Double defaultValue) { | ||
| String value = options.get(key); | ||
| if (value == null) { | ||
| return defaultValue; | ||
| } | ||
| try { | ||
| return Double.parseDouble(value); | ||
| } catch (NumberFormatException e) { | ||
| log.warn("Invalid double value for option '{}': '{}'. Using default: {}", key, value, defaultValue); | ||
| return defaultValue; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Extracts connection properties from a JDBC-backed schema. | ||
| * | ||
| * <p>This method: | ||
| * <ol> | ||
| * <li>Looks up the schema by name in the connection's root schema</li> | ||
| * <li>Unwraps it to get the HoptimatorJdbcSchema</li> | ||
| * <li>Extracts the JDBC URL from the BasicDataSource</li> | ||
| * <li>Parses the URL (after removing the connection prefix) into Properties</li> | ||
| * </ol> | ||
| * | ||
| * @param catalogName Optional catalog name to look up | ||
| * @param schemaName the name of the schema to look up | ||
| * @param connection the connection to search in | ||
| * @param connectionPrefix the JDBC connection prefix to strip (e.g., "jdbc:kafka://") | ||
| * @param logger optional logger for debug messages | ||
| * @return Properties extracted from the JDBC URL, or null if schema not found or not JDBC-backed | ||
| */ | ||
| public static Properties extractPropertiesFromJdbcSchema(@Nullable String catalogName, String schemaName, | ||
| Connection connection, String connectionPrefix, @Nullable Logger logger) { | ||
|
|
||
| if (schemaName == null) { | ||
| return null; | ||
| } | ||
|
|
||
| try { | ||
| if (!(connection instanceof HoptimatorConnection)) { | ||
| return null; | ||
| } | ||
|
|
||
| HoptimatorConnection hoptimatorConnection = | ||
| (HoptimatorConnection) connection; | ||
| SchemaPlus rootSchema = hoptimatorConnection.calciteConnection().getRootSchema(); | ||
| if (catalogName != null) { | ||
| rootSchema = rootSchema.subSchemas().get(catalogName); | ||
| if (rootSchema == null) { | ||
| return null; | ||
| } | ||
| } | ||
| SchemaPlus subSchemaPlus = rootSchema.subSchemas().get(schemaName); | ||
|
|
||
| if (subSchemaPlus == null) { | ||
| return null; | ||
| } | ||
|
|
||
| HoptimatorJdbcSchema schema = subSchemaPlus.unwrap(HoptimatorJdbcSchema.class); | ||
| if (schema == null) { | ||
| return null; | ||
| } | ||
|
|
||
| String jdbcUrl = ((BasicDataSource) schema.getDataSource()).getUrl(); | ||
|
|
||
| Properties properties = new Properties(); | ||
| properties.putAll(ConnectStringParser.parse(jdbcUrl.substring(connectionPrefix.length()))); | ||
| return properties; | ||
| } catch (Exception e) { | ||
| if (logger != null) { | ||
| logger.debug("Could not extract properties from schema '{}': {}", schemaName, e.getMessage()); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we should add a properties field to HoptimatorJdbcSchema at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea we could, doesn't really save us much besides maybe some weird DataSource URL unwrapping. Ideally we'd make HoptimatorJdbcSchema aware of which underlying Driver a table came from (e.g. KafkaDriver) then we'd be able to simplify a lot of this logic to just check table types and fetch properties associated with a table instead of reconstructing a lot of it like we need to do here in each Deployer.