GEP-18: Integrated Concurrency and Parallel Processing#2468
paulk-asert wants to merge 1 commit into apache:master
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Integrates a new built-in concurrency + parallel processing API into Groovy core, replaces legacy GPars-based examples/tests, and introduces a standalone groovy-concurrent-java artifact for Java consumers.
Changes:
- Removes GPars optional feature/dependencies and updates specs/tests to use new groovy.concurrent APIs.
- Adds parallel collections + @Parallel loop support via a global AST transformation and DGM-like extension methods.
- Introduces concurrency primitives (pools, structured scopes, actors/agents, channels, dataflow) plus new documentation and test coverage.
Reviewed changes
Copilot reviewed 58 out of 59 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| versions.properties | Removes GPars version reference. |
| subprojects/groovy-concurrent-java/build.gradle | Adds packaging for a Java-only concurrent API jar extracted from core. |
| subprojects/groovy-binary/src/spec/doc/index.adoc | Adds new concurrency/parallel documentation sections to the spec index. |
| subprojects/groovy-binary/build.gradle | Excludes groovy-concurrent-java from doc classpath/dist to avoid duplicates. |
| src/test/groovy/org/codehaus/groovy/classgen/asm/sc/bugs/Groovy9892.groovy | Migrates test from GPars Dataflow.task to new async. |
| src/test/groovy/groovy/concurrent/PoolTest.groovy | Adds tests for new Pool API. |
| src/test/groovy/groovy/concurrent/ParallelScopeTest.groovy | Adds tests for ParallelScope pool scoping. |
| src/test/groovy/groovy/concurrent/ParallelCollectionTest.groovy | Adds tests for new parallel collection extension methods. |
| src/test/groovy/groovy/concurrent/ParallelAnnotationTest.groovy | Adds tests for @Parallel loop rewriting. |
| src/test/groovy/groovy/concurrent/DataflowsTest.groovy | Adds tests for Dataflows dynamic dataflow vars. |
| src/test/groovy/groovy/concurrent/DataflowVariableTest.groovy | Adds tests for DataflowVariable behavior and composition. |
| src/test/groovy/groovy/concurrent/ConcurrentConfigTest.groovy | Adds tests for global concurrency configuration defaults/overrides. |
| src/test/groovy/groovy/concurrent/ChannelSelectTest.groovy | Adds tests for ChannelSelect. |
| src/test/groovy/groovy/concurrent/ChannelCompositionTest.groovy | Adds tests for AsyncChannel composition operations (filter/map/merge/split/tap). |
| src/test/groovy/groovy/concurrent/BroadcastChannelTest.groovy | Adds tests for broadcast channels and subscription semantics. |
| src/test/groovy/groovy/concurrent/AsyncScopeTimeoutTest.groovy | Adds timeout/cancellation tests for structured scopes. |
| src/test/groovy/groovy/concurrent/AsyncScopeNestingTest.groovy | Adds tests for parent/child scope relationships and cancellation propagation. |
| src/test/groovy/groovy/concurrent/AgentTest.groovy | Adds tests for agent update serialization and async reads. |
| src/test/groovy/groovy/concurrent/ActorTest.groovy | Adds tests for reactor/stateful actors, lifecycle, and stress sending. |
| src/test/groovy/groovy/concurrent/ActiveObjectTest.groovy | Adds tests for new @ActiveObject/@ActiveMethod AST transform. |
| src/test/groovy/groovy/AsyncAwaitTest.groovy | Switches async wrapping tests from AsyncSupport to AsyncClosureUtils. |
| src/spec/test/DesignPatternsTest.groovy | Updates design pattern spec tests from GPars to new async/parallel APIs. |
| src/spec/doc/core-parallel-collections.adoc | Adds spec docs for parallel collections and @Parallel. |
| src/spec/doc/core-concurrent-java.adoc | Adds spec docs for the standalone Java concurrency module. |
| src/spec/doc/core-concurrent-dataflow.adoc | Adds spec docs for dataflow vars and channels. |
| src/spec/doc/core-concurrent-actors.adoc | Adds spec docs for actors, agents, and active objects. |
| src/spec/doc/_design-pattern-monoid.adoc | Replaces GPars monoid examples with new parallel collections. |
| src/spec/doc/_design-pattern-decorator.adoc | Replaces GPars decorator example with new async approach. |
| src/resources/META-INF/services/org.codehaus.groovy.transform.ASTTransformation | Registers global transform for @Parallel. |
| src/main/java/org/codehaus/groovy/transform/ParallelASTTransformation.java | Rewrites @Parallel loops to eachParallel at CONVERSION phase. |
| src/main/java/org/codehaus/groovy/transform/ActiveObjectASTTransformation.java | Adds AST transform for @ActiveObject/@ActiveMethod. |
| src/main/java/org/codehaus/groovy/runtime/metaclass/MetaClassRegistryImpl.java | Adds runtime warning when both core and groovy-concurrent-java appear. |
| src/main/java/org/codehaus/groovy/runtime/ParallelCollectionExtensions.java | Adds parallel collection extension methods using isolated ForkJoinPools. |
| src/main/java/org/codehaus/groovy/runtime/DefaultGroovyMethods.java | Registers ParallelCollectionExtensions as DGM-like methods. |
| src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java | Makes generator completion APIs public. |
| src/main/java/org/apache/groovy/runtime/async/DefaultPool.java | Adds Pool implementation including virtual threads + scoped “current pool”. |
| src/main/java/org/apache/groovy/runtime/async/DefaultAsyncScope.java | Adds parent tracking, child-scope cancellation, and timeouts. |
| src/main/java/org/apache/groovy/runtime/async/DefaultActor.java | Adds default Actor implementation. |
| src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java | Refactors async runtime to prefer java.util.function types + adds defer API. |
| src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java | Adds closure-only async utilities previously in AsyncSupport. |
| src/main/java/groovy/transform/Parallel.java | Updates @Parallel semantics docs and removes explicit transform class link. |
| src/main/java/groovy/transform/ActiveObject.java | Adds @ActiveObject annotation. |
| src/main/java/groovy/transform/ActiveMethod.java | Adds @ActiveMethod annotation. |
| src/main/java/groovy/concurrent/Pool.java | Adds Pool API surface and docs. |
| src/main/java/groovy/concurrent/ParallelScope.java | Adds ParallelScope helpers combining Pool + AsyncScope. |
| src/main/java/groovy/concurrent/DataflowVariable.java | Adds DataflowVariable implementation. |
| src/main/java/groovy/concurrent/ConcurrentConfig.java | Adds global concurrency configuration. |
| src/main/java/groovy/concurrent/ChannelSelect.java | Adds channel selection primitive. |
| src/main/java/groovy/concurrent/BroadcastChannel.java | Adds broadcast channel implementation. |
| src/main/java/groovy/concurrent/Awaitable.java | Refactors to Supplier-based go and Function-based withScope. |
| src/main/java/groovy/concurrent/AsyncScope.java | Refactors scope API to Supplier/Function and adds timeout overloads. |
| src/main/java/groovy/concurrent/AsyncChannel.java | Adds channel composition operations (filter/map/merge/split/tap). |
| src/main/java/groovy/concurrent/Agent.java | Adds agent implementation for serialized state updates. |
| src/main/java/groovy/concurrent/Actor.java | Adds Actor interface + factories. |
| src/main/groovy/groovy/concurrent/Dataflows.groovy | Adds Groovy-only Dataflows dynamic convenience wrapper. |
| settings.gradle | Registers new groovy-concurrent-java subproject. |
| gradle/verification-metadata.xml | Removes verification metadata entries for GPars-related dependencies. |
| build.gradle | Removes GPars feature/deps and adjusts compiler classpath configuration. |
| build-logic/src/main/groovy/org.apache.groovy-distribution.gradle | Removes distribution capability requirement for GPars. |
Comments suppressed due to low confidence (1)
src/test/groovy/groovy/concurrent/ChannelCompositionTest.groovy:1
- This disables coverage for AsyncChannel.split, and the note indicates a cross-test interference/cleanup problem. It would be better to fix the underlying leak (likely background tasks or channels not being closed/drained deterministically) and re-enable the test, since split is a key part of the new channel API.
| * Returns the global default pool. If none has been set, creates one | ||
| * based on current configuration: a virtual-thread pool if | ||
| * {@link #preferVirtualThreads()} is {@code true}, otherwise a | ||
| * fixed pool sized by {@link #getDefaultParallelism()}. | ||
| * | ||
| * @return the default pool, never {@code null} | ||
| */ | ||
| public static Pool getDefaultPool() { | ||
| Pool p = defaultPool; | ||
| if (p != null) return p; | ||
| return preferVirtualThreads() ? Pool.virtual() : Pool.fixed(getDefaultParallelism()); |
getDefaultPool() claims to return a global default pool but currently creates and returns a new pool instance on every call unless defaultPool was explicitly set. This will leak threads/executors and can cause surprising behavior. Consider lazily initializing and caching defaultPool (e.g., double-checked locking or synchronized init) and document/reset its lifecycle clearly.
Suggested change:
| * Returns the global default pool. If none has been set, lazily creates | |
| * and caches one based on current configuration: a virtual-thread pool if | |
| * {@link #preferVirtualThreads()} is {@code true}, otherwise a | |
| * fixed pool sized by {@link #getDefaultParallelism()}. | |
| * <p> | |
| * The created pool is reused for subsequent calls until replaced or reset | |
| * via {@link #setDefaultPool(Pool)}. | |
| * | |
| * @return the default pool, never {@code null} | |
| */ | |
| public static Pool getDefaultPool() { | |
| Pool p = defaultPool; | |
| if (p != null) return p; | |
| synchronized (ConcurrentConfig.class) { | |
| p = defaultPool; | |
| if (p == null) { | |
| p = preferVirtualThreads() ? Pool.virtual() : Pool.fixed(getDefaultParallelism()); | |
| defaultPool = p; | |
| } | |
| } | |
| return p; |
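One detail the suggestion depends on but the diff does not show is the declaration of the cached field: double-checked locking is only safe when that field is `volatile`. The following is a minimal sketch of the full pattern under that assumption. `LazyDefaultPool` is a hypothetical stand-in for `ConcurrentConfig`, and `ExecutorService` stands in for `Pool`; neither is taken from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for ConcurrentConfig; ExecutorService stands in for Pool.
final class LazyDefaultPool {
    // volatile: a thread that reads a non-null reference on the fast path
    // is guaranteed to see a fully constructed pool.
    private static volatile ExecutorService defaultPool;

    private LazyDefaultPool() {}

    static ExecutorService getDefaultPool() {
        ExecutorService p = defaultPool;      // one volatile read on the fast path
        if (p != null) return p;
        synchronized (LazyDefaultPool.class) {
            p = defaultPool;                  // re-check under the lock
            if (p == null) {
                p = Executors.newFixedThreadPool(
                        Runtime.getRuntime().availableProcessors(), r -> {
                            Thread t = new Thread(r, "default-pool-worker");
                            t.setDaemon(true); // do not keep the JVM alive
                            return t;
                        });
                defaultPool = p;              // publish the cached instance
            }
        }
        return p;
    }

    // Replaces the cached pool; shutting down the previous one is left to the caller.
    static void setDefaultPool(ExecutorService pool) {
        synchronized (LazyDefaultPool.class) {
            defaultPool = pool;
        }
    }
}
```

With this shape, repeated calls return the same instance until `setDefaultPool` replaces it, which is the behaviour the updated Javadoc describes.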
| public static <T> Agent<T> create(T initialValue, Pool pool) { | ||
| Objects.requireNonNull(pool, "pool must not be null"); | ||
| // Wrap pool in a single-thread executor to serialise updates | ||
| return new Agent<>(initialValue, | ||
| Executors.newSingleThreadExecutor(r -> { | ||
| Thread t = new Thread(() -> { | ||
| pool.execute(r); | ||
| }, "groovy-agent-dispatch"); | ||
| t.setDaemon(true); | ||
| return t; | ||
| })); | ||
| } |
This ThreadFactory implementation breaks newSingleThreadExecutor: the executor's worker thread is expected to run the executor's internal queue loop, but here it immediately submits that loop to pool.execute(r) and then exits. That can lead to a dead executor, unbounded thread creation, or tasks never executing. If you want updates to run on the provided Pool while staying serialized, implement a serializing executor (e.g., a SerialExecutor over pool) rather than delegating the worker thread runnable to another executor.
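A serializing executor of the kind mentioned above could look roughly like the sketch below, which follows the well-known pattern from the `java.util.concurrent.Executor` Javadoc. It is written against a plain `Executor` because the `Pool` interface is not shown in this excerpt; that `Pool` can be used as (or adapted to) an `Executor` is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;

// Runs tasks one at a time, in submission order, on threads borrowed from
// the delegate. The delegate is assumed to stand in for the PR's Pool.
final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor delegate;
    private Runnable active; // task currently handed to the delegate, if any

    SerialExecutor(Executor delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized void execute(Runnable task) {
        tasks.add(() -> {
            try {
                task.run();
            } finally {
                scheduleNext(); // chain to the next task even if this one throws
            }
        });
        if (active == null) {
            scheduleNext();     // nothing in flight: start draining the queue
        }
    }

    private synchronized void scheduleNext() {
        active = tasks.poll();
        if (active != null) {
            delegate.execute(active);
        }
    }
}
```

At most one task is ever handed to the delegate at a time, so updates stay serialized without dedicating a thread to the agent.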
| public void bind(T value) { | ||
| if (!future.complete(value)) { | ||
| throw new IllegalStateException( | ||
| "DataflowVariable is already bound to: " + future.join()); | ||
| } | ||
| } |
If the variable was previously completed exceptionally, future.join() will throw CompletionException while building the error message, so bind() will throw the wrong exception type (and potentially hide the intended IllegalStateException). Consider using future.getNow(null) (guarded), future.isCompletedExceptionally(), or a safe string representation that can't throw.
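One way to build the message without risking a `CompletionException` is sketched below. `BindOnce` is a hypothetical, trimmed-down stand-in for `DataflowVariable`, and `bindError` is an assumed name for whatever method completes the variable exceptionally.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, trimmed-down stand-in for DataflowVariable.
final class BindOnce<T> {
    private final CompletableFuture<T> future = new CompletableFuture<>();

    void bind(T value) {
        if (!future.complete(value)) {
            throw new IllegalStateException("Variable is already bound " + describe());
        }
    }

    // Assumed name for the method that binds an error instead of a value.
    void bindError(Throwable error) {
        if (!future.completeExceptionally(error)) {
            throw new IllegalStateException("Variable is already bound " + describe());
        }
    }

    // Only called after complete(...) returned false, so the future is
    // already in its final state and the checks below cannot race.
    private String describe() {
        if (future.isCompletedExceptionally()) {
            return "with an error";
        }
        return "to: " + future.getNow(null); // safe: completed normally
    }
}
```

The guard on `isCompletedExceptionally()` matters because `getNow` rethrows in the same way `join()` does when the future holds an exception.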
| public Awaitable<Result> select() { | ||
| CompletableFuture<Result> winner = new CompletableFuture<>(); | ||
| for (int i = 0; i < channels.size(); i++) { | ||
| final int index = i; | ||
| AsyncChannel<?> ch = channels.get(i); | ||
| ch.receive().toCompletableFuture().whenComplete((value, error) -> { | ||
| if (error == null) { | ||
| winner.complete(new Result(index, value)); | ||
| } | ||
| // On error (e.g., channel closed), let other channels compete | ||
| }); | ||
| } | ||
| return GroovyPromise.of(winner); | ||
| } |
This implementation starts a real receive() on every channel. When a non-winning channel later receives a value, it will still consume that value but the result is discarded (because winner.complete(...) no-ops after the first completion). That can permanently lose messages and break subsequent selects/receives. A correct select needs non-consuming readiness registration, or a cancellable/transactional receive where losing registrations don't dequeue values. Consider adding/selecting via an AsyncChannel primitive that supports selection semantics (e.g., a selectRegister API) rather than racing receive() calls.
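To illustrate what a registration that does not dequeue on loss might look like, here is a deliberately small sketch. `SelectableChannel`, `Selected`, `Select`, and `register` are all hypothetical names and are not part of the PR's `AsyncChannel`; the channel is unbounded and ignores closing and cleanup of stale registrations.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical result type: which channel won, and with what value.
final class Selected {
    final int index;
    final Object value;
    Selected(int index, Object value) { this.index = index; this.value = value; }
}

// Hypothetical minimal channel; not the PR's AsyncChannel.
final class SelectableChannel<T> {
    private static final class Waiter {
        final CompletableFuture<Selected> winner;
        final int index;
        Waiter(CompletableFuture<Selected> winner, int index) {
            this.winner = winner;
            this.index = index;
        }
    }

    private final Deque<T> buffer = new ArrayDeque<>();
    private final Deque<Waiter> waiters = new ArrayDeque<>();

    // Hands the value to the first waiter that can still win; otherwise buffers it.
    synchronized void send(T value) {
        Waiter w;
        while ((w = waiters.poll()) != null) {
            if (w.winner.complete(new Selected(w.index, value))) {
                return; // delivered to a live select
            }
            // Stale registration: that select was already won by another channel.
        }
        buffer.add(value);
    }

    // Dequeues a buffered value only if this registration wins the select.
    synchronized void register(CompletableFuture<Selected> winner, int index) {
        if (winner.isDone()) return;
        if (!buffer.isEmpty()) {
            if (winner.complete(new Selected(index, buffer.peek()))) {
                buffer.poll(); // commit the dequeue only after winning
            }
            return;
        }
        waiters.add(new Waiter(winner, index));
    }
}

final class Select {
    static CompletableFuture<Selected> select(List<? extends SelectableChannel<?>> channels) {
        CompletableFuture<Selected> winner = new CompletableFuture<>();
        for (int i = 0; i < channels.size(); i++) {
            channels.get(i).register(winner, i);
        }
        return winner;
    }
}
```

Because `CompletableFuture.complete` succeeds exactly once, a value leaves a channel only when its registration is the one that completes the shared future; a value sent to a losing channel stays buffered for the next select or receive.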
| // Replace the for loop: iterate over [null] once, | ||
| // executing the eachParallel call | ||
| ListExpression singleRun = new ListExpression(); | ||
| singleRun.addExpression(ConstantExpression.NULL); | ||
| forLoop.setCollectionExpression(singleRun); | ||
| forLoop.setLoopBlock(block(callStatement)); | ||
| } |
Rewriting the annotated loop into a single-iteration for (x in [null]) { collection.eachParallel { ... } } changes observable semantics of for in Groovy (e.g., the loop variable may end up as null after the loop rather than the last iterated value; statement labeling and debugging/line mapping can also become confusing). Prefer replacing the parent statement with a direct expression/statement (collection.eachParallel { ... }) instead of mutating the loop to 'run once'. If replacing in-place is hard with ClassCodeVisitorSupport, consider a CompilationCustomizer that rewrites the enclosing BlockStatement statement list.
| * Called internally when the generator throws an exception. | ||
| */ | ||
| void completeExceptionally(Throwable error) { | ||
| public void completeExceptionally(Throwable error) { |
complete(), completeExceptionally(...), and GeneratorClosedException were previously package-private/internal hooks (per the surrounding comments) and are now public API surface. If external callers aren't meant to control generator lifecycle, consider keeping these package-private and exposing only what’s necessary. Expanding public API makes future refactors harder and increases compatibility obligations.
| * Thrown when a generator tries to yield after the consumer has closed the bridge. | ||
| */ | ||
| static final class GeneratorClosedException extends RuntimeException { | ||
| public static final class GeneratorClosedException extends RuntimeException { |
complete(), completeExceptionally(...), and GeneratorClosedException were previously package-private/internal hooks (per the surrounding comments) and are now public API surface. If external callers aren't meant to control generator lifecycle, consider keeping these package-private and exposing only what’s necessary. Expanding public API makes future refactors harder and increases compatibility obligations.
| */ | ||
| package groovy.concurrent; | ||
| | ||
| import org.apache.groovy.runtime.async.AsyncSupport; |
This import is unused in the current file and may trigger static analysis failures in builds with strict checks. Please remove it.
| */ | ||
| package groovy.concurrent; | ||
| | ||
| import org.apache.groovy.runtime.async.AsyncSupport; |
This import is unused in the current file and may trigger static analysis failures in builds with strict checks. Please remove it.
| /** | ||
| * Finds the first element matching the predicate (order not guaranteed | ||
| * due to parallelism). | ||
| */ | ||
| public static <T> T findParallel(Collection<T> self, Predicate<T> filter) { | ||
| return withCurrentFJP(fjp -> | ||
| fjp.submit(() -> self.parallelStream().filter(filter).findFirst().orElse(null)).join() | ||
| ); | ||
| } |
The doc says order is not guaranteed, but findFirst() on an ordered stream does respect encounter order (and may incur extra coordination cost). If the intention is 'no order guarantee', this should use findAny() instead; otherwise, update the Javadoc to reflect that it returns the first element in encounter order for ordered collections.
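The difference between the two options can be seen in a small standalone sketch. `ParallelFind` and its method names are hypothetical; the ForkJoinPool wrapping from the PR (`withCurrentFJP`) is left out so that only the stream semantics are shown.

```java
import java.util.Collection;
import java.util.function.Predicate;

// Hypothetical helper contrasting the two terminal operations.
final class ParallelFind {
    private ParallelFind() {}

    // Returns the first match in encounter order when the source is ordered
    // (e.g. a List), even though the filtering itself runs in parallel.
    static <T> T findFirstMatch(Collection<T> self, Predicate<? super T> filter) {
        return self.parallelStream().filter(filter).findFirst().orElse(null);
    }

    // Returns some match, with no ordering promise; this is the behaviour
    // the current Javadoc ("order not guaranteed") describes.
    static <T> T findAnyMatch(Collection<T> self, Predicate<? super T> filter) {
        return self.parallelStream().filter(filter).findAny().orElse(null);
    }
}
```

For a `List` of the integers 0 to 9999 and the predicate `x -> x >= 5000`, `findFirstMatch` always returns 5000, while `findAnyMatch` may return any element from 5000 upward. Whichever variant is kept, the Javadoc and the implementation should agree.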