Add TableProvisionJob CRD and JobTemplate support for CREATE TABLE#196
Conversation
Wondering if we can re-use
```java
if (!(obj instanceof Sink)) {
  // Sets up a table provisioning job for the source.
  // The job would be a no-op if the source is already provisioned.
  list.add(new K8sJobDeployer(jobFromSource((Source) obj), context));
}
```
Will this have the unintended side effect of deploying Flink jobs when you issue a CREATE TABLE statement on something like a Kafka topic that already has a job template defined but is really only meant to be used in a CREATE MATERIALIZED VIEW pipeline flow?

I don't think I fully understand why we need this change. It seems like a general template type, so that we don't need custom CRDs per DB, like how we have kafkatopics.hoptimator.linkedin.com & acls.hoptimator.linkedin.com & others internally?
I don't think there would be any unintended side effects here. When a Job is created via jobFromSource, the SQL-related lazy evals (sql, query, fieldMap) all return null. The template engine then resolves Flink JobTemplate references like {{flinksql}} to null and skips the entire template, so existing Flink JobTemplates won't accidentally fire for CREATE TABLE statements.
And for the TableProvisionJob CRD, yes, it's intentionally kept generic. It follows the same pattern as SqlJob, which is a generic CRD reconciled by the CPO adapter. Similarly, TableProvisionJob is a generic "provision this table's backing infrastructure" CRD. Different operators can watch it and act based on the database field; for example, we could have multiple reconcilers for different databases, such as an Iceberg provisioner or an OpenHouse provisioner. This way we won't need a separate custom CRD for each database type.
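A minimal sketch of that fan-out idea: several reconcilers watch the same generic CRD and each claims only the jobs whose `database` field it recognizes. The class names and the `database` field here are illustrative, not the actual Hoptimator types.

```java
import java.util.List;

// Hypothetical sketch: one generic TableProvisionJob CRD, dispatched by a
// "database" field to whichever provisioner recognizes it.
class ReconcilerDispatchDemo {

  // Simplified stand-in for the CRD's spec.
  record TableProvisionJob(String database, String table) {}

  interface Provisioner {
    boolean matches(TableProvisionJob job);
    String provision(TableProvisionJob job);
  }

  static class IcebergProvisioner implements Provisioner {
    public boolean matches(TableProvisionJob job) {
      return "iceberg".equals(job.database());
    }
    public String provision(TableProvisionJob job) {
      return "iceberg:" + job.table();
    }
  }

  static class OpenHouseProvisioner implements Provisioner {
    public boolean matches(TableProvisionJob job) {
      return "openhouse".equals(job.database());
    }
    public String provision(TableProvisionJob job) {
      return "openhouse:" + job.table();
    }
  }

  // Each reconciler sees every job but acts only on the ones it matches,
  // so no per-database CRD is needed.
  static String reconcile(List<Provisioner> provisioners, TableProvisionJob job) {
    for (Provisioner p : provisioners) {
      if (p.matches(job)) {
        return p.provision(job);
      }
    }
    return null; // no reconciler claims this database type
  }
}
```

In a real controller each provisioner would run as its own watch loop; the shared point is only the CRD schema.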
Ahh I see that the flink-beam-sqljob-template is matching as a side effect here. Need to fix that 🤔
Yea these jobtemplates are already a little flimsy for this reason.
But I'm still not convinced we need a new job template here. First, this doesn't feel like a "job": there is no ongoing processing, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible, so we can fail fast back to users. If that's absolutely not possible for reasons outside our control, we should either have a custom CRD for that database type and a reconciler (or several listening, if needed), or maybe consider a new generic CRD type with many different reconcilers that differentiate which jobs to act on by some "database" type field.
> Ahh I see that the flink-beam-sqljob-template is matching as a side effect here. Need to fix that 🤔
It was a silly issue: for the flink-beam-sqljob-template, when fieldMap.apply() returns null, Java string concatenation produces "'null'" instead of null, so the template engine renders the template instead of skipping it. Fixed it.
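For anyone following along, the bug comes straight from Java's string-concatenation rules: `"'" + null + "'"` yields the literal string `"'null'"`, which looks non-null to the template engine. A hedged sketch of the buggy lookup and the fix (the method names and the `fieldMap` shape here are illustrative, not the actual template-engine API):

```java
import java.util.function.Function;

// Demonstrates why concatenating a null field produces "'null'" instead of
// null, and the fix of propagating null so the engine can skip the template.
class TemplateVarDemo {

  // Buggy version: "'" + null + "'" evaluates to the string "'null'".
  static String buggyLookup(Function<String, String> fieldMap, String key) {
    return "'" + fieldMap.apply(key) + "'";
  }

  // Fixed version: return null when the field is absent.
  static String fixedLookup(Function<String, String> fieldMap, String key) {
    String value = fieldMap.apply(key);
    return value == null ? null : "'" + value + "'";
  }

  public static void main(String[] args) {
    Function<String, String> fieldMap = key -> null; // no fields defined
    System.out.println(buggyLookup(fieldMap, "flinksql")); // prints 'null'
    System.out.println(fixedLookup(fieldMap, "flinksql")); // prints null
  }
}
```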
> But still I'm not convinced we need a new job template here, first this doesn't feel like a "job", there is no ongoing processing here, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible to allow us to fail fast back to users. If absolutely not possible due to reason outside our control we should either have a custom crd for that database type and a reconciler (or several listening if needed), or maybe we can consider having a new generic crd type and have many different reconcilers that look and differentiate which ones to act on by some "database" type field.
I thought of using a Job CRD and the reconciler pattern for cases where table provisioning involves asynchronous external systems that can't complete within the CREATE TABLE request path. For example, provisioning an Iceberg table might require registering a Kafka topic in nearline and coordinating with an external ETL engine that ingests data into the Iceberg table. The ETL engine may acknowledge the request quickly, but the actual provisioning can take longer; hence I chose the reconciler pattern, which watches the status until the table is fully onboarded for consumption.
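The async flow described above can be sketched as a reconcile pass that kicks off provisioning and requeues until the external system reports readiness. The `ProvisioningClient` interface and status values are hypothetical stand-ins for the external ETL engine's API:

```java
// Hypothetical sketch of a reconcile loop for asynchronous provisioning:
// request once, then keep requeuing until the external system is ready.
class AsyncReconcileDemo {

  enum Status { PENDING, ACKNOWLEDGED, READY }

  interface ProvisioningClient {
    Status requestProvision(String table); // typically acks quickly
    Status checkStatus(String table);      // eventually reports READY
  }

  /**
   * One reconcile pass. Returns true when the table is fully onboarded,
   * false when the controller should requeue and check again later.
   */
  static boolean reconcile(ProvisioningClient client, String table) {
    Status status = client.checkStatus(table);
    if (status == Status.READY) {
      return true; // done: the TableProvisionJob can be marked completed
    }
    if (status == Status.PENDING) {
      client.requestProvision(table); // kick off provisioning once
    }
    return false; // provisioning still in flight: requeue
  }
}
```

This is exactly the shape a reconciler handles well and a synchronous Deployer call does not: the CREATE TABLE request returns immediately while the controller converges on the desired state in the background.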
Hmm, would we always want a CREATE TABLE on an Iceberg table to do that, though?

I could totally see a world where we have CREATE TABLE, then maybe some trigger that ingests data from somewhere periodically, or a Spark job that runs periodically, etc., and that would be specified using the "CREATE TRIGGER" syntax on the table or "CREATE MATERIALIZED VIEW my_iceberg_table$spark_backfill AS ... FROM ...", etc., depending on the user's intent.

My point being, a separate syntax beyond CREATE TABLE makes sense to me if a user wanted to specify "create this iceberg table and backfill it from some kafka topic" (CREATE MATERIALIZED VIEW should already cover this, I think). I do understand that would likely need some new CRD in this case, different from the "sqljobs" template we have today, but it wouldn't be driven by CREATE TABLE.

"CREATE TABLE AS ..." may also be relevant. This syntax is supported but we don't use it yet; to me it could semantically mean CREATE TABLE + CREATE TRIGGER (as a one-off).
```shell
ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
  /generate.sh -o "$(pwd)" -n "" -p "com.linkedin.hoptimator.k8s" \
  -u "$(pwd)/src/main/resources/tabletriggers.crd.yaml" \
  -u "$(pwd)/src/main/resources/tableprovisionjobs.crd.yaml" \
```
Do we actually use this file? I think just the top level generate-models.sh is used. We can likely delete this file.
I introduced a new CRD to enable table provisioning across multiple databases via dedicated controllers. For Iceberg and OpenHouse, each can use its own controller with this TableProvisionJob CRD. If we used the batch/Job type instead, we'd have to bundle all the custom provisioning logic into a container image or something similar that runs in the Job's pod.
Summary
Scenarios
Case 1
```sql
CREATE TABLE iceberg.namespace.table
```

When a table is created in Iceberg using CREATE TABLE, the system triggers both the Source Deployer and the Job Deployer for the Iceberg table.
The Job Deployer then deploys a TableProvisionJob, which is responsible for ensuring the table actually exists in the infrastructure.
A custom table reconciler for the TableProvisionJob CRD may process the job and perform the actual provisioning of the table.
Case 2
```sql
CREATE MATERIALIZED VIEW venice.sink AS SELECT * FROM iceberg.namespace.table
```

When a materialized view is created that reads from an Iceberg table and writes to a Venice sink, both the Source Deployer and the Job Deployer are again triggered for the Iceberg table.
The Job Deployer creates a TableProvisionJob to ensure that the Iceberg source table exists.
The table reconciler checks whether the table is already provisioned; if it is, the job is a no-op.
Case 3
```sql
CREATE MATERIALIZED VIEW iceberg.namespace.table1 AS SELECT * FROM iceberg.namespace.table2
```

When a materialized view writes to one Iceberg table (table1) and reads from another (table2), provisioning happens for both Iceberg tables.
The source table (table2) is provisioned exactly as in Case 2.
The sink table (table1) triggers a separate provisioning job created during pipeline planning.
The Job Deployer (created while planning the pipeline) deploys a TableProvisionJob to verify that the sink table exists. If the sink table is missing, it is created; otherwise the operation is a no-op.
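The "create if missing, otherwise no-op" behavior that all three cases rely on can be sketched as a single idempotent step. The catalog here is a hypothetical in-memory stand-in for an external table catalog:

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch of the idempotent provisioning step: create the table
// the first time, no-op on every subsequent reconcile of the same job.
class IdempotentProvisionDemo {

  // Stand-in for an external catalog (e.g. an Iceberg catalog).
  static final Set<String> catalog = new HashSet<>();

  /** Returns true if the table was created, false if it already existed. */
  static boolean ensureTable(String qualifiedName) {
    if (catalog.contains(qualifiedName)) {
      return false; // already provisioned: no-op
    }
    catalog.add(qualifiedName);
    return true;
  }

  public static void main(String[] args) {
    System.out.println(ensureTable("iceberg.namespace.table1")); // true: created
    System.out.println(ensureTable("iceberg.namespace.table1")); // false: no-op
  }
}
```

Idempotence is what makes it safe for Cases 1 through 3 to fire a TableProvisionJob for tables that may already exist: re-running the job converges to the same state.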