Skip to content

GEP-18: Integrated Concurrency and Parallel Processing#2468

Open
paulk-asert wants to merge 1 commit intoapache:masterfrom
paulk-asert:gparsSpike
Open

GEP-18: Integrated Concurrency and Parallel Processing#2468
paulk-asert wants to merge 1 commit intoapache:masterfrom
paulk-asert:gparsSpike

Conversation

@paulk-asert
Copy link
Copy Markdown
Contributor

No description provided.

@paulk-asert paulk-asert changed the title GEB-18: Integrated Concurrency and Parallel Processing GEP-18: Integrated Concurrency and Parallel Processing Apr 15, 2026
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Note

Copilot was unable to run its full agentic suite in this review.

Integrates a new built-in concurrency + parallel processing API into Groovy core, replaces legacy GPars-based examples/tests, and introduces a standalone groovy-concurrent-java artifact for Java consumers.

Changes:

  • Removes GPars optional feature/dependencies and updates specs/tests to use new groovy.concurrent APIs.
  • Adds parallel collections + @Parallel loop support via a global AST transformation and DGM-like extension methods.
  • Introduces concurrency primitives (pools, structured scopes, actors/agents, channels, dataflow) plus new documentation and test coverage.

Reviewed changes

Copilot reviewed 58 out of 59 changed files in this pull request and generated 12 comments.

