Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 84 additions & 26 deletions flows/src/main/java/com/softwaremill/jox/flows/Flow.java
Original file line number Diff line number Diff line change
Expand Up @@ -1491,34 +1491,92 @@ public <U extends T> Flow<U> onErrorRecover(ThrowingFunction<Throwable, U> f) {
* if the upstream fails with a handled exception.
*/
public <U extends T> Flow<U> recoverWith(ThrowingFunction<Throwable, Optional<Flow<U>>> pf) {
return recoverWithRetry(Schedule.immediate().maxRetries(0), pf);
}

/**
* Recovers from upstream errors by switching to a recovery flow with retry support. On failure
* matching {@code pf}, the recovery flow is materialized and run. If the recovery flow also
* fails and retries remain (as specified by the schedule), recovery is attempted again. After
* exhausting all retries, the error is propagated.
*
* <p>The function {@code pf} is re-evaluated on each retry attempt, allowing it to return
* different recovery flows.
*
* <p>Elements already emitted before the upstream error are preserved. Creates an asynchronous
* boundary (see {@link #buffer}) to isolate failures.
*
* @param schedule The retry schedule specifying intervals and maximum retries.
* @param pf A function mapping exceptions to recovery flows, or empty if not handled.
* @return A flow that emits elements from the upstream flow, and switches to a recovery flow on
* failure, retrying according to the schedule.
*/
public <U extends T> Flow<U> recoverWithRetry(
Schedule schedule, ThrowingFunction<Throwable, Optional<Flow<U>>> pf) {
return usingEmit(
emit ->
supervised(
scope -> {
Channel<U> channel = newChannelWithBufferSizeFromScope();
forkPropagate(
scope,
channel,
() -> {
try {
//noinspection unchecked
last.run(t -> channel.send((U) t));
} catch (Throwable e) {
Optional<Flow<U>> recovery = pf.apply(e);
if (recovery.isPresent()) {
recovery.get().runToEmit(channel::send);
} else {
channel.error(e);
return null;
}
}
channel.done();
return null;
});
emit -> supervised(scope -> recoverWithRetryImpl(scope, schedule, pf, emit)));
}

FlowEmit.channelToEmit(channel, emit);
return null;
}));
@SuppressWarnings("unchecked")
private <U> Void recoverWithRetryImpl(
Scope scope,
Schedule schedule,
ThrowingFunction<Throwable, Optional<Flow<U>>> pf,
FlowEmit<U> emit)
throws Exception {
Channel<U> channel = newChannelWithBufferSizeFromScope();
forkPropagate(
scope,
channel,
() -> {
try {
last.run(t -> channel.send((U) t));
} catch (Throwable e) {
Optional<Flow<U>> recovery = pf.apply(e);
if (recovery.isEmpty()) {
channel.error(e);
return null;
}
// use first pf result for initial attempt, re-evaluate pf on retries
boolean[] first = {true};
retryOnException(
schedule,
() -> {
Flow<U> flow;
if (first[0]) {
first[0] = false;
flow = recovery.get();
} else {
flow = pf.apply(e).orElseThrow();
}
flow.runToEmit(channel::send);
});
}
channel.done();
return null;
});
FlowEmit.channelToEmit(channel, emit);
return null;
}

private static void retryOnException(Schedule schedule, ThrowingRunnable operation)
throws Exception {
Iterator<Duration> intervals = schedule.newIterator();
while (true) {
try {
operation.run();
return;
} catch (Exception e) {
if (intervals.hasNext()) {
Duration delay = intervals.next();
if (!delay.isZero()) {
Thread.sleep(delay);
}
} else {
throw e;
}
}
}
}

