A DAG-based parallel computation framework for Java
Simplify multi-threaded task orchestration — declare dependencies, and the framework maximizes parallelism automatically.
- DAG-based parallel execution — automatically maximizes parallelism based on declared dependencies using `CompletableFuture`
- Multiple command types — `SyncCommand` (caller thread), `AsyncCommand` (I/O pool), `CalcCommand` (CPU pool), `BatchCommand` (fan-out with ALL/ANY/AT_LEAST_N strategies)
- Cycle detection — DFS-based cycle detection before execution with clear error reporting
- Fluent builder API — chain `.node().depend()` calls; the builder is reusable across runs
- Auto-naming — `node(Class)` auto-generates names (`fetchOrder#0`, `fetchOrder#1`, ...); `node(Function)` uses `node#0`, `node#1`, ...
- Lambda support — `node()` accepts `Function<C, R>` or `Consumer<C>` for lightweight nodes
- Extensible architecture — `JobBuilder` is designed for extension; create custom builders for third-party integrations
- Hystrix integration — the `dag-flow-hystrix` module wraps Netflix `HystrixCommand` into the DAG
- Resilience4j integration — the `dag-flow-resilience4j` module provides CircuitBreaker, Retry, Bulkhead, RateLimiter, and TimeLimiter support
- Spring Boot Starter — `dag-flow-spring-boot-starter` auto-configures the dag-flow engine; `dependSpringBean()` works out of the box
- Virtual threads — `useVirtualThreads()` enables Java 21 virtual threads for all non-sync nodes; traditional thread pools remain the default
- Smart thread pools — I/O pool (2x–8x cores) and CPU pool (cores + 1) with `CallerRunsPolicy`
- OpenTelemetry tracing — built-in distributed tracing via `opentelemetry-api`; creates a root span per DAG run, child spans per node, and batch-item spans; no-op when the SDK is absent
- Error propagation — node exceptions propagate as `ExecutionException`; downstream nodes are cancelled
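The error-propagation behavior follows standard `CompletableFuture` semantics. A self-contained sketch using only `java.util.concurrent` (no dag-flow API) shows how an exception thrown in one node surfaces to the caller as an `ExecutionException` and short-circuits its dependents:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Standalone illustration (plain java.util.concurrent, not dag-flow code):
// an exception thrown in an async stage reaches the caller wrapped in an
// ExecutionException, and dependent stages never run their normal path.
public class ErrorPropagationSketch {
    public static String failingNode() {
        CompletableFuture<String> upstream = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("node failed");
        });
        // Dependent stage: its normal path is skipped when upstream fails
        CompletableFuture<String> downstream = upstream.thenApply(v -> v + "!");
        try {
            return downstream.get();           // rethrows as ExecutionException
        } catch (ExecutionException e) {
            return e.getCause().getMessage();  // original cause: "node failed"
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failingNode()); // prints "node failed"
    }
}
```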
| Module | Description |
|---|---|
| `dag-flow-core` | Core framework: DAG builder, runner, command API, thread pools |
| `dag-flow-hystrix` | Netflix Hystrix extension |
| `dag-flow-resilience4j` | Resilience4j extension (CircuitBreaker, Retry, Bulkhead, RateLimiter, TimeLimiter) |
| `dag-flow-spring-boot-starter` | Spring Boot 4 auto-configuration starter |
Add to your `build.gradle`:

```groovy
dependencies {
    // Core (required)
    implementation 'com.lesofn:dag-flow-core:1.0-SNAPSHOT'

    // Spring Boot Starter (optional — auto-configures dag-flow in Spring Boot apps)
    implementation 'com.lesofn:dag-flow-spring-boot-starter:1.0-SNAPSHOT'

    // Hystrix extension (optional)
    implementation 'com.lesofn:dag-flow-hystrix:1.0-SNAPSHOT'

    // Resilience4j extension (optional)
    implementation 'com.lesofn:dag-flow-resilience4j:1.0-SNAPSHOT'
}
```

The context carries request data and provides access to upstream results:
```java
public class OrderContext extends DagFlowContext {
    private String orderId;
    // getters & setters
}
```

```java
// Async I/O node (runs on the async thread pool)
public class FetchOrder implements AsyncCommand<OrderContext, Order> {
    @Override
    public Order run(OrderContext context) {
        return orderService.getById(context.getOrderId());
    }
}

// CPU-bound node (runs on the calc thread pool)
public class CalcDiscount implements CalcCommand<OrderContext, BigDecimal> {
    @Override
    public BigDecimal run(OrderContext context) {
        Order order = context.getResult(FetchOrder.class);
        return discountEngine.calculate(order);
    }
}
```

```java
OrderContext context = new OrderContext();
context.setOrderId("12345");

JobRunner<OrderContext> runner = new JobBuilder<OrderContext>()
        .node(FetchOrder.class)
        .node(FetchUser.class)
        .node(CalcDiscount.class).depend(FetchOrder.class, FetchUser.class)
        .node(BuildResult.class).depend(CalcDiscount.class)
        .run(context);

Result result = runner.getResult(BuildResult.class);
```

This builds and executes the following DAG:

```
FetchOrder    FetchUser    ← parallel (no mutual dependency)
      \          /
      CalcDiscount         ← waits for both
           |
      BuildResult
```
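Conceptually, the runner wires each node's future to its dependencies' futures. A plain `CompletableFuture` sketch of the diamond above (illustrative only, not the actual `JobRunner` code; the string values are made up):

```java
import java.util.concurrent.CompletableFuture;

// Conceptual sketch (plain CompletableFuture, not JobRunner internals):
// the two fetches run in parallel; the discount starts once both finish.
public class DiamondSketch {
    public static String runDiamond() throws Exception {
        CompletableFuture<String> fetchOrder = CompletableFuture.supplyAsync(() -> "order");
        CompletableFuture<String> fetchUser  = CompletableFuture.supplyAsync(() -> "user");

        // Waits for both upstream futures, like .depend(FetchOrder, FetchUser)
        CompletableFuture<String> calcDiscount =
                fetchOrder.thenCombine(fetchUser, (o, u) -> o + "+" + u + ":10%");

        // Single dependency, like .depend(CalcDiscount)
        CompletableFuture<String> buildResult =
                calcDiscount.thenApply(d -> "result[" + d + "]");
        return buildResult.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDiamond()); // prints "result[order+user:10%]"
    }
}
```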
```
DagFlowCommand<C, R>              // Base: R run(C context)
├── SyncCommand<C, R>             // Runs on caller thread
├── AsyncCommand<C, R>            // Runs on I/O thread pool
│   └── BatchCommand<C, P, R>     // Fan-out per param → Map<P, R> (ALL/ANY/atLeast)
└── CalcCommand<C, R>             // Runs on CPU thread pool
    ├── FunctionCommand           // Lambda Function<C, R> wrapper
    └── ConsumerCommand           // Lambda Consumer<C> wrapper

Extensions (SyncCommand-based):
├── HystrixCommandWrapper         // dag-flow-hystrix: Netflix Hystrix adapter
└── Resilience4jCommand           // dag-flow-resilience4j: Resilience4j decorator wrapper
```
| Component | Description |
|---|---|
| `JobBuilder<C>` | Fluent API for DAG construction and node registration (extensible) |
| `JobRunner<C>` | `CompletableFuture`-based execution engine with result retrieval |
| `DagFlowContext` | Abstract context — subclass to carry request data and access upstream results |
| `DagNode` | Runtime node wrapping a command with its future and dependencies |
| `DagNodeCheck` | DFS-based cycle detection; runs before execution |
| `DagFlowDefaultExecutor` | Default thread pool configuration for async and calc nodes |
| `DagFlowTracing` | OpenTelemetry tracing utility — root, node, and batch-item spans |
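Cycle detection of the kind `DagNodeCheck` performs can be sketched as a three-color DFS over the dependency map (an illustrative plain-Java version, not the library source; the node names are made up):

```java
import java.util.*;

// Illustrative three-color DFS cycle check over a dependency map
// (node -> the nodes it depends on); not the actual DagNodeCheck source.
public class CycleCheckSketch {
    public static boolean hasCycle(Map<String, List<String>> deps) {
        Set<String> visiting = new HashSet<>(); // gray: on the current DFS path
        Set<String> done = new HashSet<>();     // black: fully explored
        for (String node : deps.keySet()) {
            if (dfs(node, deps, visiting, done)) return true;
        }
        return false;
    }

    private static boolean dfs(String node, Map<String, List<String>> deps,
                               Set<String> visiting, Set<String> done) {
        if (done.contains(node)) return false;
        if (!visiting.add(node)) return true;   // revisited a gray node: back edge → cycle
        for (String dep : deps.getOrDefault(node, List.of())) {
            if (dfs(dep, deps, visiting, done)) return true;
        }
        visiting.remove(node);
        done.add(node);
        return false;
    }

    public static void main(String[] args) {
        Map<String, List<String>> ok  = Map.of("calc", List.of("fetch"), "fetch", List.of());
        Map<String, List<String>> bad = Map.of("a", List.of("b"), "b", List.of("a"));
        System.out.println(hasCycle(ok));  // prints "false"
        System.out.println(hasCycle(bad)); // prints "true"
    }
}
```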
| Pool | Core | Max | Queue | Use Case |
|---|---|---|---|---|
| Async (I/O) | CPU × 2 | CPU × 8 | CPU × 16 | Network calls, DB queries, file I/O |
| Calc (CPU) | CPU + 1 | CPU + 1 | CPU × 4 | Computation, transformation, aggregation |
Both pools use `CallerRunsPolicy` as the rejection handler.
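The sizing in the table maps directly onto `ThreadPoolExecutor` constructor parameters. A hedged sketch of how such an I/O pool could be built (the 60-second keep-alive is an illustrative choice; this is not the actual `DagFlowDefaultExecutor` source):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch of an I/O pool with the table's sizing: core = 2x CPUs,
// max = 8x CPUs, bounded queue = 16x CPUs, CallerRunsPolicy rejection.
// Illustrative only; the keep-alive value is an assumption.
public class IoPoolSketch {
    public static ThreadPoolExecutor newIoPool() {
        int cpus = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cpus * 2,                                  // core threads
                cpus * 8,                                  // max threads
                60L, TimeUnit.SECONDS,                     // idle keep-alive (illustrative)
                new LinkedBlockingQueue<>(cpus * 16),      // bounded work queue
                new ThreadPoolExecutor.CallerRunsPolicy()  // back-pressure: caller runs the task
        );
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newIoPool();
        System.out.println(pool.getCorePoolSize());
        pool.shutdown();
    }
}
```

`CallerRunsPolicy` gives natural back-pressure: when the queue is full, the submitting thread executes the task itself instead of dropping it.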
When you use `node(Class)` without an explicit name, dag-flow auto-generates a name using the pattern `className#0`, `className#1`, etc.:

```java
new JobBuilder<OrderContext>()
        .node(FetchOrder.class)    // auto-named "fetchOrder#0"
        .node(FetchOrder.class)    // auto-named "fetchOrder#1"
        .node(CalcDiscount.class)  // auto-named "calcDiscount#0"
        .run(context);
```

Lambda nodes use the prefix `node`: `node#0`, `node#1`, ...

You can also provide an explicit name:

```java
builder.node("myCustomName", FetchOrder.class);
```

For lightweight logic, skip creating a class:
```java
new JobBuilder<OrderContext>()
        .node(FetchOrder.class)
        .node("format", (Function<OrderContext, String>) ctx -> {
            Order order = ctx.getResult(FetchOrder.class);
            return order.toString();
        }).depend(FetchOrder.class)
        .run(context);
```

Fan-out a set of parameters into parallel sub-tasks:
```java
public class BatchFetch implements BatchCommand<MyContext, Long, String> {
    @Override
    public Set<Long> batchParam(MyContext context) {
        return Set.of(1L, 2L, 3L);
    }

    @Override
    public String run(MyContext context, Long param) {
        return fetchById(param);
    }
}

// Result is Map<Long, String>
Map<Long, String> results = runner.getResult(BatchFetch.class);
```

By default, `BatchCommand` waits for all sub-tasks to complete. Override `batchStrategy()` to change this:
| Strategy | Description |
|---|---|
| `BatchStrategy.ALL` | Wait for all sub-tasks (default) |
| `BatchStrategy.ANY` | Return as soon as one sub-task completes; cancel the rest |
| `BatchStrategy.atLeast(n)` | Return as soon as `n` sub-tasks complete; cancel the rest |
```java
public class FastBatchFetch implements BatchCommand<MyContext, Long, String> {
    @Override
    public Set<Long> batchParam(MyContext context) {
        return Set.of(1L, 2L, 3L, 4L, 5L);
    }

    @Override
    public String run(MyContext context, Long param) {
        return fetchFromReplica(param);
    }

    @Override
    public BatchStrategy batchStrategy() {
        return BatchStrategy.ANY;            // first result wins
        // return BatchStrategy.atLeast(3);  // wait for at least 3
    }
}
```

Commands can declare their own dependencies:
```java
public class CalcDiscount implements CalcCommand<OrderContext, BigDecimal> {
    @Override
    public Class<? extends DagFlowCommand<OrderContext, ?>> dependNode() {
        return FetchOrder.class;
    }

    @Override
    public BigDecimal run(OrderContext context) { ... }
}

// No need to call .depend() in the builder
new JobBuilder<OrderContext>()
        .node(CalcDiscount.class)  // auto-resolves the dependency on FetchOrder
        .run(context);
```

Use the `dag-flow-hystrix` module to wrap existing `HystrixCommand` implementations:
```java
// Add dependency: implementation 'com.lesofn:dag-flow-hystrix:1.0-SNAPSHOT'
JobRunner<MyContext> runner = new HystrixJobBuilder<MyContext>()
        .addHystrixNode(MyHystrixCommand.class)
        .run(context);

String result = runner.getResult("myHystrixCommand");

// Or use the type-safe helper:
String typedResult = HystrixJobBuilder.getHystrixResult(runner, MyHystrixCommand.class);
```

Use the `dag-flow-resilience4j` module to add fault tolerance to DAG nodes:
```java
// Add dependency: implementation 'com.lesofn:dag-flow-resilience4j:1.0-SNAPSHOT'
CircuitBreaker cb = CircuitBreaker.of("myService", CircuitBreakerConfig.ofDefaults());
Retry retry = Retry.of("myService", RetryConfig.custom().maxAttempts(3).build());

Resilience4jCommand<MyContext, String> command =
        new Resilience4jCommand<>(ctx -> callRemoteService(ctx))
                .withCircuitBreaker(cb)
                .withRetry(retry);

JobRunner<MyContext> runner = new Resilience4jJobBuilder<MyContext>()
        .addResilience4jNode("protectedCall", command)
        .node(DownstreamJob.class).depend("protectedCall")
        .run(context);
```

Supported decorators: CircuitBreaker, Retry, Bulkhead, RateLimiter, TimeLimiter — they can be combined freely.
Add `dag-flow-spring-boot-starter` to auto-configure the dag-flow engine in Spring Boot applications:

```groovy
// build.gradle
implementation 'com.lesofn:dag-flow-spring-boot-starter:1.0-SNAPSHOT'
```

Once installed, `SpringContextHolder` is auto-registered and `dependSpringBean()` works out of the box — no manual configuration needed:
```java
// Spring beans that implement DagFlowCommand can be used as DAG nodes
@Component
public class OrderService implements AsyncCommand<OrderContext, Order> {

    @Autowired
    private OrderRepository orderRepository;

    @Override
    public Order run(OrderContext context) {
        return orderRepository.findById(context.getOrderId());
    }
}

// Reference Spring beans by name in DAG construction
new JobBuilder<OrderContext>()
        .node(CalcDiscount.class)
        .dependSpringBean("orderService")  // resolved from the Spring ApplicationContext
        .run(context);
```

Configuration via `application.properties` / `application.yml`:
```properties
# Disable dag-flow auto-configuration (default: true)
dagflow.enabled=false
```

Enable virtual threads for all non-sync nodes with a single call:
```java
JobRunner<MyContext> runner = new JobBuilder<MyContext>()
        .useVirtualThreads()     // enable virtual threads
        .node(FetchOrder.class)  // AsyncCommand → virtual thread
        .node(FetchUser.class)   // AsyncCommand → virtual thread
        .node(CalcDiscount.class).depend(FetchOrder.class, FetchUser.class)
        .run(context);
```

- `SyncCommand` — still runs on the caller thread (unchanged)
- `AsyncCommand` / `CalcCommand` — run on virtual threads instead of platform thread pools
- Without `useVirtualThreads()`, the traditional I/O and CPU thread pools are used (default behavior)
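The underlying JDK primitive is `Executors.newVirtualThreadPerTaskExecutor()` (Java 21). A minimal plain-JDK sketch, independent of dag-flow:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Plain JDK sketch (not dag-flow internals): a virtual-thread-per-task
// executor runs each submitted task on a fresh virtual thread, so blocking
// I/O no longer ties up a pooled platform thread.
public class VirtualThreadSketch {
    public static boolean runsOnVirtualThread() throws Exception {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            Future<Boolean> f = executor.submit(() -> Thread.currentThread().isVirtual());
            return f.get();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runsOnVirtualThread()); // prints "true" on Java 21+
    }
}
```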
dag-flow automatically creates distributed tracing spans for every DAG execution. When `opentelemetry-api` is on the classpath:

- A root span (`dagflow.run`) is created per DAG execution with a `dagflow.node.count` attribute
- Each node gets a child span (`dagflow.node.<name>`) with `dagflow.node.name` and `dagflow.node.type` attributes
- Each batch sub-task gets its own child span (`dagflow.batch.<name>`) with the `dagflow.batch.param` attribute

When no OpenTelemetry SDK is configured, all tracing operations are no-ops with zero overhead.
```java
// Option 1: Use GlobalOpenTelemetry (default — zero configuration needed)
// Just configure the OpenTelemetry SDK globally as usual

// Option 2: Provide a custom instance
DagFlowTracing.setOpenTelemetry(myOpenTelemetrySdk);

// Then run the DAG as normal — spans are created automatically
new JobBuilder<MyContext>()
        .node(FetchOrder.class)
        .node(CalcDiscount.class).depend(FetchOrder.class)
        .run(context);

// Reset to GlobalOpenTelemetry
DagFlowTracing.setOpenTelemetry(null);
```

Spring Boot configuration:

```properties
# Disable tracing (default: true)
dagflow.tracing-enabled=false
```

Override the default thread pool for specific nodes:
```java
// Per-node override in the command class
public class CustomJob implements AsyncCommand<MyContext, String> {
    @Override
    public Executor executor() {
        return myCustomExecutor;
    }
    // ...
}

// Or pass an executor to lambda nodes
builder.node("custom", myFunction, myExecutor);
```

```
dag-flow/
├── dag-flow-core/                        # Core module
│   └── src/main/java/com/lesofn/dagflow/
│       ├── JobBuilder.java               # Fluent DAG construction (extensible)
│       ├── JobRunner.java                # CompletableFuture execution engine
│       ├── api/
│       │   ├── DagFlowCommand.java       # Base command interface
│       │   ├── SyncCommand.java          # Synchronous command
│       │   ├── AsyncCommand.java         # Async (I/O) command
│       │   ├── CalcCommand.java          # CPU-bound command
│       │   ├── BatchCommand.java         # Batch fan-out command
│       │   ├── BatchStrategy.java        # Batch strategies: ALL, ANY, atLeast(N)
│       │   ├── context/                  # Context & injection interfaces
│       │   ├── depend/                   # Dependency declaration interface
│       │   └── function/                 # Lambda wrappers
│       ├── exception/                    # DagFlowBuildException, CycleException, etc.
│       ├── executor/                     # Default thread pool configuration
│       ├── model/                        # DagNode, DagNodeCheck, DagNodeFactory
│       ├── tracing/                      # OpenTelemetry tracing (DagFlowTracing)
│       └── spring/                       # Optional Spring integration
├── dag-flow-hystrix/                     # Hystrix extension module
│   └── src/main/java/com/lesofn/dagflow/hystrix/
│       ├── HystrixCommandWrapper.java    # HystrixCommand → SyncCommand adapter
│       └── HystrixJobBuilder.java        # Builder with addHystrixNode()
├── dag-flow-resilience4j/                # Resilience4j extension module
│   └── src/main/java/com/lesofn/dagflow/resilience4j/
│       ├── Resilience4jCommand.java      # Resilience4j decorator wrapper
│       └── Resilience4jJobBuilder.java   # Builder with addResilience4jNode()
└── dag-flow-spring-boot-starter/         # Spring Boot 4 auto-configuration starter
    └── src/main/java/com/lesofn/dagflow/spring/boot/autoconfigure/
        ├── DagFlowAutoConfiguration.java # Auto-registers SpringContextHolder
        └── DagFlowProperties.java        # dagflow.enabled configuration
```
Platform thread pool vs Virtual threads comparison (Java 21, 8 vCPU):
| Scenario | Platform Threads | Virtual Threads | Speedup |
|---|---|---|---|
| 10 Parallel I/O (100ms each) | 200.97 ms | 101.24 ms | 1.99x |
| 50 Parallel I/O (50ms each) | 302.88 ms | 58.43 ms | 5.18x |
| 100 Parallel I/O (20ms each) | 222.91 ms | 26.19 ms | 8.51x |
| 8 Parallel CPU Nodes | 4.65 ms | 4.47 ms | 1.04x |
| Mixed DAG (5 I/O → CPU → I/O) | 102.71 ms | 102.53 ms | 1.00x |
| Multi-Layer DAG (3×10, 30ms) | 121.80 ms | 62.03 ms | 1.96x |
Key takeaways:
- Virtual threads excel at I/O-bound workloads — up to 8.5x faster with 100 concurrent blocking nodes
- For CPU-bound tasks, performance is comparable (virtual threads add negligible overhead)
- The more concurrent I/O nodes, the greater the advantage of virtual threads over fixed-size thread pools
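The I/O-bound rows can be approximated with a minimal harness like the following (illustrative only, not the project's benchmark code; absolute numbers vary by machine):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Minimal illustrative harness (not the project's benchmark code): run N
// blocking 20 ms "I/O" tasks on a small fixed pool vs. virtual threads.
public class BenchSketch {
    public static long timeAllMillis(ExecutorService pool, int n) throws Exception {
        long start = System.nanoTime();
        CompletableFuture<?>[] tasks = new CompletableFuture<?>[n];
        for (int i = 0; i < n; i++) {
            tasks[i] = CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(20); // simulated blocking I/O
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }, pool);
        }
        CompletableFuture.allOf(tasks).join();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        try (ExecutorService fixed = Executors.newFixedThreadPool(8);
             ExecutorService virt = Executors.newVirtualThreadPerTaskExecutor()) {
            // Fixed pool serializes 100 tasks onto 8 threads; virtual threads
            // run all 100 sleeps concurrently.
            System.out.printf("fixed pool: %d ms, virtual: %d ms%n",
                    timeAllMillis(fixed, 100), timeAllMillis(virt, 100));
        }
    }
}
```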
Run benchmarks yourself:

```shell
./gradlew :dag-flow-core:benchmark   # Benchmarks only (excluded from the default test task)
```

```shell
./gradlew build                                # Build all modules
./gradlew test                                 # Run all tests (Spock + JUnit Platform)
./gradlew :dag-flow-core:test                  # Run core tests only
./gradlew :dag-flow-hystrix:test               # Run Hystrix tests only
./gradlew :dag-flow-resilience4j:test          # Run Resilience4j tests only
./gradlew :dag-flow-spring-boot-starter:test   # Run Spring Boot Starter tests only
./gradlew :dag-flow-core:benchmark             # Run performance benchmarks
./gradlew clean build                          # Clean build
```

- Java 21+
- Gradle 8.10+ (wrapper included)