Show a summary per file
File Description
versions.properties Removes GPars version reference.
subprojects/groovy-concurrent-java/build.gradle Adds packaging for a Java-only concurrent API jar extracted from core.
subprojects/groovy-binary/src/spec/doc/index.adoc Adds new concurrency/parallel documentation sections to the spec index.
subprojects/groovy-binary/build.gradle Excludes groovy-concurrent-java from doc classpath/dist to avoid duplicates.
src/test/groovy/org/codehaus/groovy/classgen/asm/sc/bugs/Groovy9892.groovy Migrates test from GPars Dataflow.task to new async.
src/test/groovy/groovy/concurrent/PoolTest.groovy Adds tests for new Pool API.
src/test/groovy/groovy/concurrent/ParallelScopeTest.groovy Adds tests for ParallelScope pool scoping.
src/test/groovy/groovy/concurrent/ParallelCollectionTest.groovy Adds tests for new parallel collection extension methods.
src/test/groovy/groovy/concurrent/ParallelAnnotationTest.groovy Adds tests for @Parallel loop rewriting.
src/test/groovy/groovy/concurrent/DataflowsTest.groovy Adds tests for Dataflows dynamic dataflow vars.
src/test/groovy/groovy/concurrent/DataflowVariableTest.groovy Adds tests for DataflowVariable behavior and composition.
src/test/groovy/groovy/concurrent/ConcurrentConfigTest.groovy Adds tests for global concurrency configuration defaults/overrides.
src/test/groovy/groovy/concurrent/ChannelSelectTest.groovy Adds tests for ChannelSelect.
src/test/groovy/groovy/concurrent/ChannelCompositionTest.groovy Adds tests for AsyncChannel composition operations (filter/map/merge/split/tap).
src/test/groovy/groovy/concurrent/BroadcastChannelTest.groovy Adds tests for broadcast channels and subscription semantics.
src/test/groovy/groovy/concurrent/AsyncScopeTimeoutTest.groovy Adds timeout/cancellation tests for structured scopes.
src/test/groovy/groovy/concurrent/AsyncScopeNestingTest.groovy Adds tests for parent/child scope relationships and cancellation propagation.
src/test/groovy/groovy/concurrent/AgentTest.groovy Adds tests for agent update serialization and async reads.
src/test/groovy/groovy/concurrent/ActorTest.groovy Adds tests for reactor/stateful actors, lifecycle, and stress sending.
src/test/groovy/groovy/concurrent/ActiveObjectTest.groovy Adds tests for new @ActiveObject/@ActiveMethod AST transform.
src/test/groovy/groovy/AsyncAwaitTest.groovy Switches async wrapping tests from AsyncSupport to AsyncClosureUtils.
src/spec/test/DesignPatternsTest.groovy Updates design pattern spec tests from GPars to new async/parallel APIs.
src/spec/doc/core-parallel-collections.adoc Adds spec docs for parallel collections and @Parallel.
src/spec/doc/core-concurrent-java.adoc Adds spec docs for the standalone Java concurrency module.
src/spec/doc/core-concurrent-dataflow.adoc Adds spec docs for dataflow vars and channels.
src/spec/doc/core-concurrent-actors.adoc Adds spec docs for actors, agents, and active objects.
src/spec/doc/_design-pattern-monoid.adoc Replaces GPars monoid examples with new parallel collections.
src/spec/doc/_design-pattern-decorator.adoc Replaces GPars decorator example with new async approach.
src/resources/META-INF/services/org.codehaus.groovy.transform.ASTTransformation Registers global transform for @Parallel.
src/main/java/org/codehaus/groovy/transform/ParallelASTTransformation.java Rewrites @Parallel loops to eachParallel at CONVERSION phase.
src/main/java/org/codehaus/groovy/transform/ActiveObjectASTTransformation.java Adds AST transform for @ActiveObject/@ActiveMethod.
src/main/java/org/codehaus/groovy/runtime/metaclass/MetaClassRegistryImpl.java Adds runtime warning when both core and groovy-concurrent-java appear.
src/main/java/org/codehaus/groovy/runtime/ParallelCollectionExtensions.java Adds parallel collection extension methods using isolated ForkJoinPools.
src/main/java/org/codehaus/groovy/runtime/DefaultGroovyMethods.java Registers ParallelCollectionExtensions as DGM-like methods.
src/main/java/org/apache/groovy/runtime/async/GeneratorBridge.java Makes generator completion APIs public.
src/main/java/org/apache/groovy/runtime/async/DefaultPool.java Adds Pool implementation including virtual threads + scoped “current pool”.
src/main/java/org/apache/groovy/runtime/async/DefaultAsyncScope.java Adds parent tracking, child-scope cancellation, and timeouts.
src/main/java/org/apache/groovy/runtime/async/DefaultActor.java Adds default Actor implementation.
src/main/java/org/apache/groovy/runtime/async/AsyncSupport.java Refactors async runtime to prefer java.util.function types + adds defer API.
src/main/java/org/apache/groovy/runtime/async/AsyncClosureUtils.java Adds closure-only async utilities previously in AsyncSupport.
src/main/java/groovy/transform/Parallel.java Updates @Parallel semantics docs and removes explicit transform class link.
src/main/java/groovy/transform/ActiveObject.java Adds @ActiveObject annotation.
src/main/java/groovy/transform/ActiveMethod.java Adds @ActiveMethod annotation.
src/main/java/groovy/concurrent/Pool.java Adds Pool API surface and docs.
src/main/java/groovy/concurrent/ParallelScope.java Adds ParallelScope helpers combining Pool + AsyncScope.
src/main/java/groovy/concurrent/DataflowVariable.java Adds DataflowVariable implementation.
src/main/java/groovy/concurrent/ConcurrentConfig.java Adds global concurrency configuration.
src/main/java/groovy/concurrent/ChannelSelect.java Adds channel selection primitive.
src/main/java/groovy/concurrent/BroadcastChannel.java Adds broadcast channel implementation.
src/main/java/groovy/concurrent/Awaitable.java Refactors to Supplier-based go and Function-based withScope.
src/main/java/groovy/concurrent/AsyncScope.java Refactors scope API to Supplier/Function and adds timeout overloads.
src/main/java/groovy/concurrent/AsyncChannel.java Adds channel composition operations (filter/map/merge/split/tap).
src/main/java/groovy/concurrent/Agent.java Adds agent implementation for serialized state updates.
src/main/java/groovy/concurrent/Actor.java Adds Actor interface + factories.
src/main/groovy/groovy/concurrent/Dataflows.groovy Adds Groovy-only Dataflows dynamic convenience wrapper.
settings.gradle Registers new groovy-concurrent-java subproject.
gradle/verification-metadata.xml Removes verification metadata entries for GPars-related dependencies.
build.gradle Removes GPars feature/deps and adjusts compiler classpath configuration.
build-logic/src/main/groovy/org.apache.groovy-distribution.gradle Removes distribution capability requirement for GPars.
Comments suppressed due to low confidence (1)

