diff --git a/src/crawlee/crawlers/_basic/_basic_crawler.py b/src/crawlee/crawlers/_basic/_basic_crawler.py index 2553467124..29805780a5 100644 --- a/src/crawlee/crawlers/_basic/_basic_crawler.py +++ b/src/crawlee/crawlers/_basic/_basic_crawler.py @@ -768,27 +768,36 @@ def sigint_handler() -> None: return final_statistics async def _run_crawler(self) -> None: - event_manager = self._service_locator.get_event_manager() + local_event_manager = self._service_locator.get_event_manager() + global_event_manager = service_locator.get_event_manager() + if local_event_manager is global_event_manager: + local_event_manager = None # Avoid entering the same event manager context twice + + # The event managers are always entered. + contexts_to_enter: list[Any] = ( + [global_event_manager, local_event_manager] if local_event_manager else [global_event_manager] + ) # Collect the context managers to be entered. Context managers that are already active are excluded, # as they were likely entered by the caller, who will also be responsible for exiting them. 
- contexts_to_enter = [ - cm - for cm in ( - event_manager, - self._snapshotter, - self._statistics, - self._session_pool if self._use_session_pool else None, - self._http_client, - self._crawler_state_rec_task, - *self._additional_context_managers, - ) - if cm and getattr(cm, 'active', False) is False - ] + contexts_to_enter.extend( + [ + cm + for cm in ( + self._snapshotter, + self._statistics, + self._session_pool if self._use_session_pool else None, + self._http_client, + self._crawler_state_rec_task, + *self._additional_context_managers, + ) + if cm and getattr(cm, 'active', False) is False + ] + ) async with AsyncExitStack() as exit_stack: for context in contexts_to_enter: - await exit_stack.enter_async_context(context) # ty: ignore[invalid-argument-type] + await exit_stack.enter_async_context(context) await self._autoscaled_pool.run() diff --git a/src/crawlee/events/_event_manager.py b/src/crawlee/events/_event_manager.py index 8b714255fb..2c629c6d39 100644 --- a/src/crawlee/events/_event_manager.py +++ b/src/crawlee/events/_event_manager.py @@ -93,24 +93,20 @@ def __init__( delay=self._persist_state_interval, ) - # Flag to indicate the context state. - self._active = False + # Reference count for active contexts. + self._active_ref_count = 0 @property def active(self) -> bool: """Indicate whether the context is active.""" - return self._active + return self._active_ref_count > 0 async def __aenter__(self) -> EventManager: - """Initialize the event manager upon entering the async context. + """Initialize the event manager upon entering the async context.""" + self._active_ref_count += 1 + if self._active_ref_count > 1: + return self - Raises: - RuntimeError: If the context manager is already active. 
- """ - if self._active: - raise RuntimeError(f'The {self.__class__.__name__} is already active.') - - self._active = True self._emit_persist_state_event_rec_task.start() return self @@ -127,9 +123,15 @@ async def __aexit__( Raises: RuntimeError: If the context manager is not active. """ - if not self._active: + if not self.active: raise RuntimeError(f'The {self.__class__.__name__} is not active.') + if self._active_ref_count > 1: + # Emit persist state event to ensure the latest state is saved before closing the context. + await self._emit_persist_state_event() + self._active_ref_count -= 1 + return + # Stop persist state event periodic emission and manually emit last one to ensure latest state is saved. await self._emit_persist_state_event_rec_task.stop() await self._emit_persist_state_event() @@ -137,7 +139,7 @@ async def __aexit__( self._event_emitter.remove_all_listeners() self._listener_tasks.clear() self._listeners_to_wrappers.clear() - self._active = False + self._active_ref_count -= 1 @overload def on(self, *, event: Literal[Event.PERSIST_STATE], listener: EventListener[EventPersistStateData]) -> None: ... 
diff --git a/tests/unit/crawlers/_basic/test_basic_crawler.py b/tests/unit/crawlers/_basic/test_basic_crawler.py index d65176c5e2..0d4296de47 100644 --- a/tests/unit/crawlers/_basic/test_basic_crawler.py +++ b/tests/unit/crawlers/_basic/test_basic_crawler.py @@ -26,8 +26,7 @@ from crawlee.configuration import Configuration from crawlee.crawlers import BasicCrawler from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError -from crawlee.events import Event, EventCrawlerStatusData -from crawlee.events._local_event_manager import LocalEventManager +from crawlee.events import Event, EventCrawlerStatusData, LocalEventManager from crawlee.request_loaders import RequestList, RequestManagerTandem from crawlee.sessions import Session, SessionPool from crawlee.statistics import FinalStatistics @@ -2047,3 +2046,104 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ assert error_request is not None assert error_request.state == RequestState.DONE assert error_request.was_already_handled + + +@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.') +async def test_multiple_crawlers_with_global_event_manager() -> None: + """Test that multiple crawlers work correctly when using the global event manager.""" + + rq1 = await RequestQueue.open(alias='rq1') + rq2 = await RequestQueue.open(alias='rq2') + + crawler_1 = BasicCrawler(request_manager=rq1) + crawler_2 = BasicCrawler(request_manager=rq2) + + started_event = asyncio.Event() + finished_event = asyncio.Event() + + async def launch_crawler_1() -> None: + await crawler_1.run(['https://a.placeholder.com']) + finished_event.set() + + async def launch_crawler_2() -> None: + # Ensure that crawler_1 is already running and has activated event_manager + await started_event.wait() + await crawler_2.run(['https://b.placeholder.com']) + + handler_barrier = asyncio.Barrier(2) # ty:ignore[unresolved-attribute] # Test is 
skipped in older Python versions. + + handler_call = AsyncMock() + + @crawler_1.router.default_handler + async def handler_1(context: BasicCrawlingContext) -> None: + started_event.set() + # Ensure that both handlers are running at the same time. + await handler_barrier.wait() + event_manager = service_locator.get_event_manager() + + await handler_call(event_manager.active) + + @crawler_2.router.default_handler + async def handler_2(context: BasicCrawlingContext) -> None: + # Ensure that both handlers are running at the same time. + await handler_barrier.wait() + # Ensure that crawler_1 is finished and closed all active contexts. + await finished_event.wait() + # Check that event manager is active and can be used in the second crawler. + event_manager = service_locator.get_event_manager() + + await handler_call(event_manager.active) + + await asyncio.gather( + launch_crawler_1(), + launch_crawler_2(), + ) + + assert handler_call.call_count == 2 + + first_call = handler_call.call_args_list[0] + second_call = handler_call.call_args_list[1] + + assert first_call[0][0] is True + assert second_call[0][0] is True + + event_manager = service_locator.get_event_manager() + + # After both crawlers are finished, event manager should be inactive. 
+ assert event_manager.active is False + + await rq1.drop() + await rq2.drop() + + +async def test_global_and_local_event_manager_in_crawler_run() -> None: + """Test that both global and local event managers are used in crawler run.""" + + config = service_locator.get_configuration() + + local_event_manager = LocalEventManager.from_config(config) + + crawler = BasicCrawler(event_manager=local_event_manager) + + handler_call = AsyncMock() + + @crawler.router.default_handler + async def handler(context: BasicCrawlingContext) -> None: + global_event_manager = service_locator.get_event_manager() + await handler_call(local_event_manager.active, global_event_manager.active) + + await crawler.run(['https://a.placeholder.com']) + + assert handler_call.call_count == 1 + + local_em_state, global_em_state = handler_call.call_args_list[0][0] + + # Both event managers should be active. + assert local_em_state is True + assert global_em_state is True + + global_event_manager = service_locator.get_event_manager() + + # After crawler is finished, both event managers should be inactive. + assert local_event_manager.active is False + assert global_event_manager.active is False diff --git a/tests/unit/events/test_event_manager.py b/tests/unit/events/test_event_manager.py index 4654efaf64..db26f9a18e 100644 --- a/tests/unit/events/test_event_manager.py +++ b/tests/unit/events/test_event_manager.py @@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event with pytest.raises(RuntimeError, match=r'EventManager is not active.'): await event_manager.wait_for_all_listeners_to_complete() - with pytest.raises(RuntimeError, match=r'EventManager is already active.'): - async with event_manager, event_manager: - pass - async with event_manager: event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data) await event_manager.wait_for_all_listeners_to_complete()