From 0ca08950ac98cf16397f1e1153e7b7254ee6d67b Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Tue, 24 Mar 2026 19:34:36 +0000 Subject: [PATCH 1/4] fix global `event_manager` for correctly support multiple crawlers --- src/crawlee/crawlers/_basic/_basic_crawler.py | 4 +- src/crawlee/events/_event_manager.py | 18 +++-- .../crawlers/_basic/test_basic_crawler.py | 71 +++++++++++++++++++ tests/unit/events/test_event_manager.py | 4 -- 4 files changed, 85 insertions(+), 12 deletions(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 2553467124..b3cebf189e 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -61,6 +61,7 @@ UserDefinedErrorHandlerError, UserHandlerTimeoutError, ) +from crawlee.events import EventManager from crawlee.events._types import Event, EventCrawlerStatusData from crawlee.http_clients import ImpitHttpClient from crawlee.router import Router @@ -90,7 +91,6 @@ PushDataKwargs, ) from crawlee.configuration import Configuration - from crawlee.events import EventManager from crawlee.http_clients import HttpClient, HttpResponse from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo from crawlee.request_loaders import RequestManager @@ -783,7 +783,7 @@ async def _run_crawler(self) -> None: self._crawler_state_rec_task, *self._additional_context_managers, ) - if cm and getattr(cm, 'active', False) is False + if cm and (isinstance(cm, EventManager) or not getattr(cm, 'active', False)) ] async with AsyncExitStack() as exit_stack: diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py index 8b714255fb..200a8c9caf 100644 --- a/src/crawlee/events/_event_manager.py +++ b/src/crawlee/events/_event_manager.py @@ -93,6 +93,9 @@ def __init__( delay=self._persist_state_interval, ) + # Reference count for active contexts. + self._ref_count = 0 + # Flag to indicate the context state. self._active = False @@ -102,13 +105,11 @@ def active(self) -> bool: return self._active async def __aenter__(self) -> EventManager: - """Initialize the event manager upon entering the async context. + """Initialize the event manager upon entering the async context.""" + self._ref_count += 1 - Raises: - RuntimeError: If the context manager is already active. - """ - if self._active: - raise RuntimeError(f'The {self.__class__.__name__} is already active.') + if self._ref_count > 1: + return self self._active = True self._emit_persist_state_event_rec_task.start() @@ -130,6 +131,11 @@ async def __aexit__( if not self._active: raise RuntimeError(f'The {self.__class__.__name__} is not active.') + self._ref_count -= 1 + + if self._ref_count > 0: + return + # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved. await self._emit_persist_state_event_rec_task.stop() await self._emit_persist_state_event() diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index c0f7b808e8..94bb1b65cd 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -2047,3 +2047,74 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ assert error_request is not None assert error_request.state == RequestState.DONE assert error_request.was_already_handled + + +@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.') +async def test_multiple_crawlers_with_global_event_manager() -> None: + """Test that multiple crawlers work correctly when using the global event manager.""" + + # Test is skipped in older Python versions. + from asyncio import Barrier # type:ignore[attr-defined] # noqa: PLC0415 + + rq1 = await RequestQueue.open(alias='rq1') + rq2 = await RequestQueue.open(alias='rq2') + + crawler_1 = BasicCrawler(request_manager=rq1) + crawler_2 = BasicCrawler(request_manager=rq2) + + started_event = asyncio.Event() + finished_event = asyncio.Event() + + async def launch_crawler_1() -> None: + await crawler_1.run(['https://a.placeholder.com']) + finished_event.set() + + async def launch_crawler_2() -> None: + # Ensure that crawler_1 is already running and has activated event_manager + await started_event.wait() + await crawler_2.run(['https://b.placeholder.com']) + + handler_barrier = Barrier(2) + + handler_call = AsyncMock() + + @crawler_1.router.default_handler + async def handler_1(context: BasicCrawlingContext) -> None: + started_event.set() + # Ensure that both handlers are running at the same time. + await handler_barrier.wait() + event_manager = service_locator.get_event_manager() + + await handler_call(event_manager.active) + + @crawler_2.router.default_handler + async def handler_2(context: BasicCrawlingContext) -> None: + # Ensure that both handlers are running at the same time. + await handler_barrier.wait() + # Ensure that crawler_1 is finished and closed all active contexts. + await finished_event.wait() + # Check that event manager is active and can be used in the second crawler. + event_manager = service_locator.get_event_manager() + + await handler_call(event_manager.active) + + await asyncio.gather( + launch_crawler_1(), + launch_crawler_2(), + ) + + assert handler_call.call_count == 2 + + first_call = handler_call.call_args_list[0] + second_call = handler_call.call_args_list[1] + + assert first_call[0][0] is True + assert second_call[0][0] is True + + event_manager = service_locator.get_event_manager() + + # After both crawlers are finished, event manager should be inactive. + assert event_manager.active is False + + await rq1.drop() + await rq2.drop() diff --git a/tests/unit/events/test_event_manager.py b/tests/unit/events/test_event_manager.py index 4654efaf64..db26f9a18e 100644 --- a/tests/unit/events/test_event_manager.py +++ b/tests/unit/events/test_event_manager.py @@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event with pytest.raises(RuntimeError, match=r'EventManager is not active.'): await event_manager.wait_for_all_listeners_to_complete() - with pytest.raises(RuntimeError, match=r'EventManager is already active.'): - async with event_manager, event_manager: - pass - async with event_manager: event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data) await event_manager.wait_for_all_listeners_to_complete() From c2a229077111e35ab422c65418ee76e005180735 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 25 Mar 2026 07:48:54 +0000 Subject: [PATCH 2/4] launch global event_manager with local in crawler --- src/crawlee/crawlers/_basic/_basic_crawler.py | 8 +++-- .../crawlers/_basic/test_basic_crawler.py | 36 +++++++++++++++++-- 2 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index b3cebf189e..29849f34fc 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -768,14 +768,18 @@ def sigint_handler() -> None: return final_statistics async def _run_crawler(self) -> None: - event_manager = self._service_locator.get_event_manager() + local_event_manager = self._service_locator.get_event_manager() + global_event_manager = service_locator.get_event_manager() + if local_event_manager is global_event_manager: + local_event_manager = None # Avoid entering the same event manager context twice # Collect the context managers to be entered. Context managers that are already active are excluded, # as they were likely entered by the caller, who will also be responsible for exiting them. contexts_to_enter = [ cm for cm in ( - event_manager, + global_event_manager, + local_event_manager, self._snapshotter, self._statistics, self._session_pool if self._use_session_pool else None, diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index 94bb1b65cd..ab90be969e 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -26,8 +26,7 @@ from crawlee.configuration import Configuration from crawlee.crawlers import BasicCrawler from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError -from crawlee.events import Event, EventCrawlerStatusData -from crawlee.events._local_event_manager import LocalEventManager +from crawlee.events import Event, EventCrawlerStatusData, LocalEventManager from crawlee.request_loaders import RequestList, RequestManagerTandem from crawlee.sessions import Session, SessionPool from crawlee.statistics import FinalStatistics @@ -2118,3 +2117,36 @@ async def handler_2(context: BasicCrawlingContext) -> None: await rq1.drop() await rq2.drop() + + +async def test_globa_and_local_event_manager_in_crawler_run() -> None: + """Test that both global and local event managers are used in crawler run""" + + config = service_locator.get_configuration() + + local_event_manager = LocalEventManager.from_config(config) + + crawler = BasicCrawler(event_manager=local_event_manager) + + handler_call = AsyncMock() + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + global_event_manager = service_locator.get_event_manager() + handler_call(local_event_manager.active, global_event_manager.active) + + await crawler.run(['https://a.placeholder.com']) + + assert handler_call.call_count == 1 + + local_em_state, global_em_state = handler_call.call_args_list[0][0] + + # Both event managers should be active. + assert local_em_state is True + assert global_em_state is True + + global_event_manager = service_locator.get_event_manager() + + # After crawler is finished, both event managers should be inactive. + assert local_event_manager.active is False + assert global_event_manager.active is False From 6e157994425786f42a79b925675ce3fa57ee434e Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 25 Mar 2026 07:52:19 +0000 Subject: [PATCH 3/4] fix typo --- tests/unit/crawlers/_basic/test_basic_crawler.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index ab90be969e..29f3a169fe 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -2119,7 +2119,7 @@ async def handler_2(context: BasicCrawlingContext) -> None: await rq2.drop() -async def test_globa_and_local_event_manager_in_crawler_run() -> None: +async def test_global_and_local_event_manager_in_crawler_run() -> None: """Test that both global and local event managers are used in crawler run""" config = service_locator.get_configuration() From 4bf288fcd4824d96a9c48159075192c3971dea19 Mon Sep 17 00:00:00 2001 From: Max Bohomolov Date: Wed, 25 Mar 2026 08:07:30 +0000 Subject: [PATCH 4/4] fix --- src/crawlee/events/_event_manager.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py index 200a8c9caf..3ac19e2930 100644 --- a/src/crawlee/events/_event_manager.py +++ b/src/crawlee/events/_event_manager.py @@ -133,12 +133,14 @@ async def __aexit__( self._ref_count -= 1 + # Emit persist state event to ensure the latest state is saved before closing the context. + await self._emit_persist_state_event() + if self._ref_count > 0: return # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved. await self._emit_persist_state_event_rec_task.stop() - await self._emit_persist_state_event() await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout) self._event_emitter.remove_all_listeners() self._listener_tasks.clear()