diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 068c0d1b56fd..fd953fcf5d4b 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -158,3 +158,10 @@ task wordCount(type:JavaExec) { systemProperties = System.getProperties() args = ["--output=/tmp/output.txt"] } + +task exec (type:JavaExec) { + main = System.getProperty("mainClass") + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args System.getProperty("exec.args", "").split() +} \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java new file mode 100644 index 000000000000..a5ac5960c0d0 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple example demonstrating how to read from a Pub/Sub topic, rate limit the stream using an + * Envoy Rate Limit Service, and simply log the raw records. + * + *
To run this example, you need a running Envoy Rate Limit Service and access to the GCP Pub/Sub
+ * topic.
+ */
+public class RateLimitedPubsubReader {
+
+ public interface Options extends PipelineOptions {
+ @Description("Address of the Envoy Rate Limit Service (eg: localhost:8081)")
+ @Validation.Required
+ String getRateLimiterAddress();
+
+ void setRateLimiterAddress(String value);
+
+ @Description("Domain for the Rate Limit Service (eg: mydomain)")
+ @Validation.Required
+ String getRateLimiterDomain();
+
+ void setRateLimiterDomain(String value);
+
+ @Description(
+ "The Pub/Sub topic to read from (eg: projects/pubsub-public-data/topics/taxirides-realtime)")
+ @Validation.Required
+ String getTopic();
+
+ void setTopic(String value);
+ }
+
+ static class RateLimitAndLogFn extends DoFn