Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Control/Concurrent/Async.hs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ module Control.Concurrent.Async (
-- ** Linking
link, linkOnly, link2, link2Only, ExceptionInLinkedThread(..),

-- ** Exception annotations
AsyncWaitLocation(..)

) where

import Control.Concurrent.Async.Internal
161 changes: 138 additions & 23 deletions Control/Concurrent/Async/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
{-# LANGUAGE DeriveDataTypeable #-}
#endif
{-# OPTIONS -Wall #-}
{-# LANGUAGE ScopedTypeVariables #-}

-----------------------------------------------------------------------------
-- |
Expand All @@ -25,7 +26,21 @@
--
-----------------------------------------------------------------------------

module Control.Concurrent.Async.Internal where
module Control.Concurrent.Async.Internal (
module Control.Concurrent.Async.Internal,

#if MIN_VERSION_base(4,21,0)
-- * Compatibility wrapper for base < 4.20
-- These items are defined for base < 4.20 in this module and in
-- Control.Exception[.Context] for base >= 4.20. In order to ease usage of
-- the internal API, we reexport them here.
ExceptionWithContext(..),
rethrowIO,
catchNoPropagate,
tryWithContext
#endif

)where

import Control.Concurrent.STM
import Control.Exception
Expand Down Expand Up @@ -56,6 +71,12 @@ import Data.IORef
import GHC.Exts
import GHC.IO hiding (finally, onException)
import GHC.Conc (ThreadId(..), labelThread)
import GHC.Stack (CallStack, callStack, prettyCallStack, withFrozenCallStack)

#if MIN_VERSION_base(4,21,0)
import Control.Exception.Annotation (ExceptionAnnotation (..))
#endif
import GHC.Stack.Types (HasCallStack)

#if defined(__MHS__)
import Data.Traversable
Expand Down Expand Up @@ -84,7 +105,7 @@ data Async a = Async
{ asyncThreadId :: {-# UNPACK #-} !ThreadId
-- ^ Returns the 'ThreadId' of the thread running
-- the given 'Async'.
, _asyncWait :: STM (Either SomeException a)
, _asyncWait :: STM (Either (ExceptionWithContext SomeException) a)
}

instance Eq (Async a) where
Expand Down Expand Up @@ -152,7 +173,7 @@ asyncUsing doFork action = do
-- t <- forkFinally action (\r -> atomically $ putTMVar var r)
-- slightly faster:
t <- mask $ \restore ->
doFork $ try (restore action_plus) >>= atomically . putTMVar var
doFork $ tryWithContext (restore action_plus) >>= atomically . putTMVar var
return (Async t (readTMVar var))


Expand Down Expand Up @@ -215,23 +236,99 @@ withAsyncUsing doFork action inner = do
var <- newEmptyTMVarIO
mask $ \restore -> do
let action_plus = debugLabelMe >> action
t <- doFork $ try (restore action_plus) >>= atomically . putTMVar var
t <- doFork $ tryWithContext (restore action_plus) >>= atomically . putTMVar var
let a = Async t (readTMVar var)
r <- restore (inner a) `catchAll` \e -> do
-- Using catch/no/propagate and rethrowIO, we do not wrap the exception
-- with a `WhileWaiting`
r <- restore (inner a) `catchNoPropagate` \e -> do
uninterruptibleCancel a
throwIO e
rethrowIO (e :: ExceptionWithContext SomeException)
uninterruptibleCancel a
return r

-- * Compatibilty logic with base 4.21 for exception context. The rational here is that this module is implemented with 'ExceptionWithContext' as the basic building block with the following special cases:
--
-- - With base >= 4.21 (GHC 9.12), exception context is propagated correctly using the 'rethrowIO', 'catchNoPropagate', ... functions.
-- - With base >= 4.20 (GHC 9.10), exception context logic exists, but not the 'rethrow' logic. We reimplemented these function which are basically discarding the context
-- - With base < 4.20 (GHC 9.8 and older), we just use the old functions which does not know anything about exception context. We implement an alias 'ExceptionWithContext' which is actually bare exception.
--
-- For all version we implement 'dropContext' which is able to drop the
-- context, for all the function such as 'poll' which returns an exception without context.


-- | Drop the exception context
dropContext :: ExceptionWithContext t -> t

-- | Rethrow an exception inside 'STM' context, while preserving the 'ExceptionContext'. See 'rethrowIO' for details.
rethrowSTM :: Exception e => ExceptionWithContext e -> STM a

#if MIN_VERSION_base(4,21,0)
-- The 'rethrowIO', 'catchNoPropagate' and 'tryWithContext' are already available in base
#else
-- In older version, we reimplement them
rethrowIO :: ExceptionWithContext SomeException -> IO a
catchNoPropagate :: forall e a. Exception e => IO a -> (ExceptionWithContext e -> IO a) -> IO a
tryWithContext :: IO a -> IO (Either (ExceptionWithContext SomeException) a)
#endif

#if MIN_VERSION_base(4,21,0)
dropContext (ExceptionWithContext _context e) = e
rethrowSTM e = throwSTM (NoBacktrace e)
#elif MIN_VERSION_base(4,20,0)
dropContext (ExceptionWithContext _context e) = e

-- For rethrowSTM and rethrowIO, it is important to drop the context, otherwise
-- we throw an exception which is actually an "ExceptionWithContext" embedding
-- an exception (so that's an exception inside an exception) and later
-- "fromException" won't behave as expected.
rethrowSTM e = throwSTM (dropContext e)

rethrowIO e = throwIO (dropContext e)
catchNoPropagate = catch
tryWithContext = try
#else
dropContext e = e
rethrowSTM e = throwSTM e

type ExceptionWithContext e = e
rethrowIO e = throwIO e
catchNoPropagate = catch
tryWithContext = try
#endif

-- | An exception annotation which stores the callstack of a 'wait',
-- 'waitBoth', 'waitEither' call.
data AsyncWaitLocation = AsyncWaitLocation CallStack
deriving (Show)

#if MIN_VERSION_base(4,21,0)
instance ExceptionAnnotation AsyncWaitLocation where
displayExceptionAnnotation (AsyncWaitLocation callstack) = "AsyncWaitLocation " <> prettyCallStack callstack

-- | Annotate an exception with the current callstack with GHC >= 9.12
annotateWithCallSite :: HasCallStack => IO b -> IO b
annotateWithCallSite action = do
resM <- tryWithContext action
case resM of
Right res -> pure res
Left (exc :: ExceptionWithContext SomeException) -> do
annotateIO (AsyncWaitLocation callStack) $ rethrowIO exc
#else
-- | Do nothing with GHC < 9.12
annotateWithCallSite :: HasCallStack => IO b -> IO b
annotateWithCallSite action = action
#endif


-- | Wait for an asynchronous action to complete, and return its
-- value. If the asynchronous action threw an exception, then the
-- exception is re-thrown by 'wait'.
--
-- > wait = atomically . waitSTM
--
{-# INLINE wait #-}
wait :: Async a -> IO a
wait = tryAgain . atomically . waitSTM
wait :: HasCallStack => Async a -> IO a
wait = withFrozenCallStack $ annotateWithCallSite . tryAgain . atomically . waitSTM
where
-- See: https://github.com/simonmar/async/issues/14
tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
Expand Down Expand Up @@ -264,20 +361,38 @@ poll = atomically . pollSTM
--
waitSTM :: Async a -> STM a
waitSTM a = do
r <- waitCatchSTM a
either throwSTM return r
r <- waitCatchSTMWithContext a
either (rethrowSTM) return r

-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
{-# INLINE waitCatchSTM #-}
waitCatchSTM :: Async a -> STM (Either SomeException a)
waitCatchSTM (Async _ w) = w
waitCatchSTM (Async _ w) = either (Left . dropContext) Right <$> w


-- | A version of 'waitCatch' that can be used inside an STM transaction.
--
-- The returned exception keep the 'ExceptionContext'. See 'tryWithContext' for details.
{-# INLINE waitCatchSTMWithContext #-}
waitCatchSTMWithContext :: Async a -> STM (Either (ExceptionWithContext SomeException) a)
waitCatchSTMWithContext (Async _ w) = w

-- | A version of 'poll' that can be used inside an STM transaction.
--
{-# INLINE pollSTM #-}
pollSTM :: Async a -> STM (Maybe (Either SomeException a))
pollSTM (Async _ w) = (Just <$> w) `orElse` return Nothing
pollSTM (Async _ w) = (Just . either (Left . dropContext) Right <$> w) `orElse` return Nothing

#if MIN_VERSION_base(4,21,0)
-- | A version of 'poll' that can be used inside an STM transaction.
--
-- It keep the exception context associated with the exception. See 'tryWithContext' for details.
--
{-# INLINE pollSTMWithContext #-}
pollSTMWithContext :: Async a -> STM (Maybe (Either (ExceptionWithContext SomeException) a))
pollSTMWithContext (Async _ w) = (Just <$> w) `orElse` return Nothing
#endif

-- | Cancel an asynchronous action by throwing the @AsyncCancelled@
-- exception to it, and waiting for the `Async` thread to quit.
Expand Down Expand Up @@ -436,8 +551,8 @@ waitEitherCatchCancel left right =
-- re-thrown by 'waitEither'.
--
{-# INLINE waitEither #-}
waitEither :: Async a -> Async b -> IO (Either a b)
waitEither left right = atomically (waitEitherSTM left right)
waitEither :: HasCallStack => Async a -> Async b -> IO (Either a b)
waitEither left right = withFrozenCallStack $ annotateWithCallSite $ atomically (waitEitherSTM left right)

-- | A version of 'waitEither' that can be used inside an STM transaction.
--
Expand Down Expand Up @@ -475,8 +590,8 @@ waitEitherCancel left right =
-- re-thrown by 'waitBoth'.
--
{-# INLINE waitBoth #-}
waitBoth :: Async a -> Async b -> IO (a,b)
waitBoth left right = tryAgain $ atomically (waitBothSTM left right)
waitBoth :: HasCallStack => Async a -> Async b -> IO (a,b)
waitBoth left right = withFrozenCallStack $ annotateWithCallSite $ tryAgain $ atomically (waitBothSTM left right)
where
-- See: https://github.com/simonmar/async/issues/14
tryAgain f = f `catch` \BlockedIndefinitelyOnSTM -> f
Expand Down Expand Up @@ -664,7 +779,7 @@ race left right = concurrently' left right collect
collect m = do
e <- m
case e of
Left ex -> throwIO ex
Left ex -> rethrowIO ex
Right r -> return r

-- race_ :: IO a -> IO b -> IO ()
Expand All @@ -678,7 +793,7 @@ concurrently left right = concurrently' left right (collect [])
collect xs m = do
e <- m
case e of
Left ex -> throwIO ex
Left ex -> rethrowIO ex
Right r -> collect (r:xs) m

-- concurrentlyE :: IO (Either e a) -> IO (Either e b) -> IO (Either e (a, b))
Expand All @@ -691,13 +806,13 @@ concurrentlyE left right = concurrently' left right (collect [])
collect xs m = do
e <- m
case e of
Left ex -> throwIO ex
Left ex -> rethrowIO ex
Right r -> collect (r:xs) m

concurrently' ::
Copy link
Copy Markdown
Contributor

@phadej phadej Mar 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT, concurrently' is an internal, not exposed combinator, so I suggest that it's changed from

concurrently' ::
  CALLSTACK
  IO a -> IO b
  -> (IO (Either SomeException (Either a b)) -> IO r)
  -> IO r

to

concurrently' ::
  CALLSTACK
  IO a -> IO b
  -> (IO (Either (ExceptionWithContext SomeException) (Either a b)) -> IO r)
  -> IO r

i.e. catch -> catchNoPropagate change. If I'm looking right, that would remove the need for very suspicious looking rethrowIO', the base's rethrowIO will be enough.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, catchAll in the body should probably be catchNoPropogate.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phadej I agree with your proposal, but it mean that the complete concurrently' block would be duplicated with a #if MIN_VERSION_base(4,21,0) pragma. And similarly, if you want to avoid the rethrowIO' function, we also need to duplicate all the rethrowIO' callsite to dispatch between rethrowIO and throwIO. This is a lot of code duplication, unless I missunderstood your proposal.

Note that I do admit that rethrowIO' is suspicious, but it is well localised and seems safe because SomException may contain an exception context (Actually, it will always contain one, unless it was explicitely removed).

Regarding catchAll versus catchNoPropagate. I would claim that in this context we do not really care.

The point of catchNoPropagate is for the handler to possibly rethrow the exception explicitely and hence not having the WhileHandling wrapping an a new callstack. Here the handler is a putMVar done . Left which does not explicitely rethrow the exception (hence the WhileHandling won't happen), unless another exception happen during the putMVar and I would admit that this context is, from my understanding, rare or even impossible, but actually not a "normal" use case of the async library, and does not fall into the the scope of this MR (e.g. improving exception context in "normal" use case, meaning when an exception happen in the task run by the library, no in its internals).

The "problem" with catchNoPropagate is that it returns an ExceptionWithContext and if we want to store that inside the mvar, we actually need to patch all the library.

I think my approach is imperfect because of backward compatibility. When later we'll drop support for base <4.21, we'll be able to rewrite the complete library and turns all the rethrowIO' to proper rethrowIO and all the SomeException to proper ExceptionWithContext SomeException, but this is not doable immediately without tones of CPP.

@phadej I see a few options here:

a) Keep the implementation as it is right now
b) Introduce a lot of CPP everywhere
c) Copy paste everything in another module and do conditional build based on the base version
d) Drop support for base <4.21 and do a "major" async release and maybe maintain the old one in a different branch with a different version number?
e) (I just thought about it): use CPP only in a really localised place to implement rethrowIO, catchNoPropagate and ExceptionWithContext which would map either to throwIO, catch and SomeException or the version from base>= 4.21. Actually, this may be the best option. Do you want me to attempt this refactor?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@phadej, @Bodigrim, would you have a look at the latest commit which tries a different approach: using tryWithContext / catchNoPropagate everywhere and adding a simple compatibility layer.

The test fails with GHC 9.10, I'm investigating, but I would like to know first if you prefer this approach.

CALLSTACK
IO a -> IO b
-> (IO (Either SomeException (Either a b)) -> IO r)
-> (IO (Either (ExceptionWithContext SomeException) (Either a b)) -> IO r)
-> IO r
concurrently' left right collect = do
done <- newEmptyMVar
Expand All @@ -708,10 +823,10 @@ concurrently' left right collect = do
-- the thread to terminate.
lid <- forkIO $ uninterruptibleMask_ $
restore (left >>= putMVar done . Right . Left)
`catchAll` (putMVar done . Left)
`catchNoPropagate` (putMVar done . Left)
rid <- forkIO $ uninterruptibleMask_ $
restore (right >>= putMVar done . Right . Right)
`catchAll` (putMVar done . Left)
`catchNoPropagate` (putMVar done . Left)

count <- newIORef (2 :: Int)
let takeDone = do
Expand Down Expand Up @@ -752,7 +867,7 @@ concurrently_ left right = concurrently' left right (collect 0)
collect i m = do
e <- m
case e of
Left ex -> throwIO ex
Left ex -> rethrowIO ex
Right _ -> collect (i + 1 :: Int) m


Expand Down
11 changes: 11 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
- #165 async will preserve the `ExceptionContext` (on base >= 4.21, hence
starting with GHC 9.12) of exceptions runs inside the `Async`. Especially, it
now returs a callstack pointing inside the executed code instead of an
arbitrary location inside `async` library. For call such as `concurrently`,
it means that the callstack include the location of the `concurrently` and
continues inside the called functions. For call such as `withAsync + wait`,
the callstack contains the `withAsync` location as well as the location
inside the called function. The exception also contains an additional
annotation, `AsyncWaitLocation` which contains the location of the `wait`
call.

## Changes in 2.2.6

- Added Control.Concurrent.Stream for processing streams with a fixed
Expand Down
Loading