diff --git a/core/src/main/scala/ox/flow/Flow.scala b/core/src/main/scala/ox/flow/Flow.scala index 6f6de70b..0591ac53 100644 --- a/core/src/main/scala/ox/flow/Flow.scala +++ b/core/src/main/scala/ox/flow/Flow.scala @@ -9,7 +9,8 @@ import scala.annotation.nowarn * * A flow is lazy - evaluation happens only when it's run. * - * Flows can be created using the [[Flow.usingSink]], [[Flow.fromValues]] and other `Flow.from*` methods, [[Flow.tick]] etc. + * Flows can be created using the [[Flow.usingEmit]], [[Flow.usingChannel]], [[Flow.fromValues]] and other `Flow.from*` methods, + * [[Flow.tick]] etc. * * Transformation stages can be added using the available combinators, such as [[Flow.map]], [[Flow.buffer]], [[Flow.grouped]], etc. Each * such method returns a new immutable `Flow` instance. diff --git a/core/src/main/scala/ox/flow/FlowCompanionOps.scala b/core/src/main/scala/ox/flow/FlowCompanionOps.scala index 48aead87..cbd4db52 100644 --- a/core/src/main/scala/ox/flow/FlowCompanionOps.scala +++ b/core/src/main/scala/ox/flow/FlowCompanionOps.scala @@ -4,6 +4,7 @@ import ox.Fork import ox.channels.ChannelClosed import ox.channels.ChannelClosedUnion.isValue import ox.channels.Source +import ox.channels.Sink import ox.channels.BufferCapacity import ox.forever import ox.forkUnsupervised @@ -37,6 +38,26 @@ trait FlowCompanionOps: */ def usingEmit[T](withEmit: FlowEmit[T] => Unit): Flow[T] = usingEmitInline(withEmit) + /** Creates a flow, which when run, provides a [[Sink]] (channel) to the given `withSink` function. Elements can be sent to the sink to be + * processed by downstream stages. The `withSink` function is run asynchronously in a forked task. + * + * The flow completes when the `withSink` function completes and the provided sink is closed. The sink is automatically closed when + * `withSink` completes normally. If `withSink` throws an exception, the sink is closed with an error. + * + * Must be run within a concurrency scope as a fork is created to run the `withSink` function. + * + * @param withSink + * A function that receives a [[Sink]] to which elements can be sent. + */ + def usingChannel[T](withSink: Sink[T] => Unit)(using BufferCapacity, ox.OxUnsupervised): Flow[T] = usingEmitInline: emit => + val ch = BufferCapacity.newChannel[T] + val _ = forkUnsupervised: + try + withSink(ch) + ch.doneOrClosed().discard // the channel might be already closed by `withSink` + catch case e: Throwable => ch.errorOrClosed(e).discard // the channel might be already closed by `withSink` + FlowEmit.channelToEmit(ch, emit) + /** Creates a flow using the given `source`. An element is emitted for each value received from the source. If the source is completed * with an error, is it propagated by throwing. */ diff --git a/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala new file mode 100644 index 00000000..38fa089d --- /dev/null +++ b/core/src/test/scala/ox/flow/FlowOpsUsingChannelTest.scala @@ -0,0 +1,74 @@ +package ox.flow + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class FlowOpsUsingChannelTest extends AnyFlatSpec with Matchers: + behavior of "usingChannel" + + it should "send elements through the provided channel" in supervised: + Flow + .usingChannel(sink => + sink.send(1) + sink.send(2) + sink.send(3) + ) + .runToList() shouldBe List(1, 2, 3) + + it should "propagate errors from the channel" in supervised: + val exception = new RuntimeException("test error") + val result = intercept[ox.channels.ChannelClosedException.Error]: + Flow + .usingChannel[Int](sink => + sink.send(1) + throw exception + ) + .runToList() + result.cause shouldBe exception + + it should "work with transformations" in supervised: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.send(2) + sink.send(3) + ) + .map(_ * 2) + .runToList() shouldBe List(2, 4, 6) + + it should "support concurrent sending" in supervised: + Flow + .usingChannel[Int](sink => + val f1 = fork: + sink.send(1) + sink.send(2) + val f2 = fork: + sink.send(3) + sink.send(4) + f1.join() + f2.join() + ) + .runToList() + .sorted shouldBe List(1, 2, 3, 4) + + it should "handle channel closed by withSink with done()" in supervised: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.send(2) + sink.done() + ) + .runToList() shouldBe List(1, 2) + + it should "handle channel closed by withSink with error()" in supervised: + val exception = new RuntimeException("explicit error") + val result = intercept[ox.channels.ChannelClosedException.Error]: + Flow + .usingChannel[Int](sink => + sink.send(1) + sink.error(exception) + ) + .runToList() + result.cause shouldBe exception +end FlowOpsUsingChannelTest diff --git a/doc/streaming/flows.md b/doc/streaming/flows.md index 8cb3ae32..1b17a1de 100644 --- a/doc/streaming/flows.md +++ b/doc/streaming/flows.md @@ -60,6 +60,26 @@ As part of the callback, you can create [supervision scopes](../structured-concu Any asynchronous communication should be best done with [channels](channels.md). You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`. +Alternatively, flows can be created by providing a function that receives a `Sink` (channel): + +```scala mdoc:compile-only +import ox.flow.Flow +import ox.{fork, supervised} + +supervised: + Flow.usingChannel: sink => + sink.send(1) + fork: + sink.send(2) + sink.send(3) + sink.send(4) + // TODO: transform the flow further & run +``` + +Unlike `usingEmit`, the `Sink` instance can be safely shared across threads, as channels are thread-safe. The provided function is run asynchronously in a forked task. The flow completes when the function completes and the sink is automatically closed. If the function throws an exception, it is propagated as a flow error. + +Note that `Flow.usingChannel` must be run within a concurrency scope, as it creates a fork to run the provided function. + ## Transforming flows: basics Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections: