diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 068c0d1b56fd..fd953fcf5d4b 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -158,3 +158,10 @@ task wordCount(type:JavaExec) { systemProperties = System.getProperties() args = ["--output=/tmp/output.txt"] } + +task exec (type:JavaExec) { + main = System.getProperty("mainClass") + classpath = sourceSets.main.runtimeClasspath + systemProperties System.getProperties() + args System.getProperty("exec.args", "").split() +} \ No newline at end of file diff --git a/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java b/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java new file mode 100644 index 000000000000..a5ac5960c0d0 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/RateLimitedPubsubReader.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples; + +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.EnvoyRateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiter; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterContext; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterFactory; +import org.apache.beam.sdk.io.components.ratelimiter.RateLimiterOptions; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.options.Validation; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Reshuffle; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A simple example demonstrating how to read from a Pub/Sub topic, rate limit the stream using an + * Envoy Rate Limit Service, and simply log the raw records. + * + *

To run this example, you need a running Envoy Rate Limit Service and access to the GCP Pub/Sub + * topic. + */ +public class RateLimitedPubsubReader { + + public interface Options extends PipelineOptions { + @Description("Address of the Envoy Rate Limit Service (eg: localhost:8081)") + @Validation.Required + String getRateLimiterAddress(); + + void setRateLimiterAddress(String value); + + @Description("Domain for the Rate Limit Service (eg: mydomain)") + @Validation.Required + String getRateLimiterDomain(); + + void setRateLimiterDomain(String value); + + @Description( + "The Pub/Sub topic to read from (eg: projects/pubsub-public-data/topics/taxirides-realtime)") + @Validation.Required + String getTopic(); + + void setTopic(String value); + } + + static class RateLimitAndLogFn extends DoFn { + private final String rlsAddress; + private final String rlsDomain; + private transient @Nullable RateLimiter rateLimiter; + private static final Logger LOG = LoggerFactory.getLogger(RateLimitAndLogFn.class); + + public RateLimitAndLogFn(String rlsAddress, String rlsDomain) { + this.rlsAddress = rlsAddress; + this.rlsDomain = rlsDomain; + } + + @Setup + public void setup() { + // Create the RateLimiterOptions. + RateLimiterOptions options = RateLimiterOptions.builder().setAddress(rlsAddress).build(); + + // Static RateLimiter with pre-configured domain and descriptors + RateLimiterFactory factory = new EnvoyRateLimiterFactory(options); + RateLimiterContext context = + EnvoyRateLimiterContext.builder() + .setDomain(rlsDomain) + .addDescriptor("database", "users") // generic descriptors + .build(); + this.rateLimiter = factory.getLimiter(context); + } + + @Teardown + public void teardown() { + if (rateLimiter != null) { + try { + rateLimiter.close(); + } catch (Exception e) { + LOG.warn("Failed to close RateLimiter", e); + } + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + String element = c.element(); + try { + Preconditions.checkNotNull(rateLimiter).allow(1); + } catch (Exception e) { + throw new RuntimeException("Failed to acquire rate limit token", e); + } + + // Simulate external API call or simply log the read entry + Thread.sleep(100); + LOG.info("Received and rate-limited record: " + element); + c.output(element); + } + } + + public static void main(String[] args) { + Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); + Pipeline p = Pipeline.create(options); + + p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic())) + .apply("Reshuffle", Reshuffle.viaRandomKey()) + .apply( + "RateLimitAndLog", + ParDo.of( + new RateLimitAndLogFn( + options.getRateLimiterAddress(), options.getRateLimiterDomain()))); + + p.run().waitUntilFinish(); + } +} \ No newline at end of file diff --git a/examples/terraform/envoy-ratelimiter/terraform.tfvars b/examples/terraform/envoy-ratelimiter/terraform.tfvars index 69d8bc8860e2..76df3bd3b995 100644 --- a/examples/terraform/envoy-ratelimiter/terraform.tfvars +++ b/examples/terraform/envoy-ratelimiter/terraform.tfvars @@ -16,11 +16,11 @@ * limitations under the License. */ -project_id = "PROJECT_ID" -region = "REGION" +project_id = "apache-beam-testing" +region = "us-central1" -vpc_name = "VPC_NAME" -subnet_name = "SUBNET_NAME" +vpc_name = "tarun-private-vpc" +subnet_name = "private-subnet" # update the below config value to match your need # https://github.com/envoyproxy/ratelimit?tab=readme-ov-file#examples @@ -31,7 +31,7 @@ descriptors: value: users rate_limit: unit: second - requests_per_unit: 1 + requests_per_unit: 10 EOF # Optional Resource Limits