Add TableProvisionJob CRD and JobTemplate support for CREATE TABLE #196

Open
srnand wants to merge 4 commits into main from sthakkar/mar13_26/addTableProvisionerJob

Conversation

@srnand
Collaborator

@srnand srnand commented Mar 14, 2026

Summary

  • Adds a generic TableProvisionJob CRD for provisioning tables via job templates.
  • Extends K8sDeployerProvider to apply JobTemplate CRDs during the CREATE TABLE flow, enabling table provisioning.

Scenarios

Case 1

CREATE TABLE iceberg.namespace.table

  • When a table is created in Iceberg using CREATE TABLE, the system triggers both the Source Deployer and the Job Deployer for the Iceberg table.

  • The Job Deployer then deploys a TableProvisionJob, which is responsible for ensuring the table actually exists in the infrastructure.

  • A custom table reconciler for the TableProvisionJob CRD processes the job and performs the actual provisioning of the table.


Case 2

CREATE MATERIALIZED VIEW venice.sink AS SELECT * FROM iceberg.namespace.table

  • When a materialized view is created that reads from an Iceberg table and writes to a Venice sink, both the Source Deployer and the Job Deployer are again triggered for the Iceberg table.

  • The Job Deployer creates a TableProvisionJob to ensure that the Iceberg source table exists.

  • The table reconciler checks whether the table is already provisioned:

    • If it exists → no action is taken (no-op)
    • If it does not exist → the table is provisioned.
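The check-then-provision behaviour described for the reconciler can be sketched as an idempotency check. This is a minimal illustration, not the actual reconciler code; the `Catalog` interface and the `tableExists`/`provisionTable` names are hypothetical:

```java
public class TableProvisionReconcilerSketch {
    // Hypothetical catalog abstraction; not the real reconciler API.
    interface Catalog {
        boolean tableExists(String name);
        void provisionTable(String name);
    }

    // Reconcile is safe to call repeatedly: if the table is already
    // provisioned, the call is a no-op; otherwise it provisions it.
    static String reconcile(Catalog catalog, String table) {
        if (catalog.tableExists(table)) {
            return "no-op"; // already provisioned, nothing to do
        }
        catalog.provisionTable(table);
        return "provisioned";
    }

    public static void main(String[] args) {
        java.util.Set<String> existing = new java.util.HashSet<>();
        Catalog catalog = new Catalog() {
            public boolean tableExists(String n) { return existing.contains(n); }
            public void provisionTable(String n) { existing.add(n); }
        };
        // First reconcile provisions; the second sees the table and no-ops.
        System.out.println(reconcile(catalog, "iceberg.namespace.table")); // provisioned
        System.out.println(reconcile(catalog, "iceberg.namespace.table")); // no-op
    }
}
```

Because reconcile loops in Kubernetes controllers re-run on every resync, this idempotency is what makes the repeated TableProvisionJob deployments in Cases 1–3 safe.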

Case 3

CREATE MATERIALIZED VIEW iceberg.namespace.table1 AS SELECT * FROM iceberg.namespace.table2

  • When a materialized view writes to an Iceberg table (table1) and reads from another Iceberg table (table2), provisioning happens for both Iceberg tables.

  • The source table (table2) is provisioned exactly like in Case 2.

  • The sink table (table1) triggers a separate provisioning job created during pipeline planning.

  • The Job Deployer (created while planning the pipeline) deploys a TableProvisionJob to verify that the sink table exists. If the sink table is missing, it is created; otherwise the operation becomes a no-op.


@ryannedolan
Collaborator

Wondering if we can re-use batch/Job like we do for TableTriggers? Why do we need a whole new CRD? In theory, it's easier to re-use Job than to create a whole new controller for each job type.

Comment on lines +38 to +42
if (!(obj instanceof Sink)) {
// Sets up a table provisioning job for the source.
// The job would be a no-op if the source is already provisioned.
list.add(new K8sJobDeployer(jobFromSource((Source) obj), context));
}
Collaborator

@jogrogan jogrogan Mar 17, 2026


Will this have unintended side effects of deploying Flink jobs now if you issue a CREATE TABLE statement on something like a Kafka topic which already has a job template defined, but is really meant to only be used in a CREATE MATERIALIZED VIEW pipeline flow?

I don't think I'm fully understanding why we need this change. It seems like a general template type so that we don't need custom CRDs per DB, like how we have kafkatopics.hoptimator.linkedin.com, acls.hoptimator.linkedin.com, and others internally?

Collaborator Author


I don't think there would be any unintended side effects here. When a Job is created via jobFromSource, the SQL-related lazy evals (sql, query, fieldMap) all return null. So for Flink JobTemplate references like {{flinksql}}, the template engine resolves them to null and skips the entire template, meaning existing Flink JobTemplates won't accidentally fire for CREATE TABLE statements.
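The skip-on-null behaviour described above can be illustrated with a toy resolver. This is a hypothetical sketch, not Hoptimator's actual template engine; only the `{{flinksql}}` field name comes from the discussion:

```java
import java.util.Map;
import java.util.function.Function;

public class TemplateSkipDemo {
    // Renders a template, or returns null if any {{field}} resolves to
    // null, signalling the caller to skip deploying this template.
    static String render(String template, Function<String, String> fields) {
        StringBuilder out = new StringBuilder();
        int i = 0;
        while (i < template.length()) {
            int open = template.indexOf("{{", i);
            if (open < 0) { out.append(template.substring(i)); break; }
            out.append(template, i, open);
            int close = template.indexOf("}}", open);
            String key = template.substring(open + 2, close);
            String value = fields.apply(key);
            if (value == null) return null; // unresolved field: skip whole template
            out.append(value);
            i = close + 2;
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // CREATE TABLE flow: the SQL-related fields all resolve to null.
        Map<String, String> source = Map.of();
        Function<String, String> fields = source::get;
        System.out.println(render("sql: {{flinksql}}", fields)); // null → template skipped
    }
}
```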

And for the TableProvisionJob CRD, yes, it's intentionally kept generic. It follows the same pattern as SqlJob, which is a generic CRD reconciled by the CPO adapter. Similarly, TableProvisionJob is a generic "provision this table's backing infrastructure" CRD. Different operators can watch it and act based on the database field; for example, we can have multiple reconcilers for different databases, such as an Iceberg Provisioner or an OpenHouse Provisioner. This way we won't need a separate custom CRD for each database type.
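The one-generic-CRD, many-reconcilers pattern described here can be sketched as follows. All types and names below (`Provisioner`, the `database` field, the provisioner classes) are illustrative stand-ins, not the actual CRD model:

```java
import java.util.List;

public class ProvisionDispatchSketch {
    // Minimal stand-in for a TableProvisionJob custom resource.
    record TableProvisionJob(String database, String table) { }

    interface Provisioner {
        // Each reconciler declares which database type it handles.
        boolean handles(String database);
        String provision(TableProvisionJob job);
    }

    static class IcebergProvisioner implements Provisioner {
        public boolean handles(String db) { return "iceberg".equals(db); }
        public String provision(TableProvisionJob job) { return "iceberg:" + job.table(); }
    }

    static class OpenHouseProvisioner implements Provisioner {
        public boolean handles(String db) { return "openhouse".equals(db); }
        public String provision(TableProvisionJob job) { return "openhouse:" + job.table(); }
    }

    // Each watcher inspects the generic CRD's database field and acts
    // only on the jobs it owns; other resources are ignored.
    static String dispatch(List<Provisioner> watchers, TableProvisionJob job) {
        for (Provisioner p : watchers) {
            if (p.handles(job.database())) {
                return p.provision(job);
            }
        }
        return null; // no reconciler claims this database type
    }

    public static void main(String[] args) {
        List<Provisioner> watchers = List.of(new IcebergProvisioner(), new OpenHouseProvisioner());
        System.out.println(dispatch(watchers, new TableProvisionJob("iceberg", "namespace.table")));
    }
}
```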

Collaborator Author


Ahh I see that the flink-beam-sqljob-template is matching as a side effect here. Need to fix that 🤔

Collaborator


Yea these jobtemplates are already a little flimsy for this reason.

But I'm still not convinced we need a new job template here. First, this doesn't feel like a "job": there is no ongoing processing, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible, to allow us to fail fast back to users. If that's absolutely not possible for reasons outside our control, we should either have a custom CRD for that database type and a reconciler (or several listening, if needed), or maybe we can consider having a new generic CRD type with many different reconcilers that differentiate which resources to act on by some "database" type field.

Collaborator Author


Ahh I see that the flink-beam-sqljob-template is matching as a side effect here. Need to fix that 🤔

It was a silly issue: for the flink-beam-sqljob-template, when fieldMap.apply() returns null, Java string concatenation produces "'null'" instead of null, so the template engine renders the template instead of skipping it. Fixed it.
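The "'null'" behaviour follows from Java's string-concatenation rules, which convert a null reference to the four-character string "null". A minimal standalone demo of the pitfall (not the template-engine code; `lookup` stands in for the fieldMap call):

```java
public class NullConcatDemo {
    // Stand-in for fieldMap.apply() returning no value for a field.
    static String lookup(String key) {
        return null;
    }

    public static void main(String[] args) {
        String value = lookup("flinksql");
        // The + operator stringifies null as "null", so the result is a
        // real, non-null string and the template engine happily renders it.
        String naive = "'" + value + "'";
        System.out.println(naive); // prints 'null'
        // Propagating null instead signals "no value" and lets the
        // engine skip the template.
        String safe = (value == null) ? null : "'" + value + "'";
        System.out.println(safe); // prints null
    }
}
```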

Collaborator Author


But I'm still not convinced we need a new job template here. First, this doesn't feel like a "job": there is no ongoing processing, it's a one-off creation. One-off creations should happen in the hot path via the Deployer pattern if possible, to allow us to fail fast back to users. If that's absolutely not possible for reasons outside our control, we should either have a custom CRD for that database type and a reconciler (or several listening, if needed), or maybe we can consider having a new generic CRD type with many different reconcilers that differentiate which resources to act on by some "database" type field.

I thought of using a Job CRD and the reconciler pattern for cases where table provisioning involves asynchronous external systems that can't complete within the CREATE TABLE request path. For example, provisioning an Iceberg table might require registering a Kafka topic in nearline and coordinating with an external ETL engine that ingests data into the Iceberg table. The ETL engine may acknowledge the request quickly, but the actual provisioning can take longer, so I chose the reconciler pattern to watch the status until the table is fully onboarded for consumption.
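The ack-fast, complete-slow flow described above is typically handled by requeueing the resource until the external system reports readiness. A hypothetical sketch (the `EtlEngine` interface, `Phase` states, and method names are illustrative, not any real API):

```java
public class AsyncProvisionSketch {
    enum Phase { PENDING, ACKNOWLEDGED, READY }

    // Hypothetical external ETL engine that onboards tables asynchronously.
    interface EtlEngine {
        Phase status(String table);
        void requestOnboarding(String table);
    }

    // One reconcile pass: kick off onboarding if needed, then requeue
    // (return true) until the external engine reports the table is ready.
    static boolean reconcileOnce(EtlEngine etl, String table) {
        switch (etl.status(table)) {
            case PENDING:
                etl.requestOnboarding(table); // fast ack, slow completion
                return true;  // requeue and check again later
            case ACKNOWLEDGED:
                return true;  // still provisioning, requeue
            default:
                return false; // READY: done, stop requeueing
        }
    }

    public static void main(String[] args) {
        // Fake engine that becomes READY after two status checks.
        EtlEngine etl = new EtlEngine() {
            int calls = 0;
            public Phase status(String t) {
                calls++;
                return calls == 1 ? Phase.PENDING : (calls == 2 ? Phase.ACKNOWLEDGED : Phase.READY);
            }
            public void requestOnboarding(String t) { }
        };
        int requeues = 0;
        while (reconcileOnce(etl, "iceberg.namespace.table")) { requeues++; }
        System.out.println(requeues); // requeue count before the table was READY
    }
}
```

In a real controller the requeue would be a delayed re-enqueue rather than a tight loop, and the CRD's status subresource would record the current phase.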

Collaborator

@jogrogan jogrogan Mar 18, 2026


Hmm, would we always want a CREATE TABLE on an iceberg table to do that though?

I could totally see a world where we have CREATE TABLE, then maybe some trigger that ingests data from somewhere periodically, or a Spark job that runs periodically, etc., and that would be specified using the "CREATE TRIGGER" syntax on the table or "CREATE MATERIALIZED VIEW my_iceberg_table$spark_backfill AS ... FROM ...", etc., depending on the user's intent.

My point being, a separate syntax beyond CREATE TABLE makes sense to me if a user wanted to specify "create this iceberg table and backfill it from some kafka topic" (CREATE MATERIALIZED VIEW I think should already cover this). I do understand that would likely need some new CRD in this case that is different from the "sqljobs" template we have today, but it wouldn't be driven by CREATE TABLE.

"CREATE TABLE AS ..." may also be relevant. This syntax is supported but we don't use it yet; to me it could semantically mean CREATE TABLE + CREATE TRIGGER (as a one-off).

ghcr.io/kubernetes-client/java/crd-model-gen:v1.0.6 \
/generate.sh -o "$(pwd)" -n "" -p "com.linkedin.hoptimator.k8s" \
-u "$(pwd)/src/main/resources/tabletriggers.crd.yaml" \
-u "$(pwd)/src/main/resources/tableprovisionjobs.crd.yaml" \
Collaborator


Do we actually use this file? I think just the top level generate-models.sh is used. We can likely delete this file.

@srnand
Collaborator Author

srnand commented Mar 17, 2026

Wondering if we can re-use batch/Job like we do for TableTriggers? Why do we need a whole new CRD? In theory, it's easier to re-use Job than to create a whole new controller for each job type.

I introduced a new CRD to enable table provisioning across multiple databases via dedicated controllers. For Iceberg and OpenHouse, each can use its own controller with this TableProvisionJob CRD. If we use the batch/Job type, we’d have to bundle all the custom provisioning logic into a container image or something that runs in the Job’s pod.

@srnand srnand force-pushed the sthakkar/mar13_26/addTableProvisionerJob branch from 918340d to 9f3be71 on March 17, 2026 at 20:33
@srnand srnand requested review from jogrogan and ryannedolan March 17, 2026 22:54