/**
Expand Down
148 changes: 148 additions & 0 deletions flows/src/main/java/com/softwaremill/jox/flows/Schedule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package com.softwaremill.jox.flows;

import java.time.Duration;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;

/**
* Describes a retry schedule as a sequence of intervals between retry attempts. A schedule is a
* supplier of iterators over durations, re-created on each usage.
*
* <p>Schedules are infinite by default. Use {@link #maxRetries(int)} to limit the number of
* retries.
*/
public final class Schedule {
private static final Duration MAX_DURATION = Duration.ofDays(3650);

private final Supplier<Iterator<Duration>> intervals;

private Schedule(Supplier<Iterator<Duration>> intervals) {
this.intervals = intervals;
}

/** Returns a new iterator over the intervals for this schedule. */
Iterator<Duration> newIterator() {
return intervals.get();
}

// --- Factory methods ---

/**
* Infinite schedule with zero delay between attempts.
*
* @return A schedule yielding {@link Duration#ZERO} between each attempt.
*/
public static Schedule immediate() {
return new Schedule(() -> infiniteIterator(Duration.ZERO));
}

/**
* Infinite schedule with a fixed interval between attempts.
*
* @param interval The duration between attempts.
* @return A schedule yielding the given interval between each attempt.
*/
public static Schedule fixedInterval(Duration interval) {
return new Schedule(() -> infiniteIterator(interval));
}

/**
* Infinite exponential backoff schedule (base 2). Each interval is double the previous,
* starting from {@code initial}. Intervals are capped at ~3650 days to avoid overflow.
*
* @param initial The initial interval.
* @return A schedule with exponentially increasing intervals.
*/
public static Schedule exponentialBackoff(Duration initial) {
return new Schedule(
() ->
new Iterator<>() {
Duration current = initial;

public boolean hasNext() {
return true;
}

public Duration next() {
Duration result = current;
// double, capping to avoid overflow
long millis = current.toMillis();
current =
millis > MAX_DURATION.toMillis() / 2
? MAX_DURATION
: Duration.ofMillis(millis * 2);
return result;
}
});
}

// --- Modifiers ---

/**
* Limits total retries. The resulting schedule's iterator yields exactly {@code retries}
* intervals, allowing {@code retries + 1} total attempts (the initial attempt + retries).
*
* @param retries The maximum number of retries (must be >= 0).
* @return A schedule limited to the given number of retries.
*/
public Schedule maxRetries(int retries) {
if (retries < 0) {
throw new IllegalArgumentException("retries must be >= 0");
}
Schedule self = this;
return new Schedule(
() -> {
Iterator<Duration> delegate = self.intervals.get();
return new Iterator<>() {
int remaining = retries;

public boolean hasNext() {
return remaining > 0 && delegate.hasNext();
}

public Duration next() {
if (!hasNext()) throw new NoSuchElementException();
remaining--;
return delegate.next();
}
};
});
}

/**
* Caps each interval to the given maximum.
*
* @param max The maximum duration for any single interval.
* @return A schedule with each interval capped to the given maximum.
*/
public Schedule maxInterval(Duration max) {
Schedule self = this;
return new Schedule(
() -> {
Iterator<Duration> delegate = self.intervals.get();
return new Iterator<>() {
public boolean hasNext() {
return delegate.hasNext();
}

public Duration next() {
Duration d = delegate.next();
return d.compareTo(max) > 0 ? max : d;
}
};
});
}

private static Iterator<Duration> infiniteIterator(Duration value) {
return new Iterator<>() {
public boolean hasNext() {
return true;
}

public Duration next() {
return value;
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.softwaremill.jox.flows;

import static org.junit.jupiter.api.Assertions.*;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.Test;

import com.softwaremill.jox.structured.JoxScopeExecutionException;

class FlowRecoverWithRetryTest {

@Test
void shouldRetryRecoveryFlowOnFailure() throws Exception {
// given
var attemptCounter = new AtomicInteger(0);
int maxRetries = 2;
Flow<Integer> flow =
Flows.fromValues(1).concat(Flows.failed(new RuntimeException("upstream")));

// when
List<Integer> result =
flow.recoverWithRetry(
Schedule.immediate().maxRetries(maxRetries),
e -> {
if (e instanceof RuntimeException) {
int attempt = attemptCounter.incrementAndGet();
if (attempt <= maxRetries) {
return Optional.of(
Flows.failed(
new RuntimeException(
"recovery attempt "
+ attempt
+ " failed")));
}
return Optional.of(Flows.fromValues(42));
}
return Optional.empty();
})
.runToList();

// then
assertEquals(List.of(1, 42), result);
assertEquals(maxRetries + 1, attemptCounter.get());
}

@Test
void shouldPropagateErrorAfterExhaustingRetries() {
// given
Flow<Integer> flow =
Flows.fromValues(1).concat(Flows.failed(new RuntimeException("upstream")));

// when & then
var caught =
assertThrows(
JoxScopeExecutionException.class,
() ->
flow.recoverWithRetry(
Schedule.immediate().maxRetries(2),
e ->
e instanceof RuntimeException
? Optional.of(
Flows.failed(
new RuntimeException(
"still"
+ " failing")))
: Optional.empty())
.runToList());
assertInstanceOf(RuntimeException.class, caught.getCause().getCause());
assertEquals("still failing", caught.getCause().getCause().getMessage());
}
}
Loading