-
Notifications
You must be signed in to change notification settings - Fork 2.8k
[ZEPPELIN-6406] Remove deprecated Flink 1.15/1.16/1.17 shims and add Flink 1.19/1.20 support #5205
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
8a3fe07
cca6086
39e0ae2
3fe7a39
e798d02
72428e9
27d4d68
53c43de
bdbf8c7
69bfcdb
3482c42
9dc2b36
aa88f31
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |||||||||||||||||
| import org.apache.flink.streaming.experimental.SocketStreamIterator; | ||||||||||||||||||
| import org.apache.flink.table.api.Table; | ||||||||||||||||||
| import org.apache.flink.table.api.TableEnvironment; | ||||||||||||||||||
| import org.apache.flink.table.api.TableResult; | ||||||||||||||||||
| import org.apache.flink.table.api.TableSchema; | ||||||||||||||||||
| import org.apache.flink.table.sinks.RetractStreamTableSink; | ||||||||||||||||||
| import org.apache.flink.types.Row; | ||||||||||||||||||
|
|
@@ -65,6 +66,9 @@ public abstract class AbstractStreamSqlJob { | |||||||||||||||||
| protected InterpreterContext context; | ||||||||||||||||||
| protected TableSchema schema; | ||||||||||||||||||
| protected SocketStreamIterator<Tuple2<Boolean, Row>> iterator; | ||||||||||||||||||
| private volatile TableResult insertResult; | ||||||||||||||||||
| private volatile boolean cancelled = false; | ||||||||||||||||||
| private volatile boolean cancelledWithSavepoint = false; | ||||||||||||||||||
| protected Object resultLock = new Object(); | ||||||||||||||||||
| protected volatile boolean enableToRefresh = true; | ||||||||||||||||||
| protected int defaultParallelism; | ||||||||||||||||||
|
|
@@ -151,7 +155,38 @@ public String run(Table table, String tableName) throws IOException { | |||||||||||||||||
|
|
||||||||||||||||||
| LOGGER.info("Run job: {}, parallelism: {}", tableName, parallelism); | ||||||||||||||||||
| String jobName = context.getStringLocalProperty("jobName", tableName); | ||||||||||||||||||
| table.executeInsert(tableName).await(); | ||||||||||||||||||
| this.insertResult = table.executeInsert(tableName); | ||||||||||||||||||
| // Register the job with JobManager so that cancel (with savepoint) works properly | ||||||||||||||||||
| if (insertResult.getJobClient().isPresent()) { | ||||||||||||||||||
| jobManager.addJob(context, insertResult.getJobClient().get()); | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+158
to
+162
|
||||||||||||||||||
| // Use a CountDownLatch to wait for job completion while supporting cancellation | ||||||||||||||||||
| java.util.concurrent.CountDownLatch jobDone = new java.util.concurrent.CountDownLatch(1); | ||||||||||||||||||
| Thread jobThread = new Thread(() -> { | ||||||||||||||||||
| try { | ||||||||||||||||||
| insertResult.await(); | ||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||
| LOGGER.debug("Job await interrupted or failed", e); | ||||||||||||||||||
| } finally { | ||||||||||||||||||
| jobDone.countDown(); | ||||||||||||||||||
| } | ||||||||||||||||||
| }, "flink-job-await"); | ||||||||||||||||||
| jobThread.setDaemon(true); | ||||||||||||||||||
| jobThread.start(); | ||||||||||||||||||
|
|
||||||||||||||||||
| // Wait for either job completion or cancellation | ||||||||||||||||||
| while (!cancelled && !jobDone.await(1, java.util.concurrent.TimeUnit.SECONDS)) { | ||||||||||||||||||
| // keep waiting | ||||||||||||||||||
| } | ||||||||||||||||||
| if (cancelled) { | ||||||||||||||||||
| // Wait briefly for the job to finish (e.g. stopped with savepoint) | ||||||||||||||||||
| jobDone.await(10, java.util.concurrent.TimeUnit.SECONDS); | ||||||||||||||||||
| if (cancelledWithSavepoint) { | ||||||||||||||||||
| LOGGER.info("Stream sql job stopped with savepoint, jobName: {}", jobName); | ||||||||||||||||||
| return buildResult(); | ||||||||||||||||||
| } | ||||||||||||||||||
| throw new InterruptedException("Job was cancelled"); | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+181
to
+189
|
||||||||||||||||||
| LOGGER.info("Flink Job is finished, jobName: {}", jobName); | ||||||||||||||||||
| // wait for retrieve thread consume all data | ||||||||||||||||||
| LOGGER.info("Waiting for retrieve thread to be done"); | ||||||||||||||||||
|
|
@@ -162,7 +197,7 @@ public String run(Table table, String tableName) throws IOException { | |||||||||||||||||
| return finalResult; | ||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||
| LOGGER.error("Fail to run stream sql job", e); | ||||||||||||||||||
| throw new IOException("Fail to run stream sql job", e); | ||||||||||||||||||
| throw new IOException("Job was cancelled", e); | ||||||||||||||||||
|
||||||||||||||||||
| throw new IOException("Job was cancelled", e); | |
| if (cancelled) { | |
| throw new IOException("Job was cancelled", e); | |
| } else if (e instanceof IOException) { | |
| throw (IOException) e; | |
| } else { | |
| throw new IOException("Fail to run stream sql job", e); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These "version-specific notes" hard-code patch-level jar names (1.19.3) even though the page states Flink 1.19+ is supported. This will be incorrect for 1.20.x (and even for other 1.19.x patch levels). Consider using a `${FLINK_VERSION}` placeholder and/or wording that instructs users to substitute their installed Flink version.