From f749e546b9314c49f99d5bb4b5bfebd5e2844313 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 19 Jan 2026 18:53:02 +0000 Subject: [PATCH 1/7] Add Flow.usingChannel method This commit ports Flow.usingChannel from jox to ox (issue #396). Flow.usingChannel creates a Flow by providing a Sink (channel) that can be written to asynchronously. The method: - Creates a channel and forks a task to run the provided function - Automatically closes the channel on completion or error - Propagates elements from the channel to downstream flow stages Also includes comprehensive tests covering basic usage, error propagation, transformations, and concurrent sending. Co-authored-by: Adam Warski --- core/src/main/scala/ox/flow/Flow.scala | 2 +- .../main/scala/ox/flow/FlowCompanionOps.scala | 24 ++++++++ .../ox/flow/FlowOpsUsingChannelTest.scala | 60 +++++++++++++++++++ 3 files changed, 85 insertions(+), 1 deletion(-) create mode 100644 core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala diff --git a/core/src/main/scala/ox/flow/Flow.scala b/core/src/main/scala/ox/flow/Flow.scala index 6f6de70b..4d0a94d9 100644 --- a/core/src/main/scala/ox/flow/Flow.scala +++ b/core/src/main/scala/ox/flow/Flow.scala @@ -9,7 +9,7 @@ import scala.annotation.nowarn * * A flow is lazy - evaluation happens only when it's run. * - * Flows can be created using the [[Flow.usingSink]], [[Flow.fromValues]] and other `Flow.from*` methods, [[Flow.tick]] etc. + * Flows can be created using the [[Flow.usingEmit]], [[Flow.usingChannel]], [[Flow.fromValues]] and other `Flow.from*` methods, [[Flow.tick]] etc. * * Transformation stages can be added using the available combinators, such as [[Flow.map]], [[Flow.buffer]], [[Flow.grouped]], etc. Each * such method returns a new immutable `Flow` instance. diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 48aead87..9b680379 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -4,6 +4,7 @@ import ox.Fork import ox.channels.ChannelClosed import ox.channels.ChannelClosedUnion.isValue import ox.channels.Source +import ox.channels.Sink import ox.channels.BufferCapacity import ox.forever import ox.forkUnsupervised @@ -37,6 +38,29 @@ trait FlowCompanionOps: */ def usingEmit[T](withEmit: FlowEmit[T] => Unit): Flow[T] = usingEmitInline(withEmit) + /** Creates a flow, which when run, provides a [[Sink]] (channel) to the given `withSink` function. Elements can be sent to the sink to + * be processed by downstream stages. The `withSink` function is run asynchronously in a forked task. + * + * The flow completes when the `withSink` function completes and the provided sink is closed. The sink is automatically closed when + * `withSink` completes normally. If `withSink` throws an exception, the sink is closed with an error. + * + * Must be run within a concurrency scope as a fork is created to run the `withSink` function. + * + * @param withSink + * A function that receives a [[Sink]] to which elements can be sent. + */ + def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit => + val ch = BufferCapacity.newChannel[T] + forkUnsupervised: + try + withSink(ch) + ch.done() + catch + case e: Throwable => + ch.error(e) + throw e + FlowEmit.channelToEmit(ch, emit) + /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. */ diff --git a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala new file mode 100644 index 00000000..cb8b396b --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala @@ -0,0 +1,60 @@ +package ox.flow + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers: + behavior of "usingChannel" + + it should "send elements through the provided channel" in supervised: + Flow + .usingChannel(sink => + sink.send(1) + sink.send(2) + sink.send(3) + ) + .runToList() shouldBe List(1, 2, 3) + + it should "complete when the channel is done" in supervised: + Flow + .usingChannel[Int](sink => + sink.send(10) + sink.send(20) + ) + .runToList() shouldBe List(10, 20) + + it should "propagate errors from the channel" in supervised: + val exception = new RuntimeException("test error") + val result = intercept[RuntimeException]: + Flow + .usingChannel[Int](sink => + sink.send(1) + throw exception + ) + .runToList() + result shouldBe exception + + it should "work with transformations" in supervised: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.send(2) + sink.send(3) + ) + .map(_ * 2) + .runToList() shouldBe List(2, 4, 6) + + it should "support concurrent sending" in supervised: + Flow + .usingChannel[Int](sink => + fork: + sink.send(1) + sink.send(2) + fork: + sink.send(3) + sink.send(4) + ) + .runToList() + .sorted shouldBe List(1, 2, 3, 4) +end FlowOpsUsingChannelTest From 17b038cc4fc7ba5825a164c8235c023690aa65c6 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 19 Jan 2026 18:58:15 +0000 Subject: [PATCH 2/7] Add Flow.usingChannel documentation Co-authored-by: Adam Warski --- doc/streaming/flows.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/doc/streaming/flows.md b/doc/streaming/flows.md index 8cb3ae32..1b17a1de 100644 --- a/doc/streaming/flows.md +++ b/doc/streaming/flows.md @@ -60,6 +60,26 @@ As part of the callback, you can create [supervision scopes](../structured-concu Any asynchronous communication should be best done with [channels](channels.md). You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`. +Alternatively, flows can be created by providing a function that receives a `Sink` (channel): + +```scala mdoc:compile-only +import ox.flow.Flow +import ox.{fork, supervised} + +supervised: + Flow.usingChannel: sink => + sink.send(1) + fork: + sink.send(2) + sink.send(3) + sink.send(4) + // TODO: transform the flow further & run +``` + +Unlike `usingEmit`, the `Sink` instance can be safely shared across threads, as channels are thread-safe. The provided function is run asynchronously in a forked task. The flow completes when the function completes and the sink is automatically closed. If the function throws an exception, it is propagated as a flow error. + +Note that `Flow.usingChannel` must be run within a concurrency scope, as it creates a fork to run the provided function. + ## Transforming flows: basics Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections: From e4ccf1d215298207eaf78b8fb3e41c4829a572bb Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 19 Jan 2026 19:42:48 +0000 Subject: [PATCH 3/7] Fix scalafmt formatting issues - Split long comment line in Flow.scala - Reformat usingChannel method signature in FlowCompanionOps.scala Co-authored-by: Adam Warski --- core/src/main/scala/ox/flow/Flow.scala | 3 ++- .../main/scala/ox/flow/FlowCompanionOps.scala | 23 ++++++++++--------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/ox/flow/Flow.scala b/core/src/main/scala/ox/flow/Flow.scala index 4d0a94d9..0591ac53 100644 --- a/core/src/main/scala/ox/flow/Flow.scala +++ b/core/src/main/scala/ox/flow/Flow.scala @@ -9,7 +9,8 @@ import scala.annotation.nowarn * * A flow is lazy - evaluation happens only when it's run. * - * Flows can be created using the [[Flow.usingEmit]], [[Flow.usingChannel]], [[Flow.fromValues]] and other `Flow.from*` methods, [[Flow.tick]] etc. + * Flows can be created using the [[Flow.usingEmit]], [[Flow.usingChannel]], [[Flow.fromValues]] and other `Flow.from*` methods, + * [[Flow.tick]] etc. * * Transformation stages can be added using the available combinators, such as [[Flow.map]], [[Flow.buffer]], [[Flow.grouped]], etc. Each * such method returns a new immutable `Flow` instance. diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 9b680379..058fc0e9 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -49,17 +49,18 @@ trait FlowCompanionOps: * @param withSink * A function that receives a [[Sink]] to which elements can be sent. */ - def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit => - val ch = BufferCapacity.newChannel[T] - forkUnsupervised: - try - withSink(ch) - ch.done() - catch - case e: Throwable => - ch.error(e) - throw e - FlowEmit.channelToEmit(ch, emit) + def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = + usingEmitInline: emit => + val ch = BufferCapacity.newChannel[T] + forkUnsupervised: + try + withSink(ch) + ch.done() + catch + case e: Throwable => + ch.error(e) + throw e + FlowEmit.channelToEmit(ch, emit) /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. From ae5c7fbfea4e243d729768dc10ed7c33942c8847 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <41898282+claude[bot]@users.noreply.github.com> Date: Mon, 19 Jan 2026 19:48:00 +0000 Subject: [PATCH 4/7] Fix scalafmt formatting issues Co-authored-by: Adam Warski --- .../main/scala/ox/flow/FlowCompanionOps.scala | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 058fc0e9..9b680379 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -49,18 +49,17 @@ trait FlowCompanionOps: * @param withSink * A function that receives a [[Sink]] to which elements can be sent. */ - def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = - usingEmitInline: emit => - val ch = BufferCapacity.newChannel[T] - forkUnsupervised: - try - withSink(ch) - ch.done() - catch - case e: Throwable => - ch.error(e) - throw e - FlowEmit.channelToEmit(ch, emit) + def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit => + val ch = BufferCapacity.newChannel[T] + forkUnsupervised: + try + withSink(ch) + ch.done() + catch + case e: Throwable => + ch.error(e) + throw e + FlowEmit.channelToEmit(ch, emit) /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. From 880cf723b6253a68fd87ee72e68bb79411ec084e Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Wed, 4 Feb 2026 13:58:49 +0000 Subject: [PATCH 5/7] Fix scalafmt formatting in FlowCompanionOps Co-Authored-By: Claude Opus 4.5 --- core/src/main/scala/ox/flow/FlowCompanionOps.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 9b680379..50979820 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -38,8 +38,8 @@ trait FlowCompanionOps: */ def usingEmit[T](withEmit: FlowEmit[T] => Unit): Flow[T] = usingEmitInline(withEmit) - /** Creates a flow, which when run, provides a [[Sink]] (channel) to the given `withSink` function. Elements can be sent to the sink to - * be processed by downstream stages. The `withSink` function is run asynchronously in a forked task. + /** Creates a flow, which when run, provides a [[Sink]] (channel) to the given `withSink` function. Elements can be sent to the sink to be + * processed by downstream stages. The `withSink` function is run asynchronously in a forked task. * * The flow completes when the `withSink` function completes and the provided sink is closed. The sink is automatically closed when * `withSink` completes normally. If `withSink` throws an exception, the sink is closed with an error. From af7954f73df85602a9654066c790ab6203d0cb21 Mon Sep 17 00:00:00 2001 From: Adam Warski Date: Wed, 4 Feb 2026 14:07:16 +0000 Subject: [PATCH 6/7] Fix usingChannel error propagation and concurrent sending test - Unwrap ChannelClosedException.Error in usingChannel to throw original exception - Fix concurrent sending test to join forks before returning from withSink Co-Authored-By: Claude Opus 4.5 --- core/src/main/scala/ox/flow/FlowCompanionOps.scala | 4 +++- core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 50979820..ce7dd8bb 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -2,6 +2,7 @@ package ox.flow import ox.Fork import ox.channels.ChannelClosed +import ox.channels.ChannelClosedException import ox.channels.ChannelClosedUnion.isValue import ox.channels.Source import ox.channels.Sink @@ -59,7 +60,8 @@ trait FlowCompanionOps: case e: Throwable => ch.error(e) throw e - FlowEmit.channelToEmit(ch, emit) + try FlowEmit.channelToEmit(ch, emit) + catch case ChannelClosedException.Error(cause) => throw cause /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. diff --git a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala index cb8b396b..b8dcec42 100644 --- a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala +++ b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala @@ -48,12 +48,14 @@ class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers: it should "support concurrent sending" in supervised: Flow .usingChannel[Int](sink => - fork: + val f1 = fork: sink.send(1) sink.send(2) - fork: + val f2 = fork: sink.send(3) sink.send(4) + f1.join() + f2.join() ) .runToList() .sorted shouldBe List(1, 2, 3, 4) From b75ffc98d9571fc3f4b4a1e5b0ef1df364f9a912 Mon Sep 17 00:00:00 2001 From: adamw Date: Thu, 5 Feb 2026 10:50:49 +0100 Subject: [PATCH 7/7] Fix Flow.usingChannel to handle already-closed channels Uses doneOrClosed/errorOrClosed instead of done/error to safely handle cases where withSink closes the channel. Removes exception unwrapping to preserve ChannelClosedException.Error. Adds test cases for channel closure scenarios. Co-Authored-By: Claude Sonnet 4.5 --- .../main/scala/ox/flow/FlowCompanionOps.scala | 13 +++----- .../ox/flow/FlowOpsUsingChannelTest.scala | 32 +++++++++++++------ 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index ce7dd8bb..cbd4db52 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -2,7 +2,6 @@ package ox.flow import ox.Fork import ox.channels.ChannelClosed -import ox.channels.ChannelClosedException import ox.channels.ChannelClosedUnion.isValue import ox.channels.Source import ox.channels.Sink @@ -52,16 +51,12 @@ trait FlowCompanionOps: */ def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit => val ch = BufferCapacity.newChannel[T] - forkUnsupervised: + val _ = forkUnsupervised: try withSink(ch) - ch.done() - catch - case e: Throwable => - ch.error(e) - throw e - try FlowEmit.channelToEmit(ch, emit) - catch case ChannelClosedException.Error(cause) => throw cause + ch.doneOrClosed().discard // the channel might be already closed by `withSink` + catch case e: Throwable => ch.errorOrClosed(e).discard // the channel might be already closed by `withSink` + FlowEmit.channelToEmit(ch, emit) /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. diff --git a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala index b8dcec42..38fa089d 100644 --- a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala +++ b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala @@ -16,24 +16,16 @@ class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers: ) .runToList() shouldBe List(1, 2, 3) - it should "complete when the channel is done" in supervised: - Flow - .usingChannel[Int](sink => - sink.send(10) - sink.send(20) - ) - .runToList() shouldBe List(10, 20) - it should "propagate errors from the channel" in supervised: val exception = new RuntimeException("test error") - val result = intercept[RuntimeException]: + val result = intercept[ox.channels.ChannelClosedException.Error]: Flow .usingChannel[Int](sink => sink.send(1) throw exception ) .runToList() - result shouldBe exception + result.cause shouldBe exception it should "work with transformations" in supervised: Flow @@ -59,4 +51,24 @@ class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers: ) .runToList() .sorted shouldBe List(1, 2, 3, 4) + + it should "handle channel closed by withSink with done()" in supervised: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.send(2) + sink.done() + ) + .runToList() shouldBe List(1, 2) + + it should "handle channel closed by withSink with error()" in supervised: + val exception = new RuntimeException("explicit error") + val result = intercept[ox.channels.ChannelClosedException.Error]: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.error(exception) + ) + .runToList() + result.cause shouldBe exception end FlowOpsUsingChannelTest