From ae3f32585c24aef495ad7e54d6470e31341dbae9 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Fri, 20 Mar 2026 10:58:08 +0530 Subject: [PATCH 1/4] opentelemetry testcase retry addition --- .../metrics/otel/OpenTelemetryTestBase.java | 6 +- .../OpenTelemetryTraceReporterITCase.java | 5 +- .../RetryingTestContainerExtension.java | 132 ++++++++++++++++++ 3 files changed, 138 insertions(+), 5 deletions(-) create mode 100755 flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java index 461a1683573cf..f1ebcfb9e8263 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java @@ -20,7 +20,7 @@ import org.apache.flink.api.common.time.Deadline; import org.apache.flink.core.testutils.AllCallbackWrapper; -import org.apache.flink.core.testutils.TestContainerExtension; +import org.apache.flink.core.testutils.RetryingTestContainerExtension; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.function.ThrowingConsumer; @@ -60,10 +60,10 @@ public class OpenTelemetryTestBase { @RegisterExtension @Order(1) - private static final AllCallbackWrapper> + private static final AllCallbackWrapper> OTEL_EXTENSION = new AllCallbackWrapper<>( - new TestContainerExtension<>(() -> new OtelTestContainer(outputDir))); + new RetryingTestContainerExtension<>(() -> new OtelTestContainer(outputDir))); @BeforeEach public void setup() { diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java index 7d452a90d5a39..a61d1aac917c9 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -58,7 +59,7 @@ public void tearDownEach() { reporter.close(); } - @Test + @RepeatedTest(300) public void testReportSpan() throws Exception { MetricConfig metricConfig = createMetricConfig(); @@ -130,7 +131,7 @@ public void testReportSpan() throws Exception { }); } - @Test + @RepeatedTest(300) public void testReportNestedSpan() throws Exception { String scope = this.getClass().getCanonicalName(); diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java new file mode 100755 index 0000000000000..8a3385216e965 --- /dev/null +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.testutils; + +import com.github.dockerjava.api.command.PullImageResultCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.DockerClientFactory; +import org.testcontainers.containers.GenericContainer; + +import javax.annotation.Nullable; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * A {@link CustomExtension} that manages a {@link GenericContainer} with retry logic, including + * re-pulling the Docker image on failure. This handles transient Docker image pull/build failures + * that can occur in CI environments. + * + * @param The {@link GenericContainer} that shall be managed. + */ +public class RetryingTestContainerExtension> + implements CustomExtension { + + private static final Logger LOG = LoggerFactory.getLogger(RetryingTestContainerExtension.class); + + private static final int DEFAULT_MAX_RETRIES = 3; + private static final long DEFAULT_RETRY_DELAY_MS = 2000; + private static final long IMAGE_PULL_TIMEOUT_MINUTES = 2; + + @Nullable private T testContainer; + + private final Supplier testContainerCreator; + private final int maxRetries; + private final long retryDelayMs; + + public RetryingTestContainerExtension(Supplier testContainerCreator) { + this(testContainerCreator, DEFAULT_MAX_RETRIES, DEFAULT_RETRY_DELAY_MS); + } + + public RetryingTestContainerExtension( + Supplier testContainerCreator, int maxRetries, long retryDelayMs) { + this.testContainerCreator = testContainerCreator; + this.maxRetries = maxRetries; + this.retryDelayMs = retryDelayMs; + } + + public T getTestContainer() { + assert testContainer != null; + return testContainer; + } + + private void terminateTestContainer() { + if (testContainer != null) { + testContainer.stop(); + testContainer = null; + } + } + + private void instantiateTestContainer() { + assert testContainer == null; + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + testContainer = testContainerCreator.get(); + testContainer.start(); + return; + } catch (Exception e) { + LOG.warn( + "Container start attempt {}/{} failed: {}", + attempt, + maxRetries, + e.getMessage()); + testContainer = null; + if (attempt == maxRetries) { + throw e; + } + pullImage(); + try { + Thread.sleep(retryDelayMs); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted during container start retry", ie); + } + } + } + } + + private void pullImage() { + try { + T tempContainer = testContainerCreator.get(); + String imageName = tempContainer.getDockerImageName(); + LOG.info("Re-pulling image {} before retry...", imageName); + DockerClientFactory.instance() + .client() + .pullImageCmd(imageName) + .exec(new PullImageResultCallback()) + .awaitCompletion(IMAGE_PULL_TIMEOUT_MINUTES, TimeUnit.MINUTES); + LOG.info("Image {} pulled successfully", imageName); + } catch (Exception e) { + LOG.warn("Failed to pull image: {}", e.getMessage()); + } + } + + @Override + public void after(ExtensionContext context) throws Exception { + terminateTestContainer(); + } + + @Override + public void before(ExtensionContext context) throws Exception { + terminateTestContainer(); + instantiateTestContainer(); + } +} From a1ff6a2007ea562d404878fe2405437ffdefc470 Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Fri, 20 Mar 2026 12:48:03 +0530 Subject: [PATCH 2/4] opentelemetry testcase retry addition 1 --- .../flink/core/testutils/RetryingTestContainerExtension.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java index 8a3385216e965..0374aecb4bc97 100755 --- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java +++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/RetryingTestContainerExtension.java @@ -41,7 +41,6 @@ public class RetryingTestContainerExtension> implements CustomExtension { private static final Logger LOG = LoggerFactory.getLogger(RetryingTestContainerExtension.class); - private static final int DEFAULT_MAX_RETRIES = 3; private static final long DEFAULT_RETRY_DELAY_MS = 2000; private static final long IMAGE_PULL_TIMEOUT_MINUTES = 2; From 1a2e9764e07f184313b7de047ce5f88aea9e3c2e Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Fri, 20 Mar 2026 13:15:30 +0530 Subject: [PATCH 3/4] opentelemetry testcase retry addition 2 --- .../org/apache/flink/metrics/otel/OpenTelemetryTestBase.java | 3 ++- .../flink/traces/otel/OpenTelemetryTraceReporterITCase.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java index f1ebcfb9e8263..54d03f32e01f8 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/metrics/otel/OpenTelemetryTestBase.java @@ -63,7 +63,8 @@ public class OpenTelemetryTestBase { private static final AllCallbackWrapper> OTEL_EXTENSION = new AllCallbackWrapper<>( - new RetryingTestContainerExtension<>(() -> new OtelTestContainer(outputDir))); + new RetryingTestContainerExtension<>( + () -> new OtelTestContainer(outputDir))); @BeforeEach public void setup() { diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java index a61d1aac917c9..8e6eae2e060e4 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; From a5017464b132832d62516f46c71ba28a3d312fdb Mon Sep 17 00:00:00 2001 From: Taranpreet Kaur Date: Mon, 23 Mar 2026 11:28:44 +0530 Subject: [PATCH 4/4] opentelemetry testcase retry addition 3 --- .../flink/traces/otel/OpenTelemetryTraceReporterITCase.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java index 8e6eae2e060e4..7d452a90d5a39 100644 --- a/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java +++ b/flink-metrics/flink-metrics-otel/src/test/java/org/apache/flink/traces/otel/OpenTelemetryTraceReporterITCase.java @@ -29,7 +29,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import java.time.Duration; @@ -58,7 +58,7 @@ public void tearDownEach() { reporter.close(); } - @RepeatedTest(300) + @Test public void testReportSpan() throws Exception { MetricConfig metricConfig = createMetricConfig(); @@ -130,7 +130,7 @@ public void testReportSpan() throws Exception { }); } - @RepeatedTest(300) + @Test public void testReportNestedSpan() throws Exception { String scope = this.getClass().getCanonicalName();