src/test/groovy/groovy/concurrent/ChannelCompositionTest.groovy:1

  • This disables coverage for AsyncChannel.split, and the note indicates a cross-test interference/cleanup problem. It would be better to fix the underlying leak (likely background tasks or channels not being closed/drained deterministically) and re-enable the test, since split is a key part of the new channel API.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +95 to +105
* Returns the global default pool. If none has been set, creates one
* based on current configuration: a virtual-thread pool if
* {@link #preferVirtualThreads()} is {@code true}, otherwise a
* fixed pool sized by {@link #getDefaultParallelism()}.
*
* @return the default pool, never {@code null}
*/
public static Pool getDefaultPool() {
Pool p = defaultPool;
if (p != null) return p;
return preferVirtualThreads() ? Pool.virtual() : Pool.fixed(getDefaultParallelism());
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDefaultPool() claims to return a global default pool but currently creates and returns a new pool instance on every call unless defaultPool was explicitly set. This will leak threads/executors and can cause surprising behavior. Consider lazily initializing and caching defaultPool (e.g., double-checked locking or synchronized init) and document/reset its lifecycle clearly.

Suggested change
* Returns the global default pool. If none has been set, creates one
* based on current configuration: a virtual-thread pool if
* {@link #preferVirtualThreads()} is {@code true}, otherwise a
* fixed pool sized by {@link #getDefaultParallelism()}.
*
* @return the default pool, never {@code null}
*/
public static Pool getDefaultPool() {
Pool p = defaultPool;
if (p != null) return p;
return preferVirtualThreads() ? Pool.virtual() : Pool.fixed(getDefaultParallelism());
* Returns the global default pool. If none has been set, lazily creates
* and caches one based on current configuration: a virtual-thread pool if
* {@link #preferVirtualThreads()} is {@code true}, otherwise a
* fixed pool sized by {@link #getDefaultParallelism()}.
* <p>
* The created pool is reused for subsequent calls until replaced or reset
* via {@link #setDefaultPool(Pool)}.
*
* @return the default pool, never {@code null}
*/
public static Pool getDefaultPool() {
Pool p = defaultPool;
if (p != null) return p;
synchronized (ConcurrentConfig.class) {
p = defaultPool;
if (p == null) {
p = preferVirtualThreads() ? Pool.virtual() : Pool.fixed(getDefaultParallelism());
defaultPool = p;
}
}
return p;

Copilot uses AI. Check for mistakes.
Comment on lines +103 to +114
public static <T> Agent<T> create(T initialValue, Pool pool) {
Objects.requireNonNull(pool, "pool must not be null");
// Wrap pool in a single-thread executor to serialise updates
return new Agent<>(initialValue,
Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(() -> {
pool.execute(r);
}, "groovy-agent-dispatch");
t.setDaemon(true);
return t;
}));
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ThreadFactory implementation breaks newSingleThreadExecutor: the executor's worker thread is expected to run the executor's internal queue loop, but here it immediately submits that loop to pool.execute(r) and then exits. That can lead to a dead executor, unbounded thread creation, or tasks never executing. If you want updates to run on the provided Pool while staying serialized, implement a serializing executor (e.g., a SerialExecutor over pool) rather than delegating the worker thread runnable to another executor.

Copilot uses AI. Check for mistakes.
Comment on lines +81 to +86
public void bind(T value) {
if (!future.complete(value)) {
throw new IllegalStateException(
"DataflowVariable is already bound to: " + future.join());
}
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the variable was previously completed exceptionally, future.join() will throw CompletionException while building the error message, so bind() will throw the wrong exception type (and potentially hide the intended IllegalStateException). Consider using future.getNow(null) (guarded), future.isCompletedExceptionally(), or a safe string representation that can't throw.

Copilot uses AI. Check for mistakes.
Comment on lines +81 to +94
public Awaitable<Result> select() {
CompletableFuture<Result> winner = new CompletableFuture<>();
for (int i = 0; i < channels.size(); i++) {
final int index = i;
AsyncChannel<?> ch = channels.get(i);
ch.receive().toCompletableFuture().whenComplete((value, error) -> {
if (error == null) {
winner.complete(new Result(index, value));
}
// On error (e.g., channel closed), let other channels compete
});
}
return GroovyPromise.of(winner);
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This implementation starts a real receive() on every channel. When a non-winning channel later receives a value, it will still consume that value but the result is discarded (because winner.complete(...) no-ops after the first completion). That can permanently lose messages and break subsequent selects/receives. A correct select needs non-consuming readiness registration, or a cancellable/transactional receive where losing registrations don't dequeue values. Consider adding/selecting via an AsyncChannel primitive that supports selection semantics (e.g., a selectRegister API) rather than racing receive() calls.

Copilot uses AI. Check for mistakes.
Comment on lines +137 to +143
// Replace the for loop: iterate over [null] once,
// executing the eachParallel call
ListExpression singleRun = new ListExpression();
singleRun.addExpression(ConstantExpression.NULL);
forLoop.setCollectionExpression(singleRun);
forLoop.setLoopBlock(block(callStatement));
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rewriting the annotated loop into a single-iteration for (x in [null]) { collection.eachParallel { ... } } changes observable semantics of for in Groovy (e.g., the loop variable may end up as null after the loop rather than the last iterated value; statement labeling and debugging/line mapping can also become confusing). Prefer replacing the parent statement with a direct expression/statement (collection.eachParallel { ... }) instead of mutating the loop to 'run once'. If replacing in-place is hard with ClassCodeVisitorSupport, consider a CompilationCustomizer that rewrites the enclosing BlockStatement statement list.

Copilot uses AI. Check for mistakes.
* Called internally when the generator throws an exception.
*/
void completeExceptionally(Throwable error) {
public void completeExceptionally(Throwable error) {
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete(), completeExceptionally(...), and GeneratorClosedException were previously package-private/internal hooks (per the surrounding comments) and are now public API surface. If external callers aren't meant to control generator lifecycle, consider keeping these package-private and exposing only what’s necessary. Expanding public API makes future refactors harder and increases compatibility obligations.

Copilot uses AI. Check for mistakes.
* Thrown when a generator tries to yield after the consumer has closed the bridge.
*/
static final class GeneratorClosedException extends RuntimeException {
public static final class GeneratorClosedException extends RuntimeException {
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

complete(), completeExceptionally(...), and GeneratorClosedException were previously package-private/internal hooks (per the surrounding comments) and are now public API surface. If external callers aren't meant to control generator lifecycle, consider keeping these package-private and exposing only what’s necessary. Expanding public API makes future refactors harder and increases compatibility obligations.

Copilot uses AI. Check for mistakes.
*/
package groovy.concurrent;

import org.apache.groovy.runtime.async.AsyncSupport;
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is unused in the current file and may trigger static analysis failures in builds with strict checks. Please remove it.

Suggested change
import org.apache.groovy.runtime.async.AsyncSupport;

Copilot uses AI. Check for mistakes.
*/
package groovy.concurrent;

import org.apache.groovy.runtime.async.AsyncSupport;
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This import is unused in the current file and may trigger static analysis failures in builds with strict checks. Please remove it.

Suggested change
import org.apache.groovy.runtime.async.AsyncSupport;

Copilot uses AI. Check for mistakes.
Comment on lines +90 to +98
/**
* Finds the first element matching the predicate (order not guaranteed
* due to parallelism).
*/
public static <T> T findParallel(Collection<T> self, Predicate<T> filter) {
return withCurrentFJP(fjp ->
fjp.submit(() -> self.parallelStream().filter(filter).findFirst().orElse(null)).join()
);
}
Copy link

Copilot AI Apr 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The doc says order is not guaranteed, but findFirst() on an ordered stream does respect encounter order (and may incur extra coordination cost). If the intention is 'no order guarantee', this should use findAny() instead; otherwise, update the Javadoc to reflect that it returns the first element in encounter order for ordered collections.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants