Context
We use Anorm with PekkoStream (previously AkkaStream) in a Play Framework application. Our usage is fairly standard, except that the queries we run through Anorm hit a database that can take several seconds (sometimes more) to start returning results.
In this situation, we noticed that all "default" threads can become blocked, so the Play application becomes unresponsive to every request (including a healthcheck request that is completely unrelated and just answers OK without doing anything else).
Details
Our usage
To show that we're not doing anything custom, here is a minimized version of our code:
val source: Source[OurDataModel, NotUsed] = PekkoStream.source(SQL("SELECT * FROM ..."), rowParser)
val jsonSource: Source[ByteString, NotUsed] = source.map(row => transformRowToJson(row))
Ok.chunked(jsonSource) // Play Results
Observations
A thread dump taken while the application is unresponsive shows threads waiting (hanging) in the preStart of the stream:
....
at app//anorm.Cursor$.apply(Cursor.scala:31)
at app//anorm.Sql$.unsafeCursor(Anorm.scala:248)
at app//anorm.PekkoStream$ResultSource$$anon$1.nextCursor(PekkoStream.scala:149)
at app//anorm.PekkoStream$ResultSource$$anon$1.preStart(PekkoStream.scala:126)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreter.init(GraphInterpreter.scala:317)
at app//org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:631)
...
at app//org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:729)
at app//org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at app//org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at app//org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
at app//org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
at app//org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
at java.base@21.0.5/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:387)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1312)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1843)
at java.base@21.0.5/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1808)
at java.base@21.0.5/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:188)
Relevant code in Anorm (anorm/pekko/src/main/scala/anorm/PekkoStream.scala, lines 123 to 130 in 7501f26):
override def preStart(): Unit = {
  try {
    resultSet = sql.unsafeResultSet(connection)
    nextCursor()
  } catch {
    case NonFatal(cause) => failWith(cause)
  }
}
Workaround
We've been able to fix the issue on our side by explicitly wrapping the preStart code in a scala.concurrent.blocking block.
  override def preStart(): Unit = {
+   blocking {
      try {
        resultSet = sql.unsafeResultSet(connection)
        nextCursor()
      } catch {
        case NonFatal(cause) => failWith(cause)
      }
+   }
  }
(This requires that we "fork" this class in our own code.)
As a consequence, the thread pool backing the Pekko actors can grow and still accept non-blocking work. The threads waiting on the database no longer block other requests.
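For illustration, the effect of blocking on a fork-join based pool can be sketched outside of Pekko and Anorm. This is a minimal sketch under stated assumptions, not the code from our application: it uses Scala's global ExecutionContext as a stand-in for the Pekko default dispatcher, and slowQuery is a hypothetical placeholder for a JDBC call that is slow to answer. It submits twice as many slow calls as the pool has threads by default, once without and once with blocking.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingDemo {
  // Hypothetical stand-in for a JDBC call that is slow to return its first row.
  def slowQuery(): Int = {
    Thread.sleep(500)
    1
  }

  // Submits more slow calls than the pool has threads by default and returns
  // (number of completed calls, elapsed milliseconds).
  def run(useBlocking: Boolean): (Int, Long) = {
    val calls = Runtime.getRuntime.availableProcessors() * 2
    val start = System.nanoTime()
    val futures = (1 to calls).map { _ =>
      Future {
        if (useBlocking) blocking { slowQuery() } // pool may add compensating threads
        else slowQuery()                          // pool thread is simply held
      }
    }
    val completed = Await.result(Future.sequence(futures), 30.seconds).sum
    (completed, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    println(s"without blocking: ${run(useBlocking = false)}")
    println(s"with blocking:    ${run(useBlocking = true)}")
  }
}
```

On the global ExecutionContext, the run with blocking would be expected to finish sooner, because the pool is allowed to start extra threads while the marked sections wait. Exact timings depend on the machine and the pool configuration.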
Now the question is: should this be the default behavior? As Anorm and JDBC drivers are blocking by default, shouldn't preStart be robust, so that if it blocks, it doesn't starve the Pekko actor threads? Would there be any downside to wrapping this code in blocking even when it doesn't actually block?
If it makes sense to you, I'll be happy to open the PR :)