Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/crawlee/crawlers/_basic/_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
UserDefinedErrorHandlerError,
UserHandlerTimeoutError,
)
from crawlee.events import EventManager
from crawlee.events._types import Event, EventCrawlerStatusData
from crawlee.http_clients import ImpitHttpClient
from crawlee.router import Router
Expand Down Expand Up @@ -90,7 +91,6 @@
PushDataKwargs,
)
from crawlee.configuration import Configuration
from crawlee.events import EventManager
from crawlee.http_clients import HttpClient, HttpResponse
from crawlee.proxy_configuration import ProxyConfiguration, ProxyInfo
from crawlee.request_loaders import RequestManager
Expand Down Expand Up @@ -768,22 +768,26 @@ def sigint_handler() -> None:
return final_statistics

async def _run_crawler(self) -> None:
event_manager = self._service_locator.get_event_manager()
local_event_manager = self._service_locator.get_event_manager()
global_event_manager = service_locator.get_event_manager()
if local_event_manager is global_event_manager:
local_event_manager = None # Avoid entering the same event manager context twice

# Collect the context managers to be entered. Context managers that are already active are excluded,
# as they were likely entered by the caller, who will also be responsible for exiting them.
contexts_to_enter = [
cm
for cm in (
event_manager,
global_event_manager,
local_event_manager,
self._snapshotter,
self._statistics,
self._session_pool if self._use_session_pool else None,
self._http_client,
self._crawler_state_rec_task,
*self._additional_context_managers,
)
if cm and getattr(cm, 'active', False) is False
if cm and (isinstance(cm, EventManager) or not getattr(cm, 'active', False))
Copy link
Collaborator

@Pijukatel Pijukatel Mar 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be more readable to just do it on a standalone line. Something like
contexts_to_enter.extend([local_event_manager, global_event_manager])

]

async with AsyncExitStack() as exit_stack:
Expand Down
22 changes: 15 additions & 7 deletions src/crawlee/events/_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def __init__(
delay=self._persist_state_interval,
)

# Reference count for active contexts.
self._ref_count = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe self._active_ref_count


# Flag to indicate the context state.
self._active = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need self._active anymore?

    @property
    def active(self) -> bool:
        """Indicate whether the context is active."""
        return self._ref_count > 0


Expand All @@ -102,13 +105,11 @@ def active(self) -> bool:
return self._active

async def __aenter__(self) -> EventManager:
"""Initialize the event manager upon entering the async context.
"""Initialize the event manager upon entering the async context."""
self._ref_count += 1

Raises:
RuntimeError: If the context manager is already active.
"""
if self._active:
raise RuntimeError(f'The {self.__class__.__name__} is already active.')
if self._ref_count > 1:
return self

self._active = True
self._emit_persist_state_event_rec_task.start()
Expand All @@ -130,9 +131,16 @@ async def __aexit__(
if not self._active:
raise RuntimeError(f'The {self.__class__.__name__} is not active.')

self._ref_count -= 1

# Emit persist state event to ensure the latest state is saved before closing the context.
await self._emit_persist_state_event()

if self._ref_count > 0:
return

# Stop persist state event periodic emission and manually emit last one to ensure latest state is saved.
await self._emit_persist_state_event_rec_task.stop()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having

self._emit_persist_state_event_rec_task.stop()
await self._emit_persist_state_event()

reduces the risk of two persist state events being emitted very close to each other; I would prefer to keep it this way and just add

 if self._ref_count > 0:
            await self._emit_persist_state_event()
            return

await self._emit_persist_state_event()
await self.wait_for_all_listeners_to_complete(timeout=self._close_timeout)
self._event_emitter.remove_all_listeners()
self._listener_tasks.clear()
Expand Down
107 changes: 105 additions & 2 deletions tests/unit/crawlers/_basic/test_basic_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@
from crawlee.configuration import Configuration
from crawlee.crawlers import BasicCrawler
from crawlee.errors import RequestCollisionError, SessionError, UserDefinedErrorHandlerError
from crawlee.events import Event, EventCrawlerStatusData
from crawlee.events._local_event_manager import LocalEventManager
from crawlee.events import Event, EventCrawlerStatusData, LocalEventManager
from crawlee.request_loaders import RequestList, RequestManagerTandem
from crawlee.sessions import Session, SessionPool
from crawlee.statistics import FinalStatistics
Expand Down Expand Up @@ -2047,3 +2046,107 @@ async def error_handler(context: BasicCrawlingContext, error: Exception) -> Requ
assert error_request is not None
assert error_request.state == RequestState.DONE
assert error_request.was_already_handled


@pytest.mark.skipif(sys.version_info[:3] < (3, 11), reason='asyncio.Barrier was introduced in Python 3.11.')
async def test_multiple_crawlers_with_global_event_manager() -> None:
    """Test that multiple crawlers work correctly when using the global event manager.

    Two crawlers share the global event manager, whose context is reference-counted:
    entering it twice must not raise, and it must stay active until the last crawler
    exits. A barrier forces both request handlers to overlap, and an event forces the
    second handler to run only after the first crawler has fully finished, proving the
    shared manager survives the first crawler's exit.
    """

    # Test is skipped in older Python versions.
    from asyncio import Barrier  # type:ignore[attr-defined] # noqa: PLC0415

    # Separate request queues so each crawler processes its own single request.
    rq1 = await RequestQueue.open(alias='rq1')
    rq2 = await RequestQueue.open(alias='rq2')

    crawler_1 = BasicCrawler(request_manager=rq1)
    crawler_2 = BasicCrawler(request_manager=rq2)

    # started_event: crawler_1's handler is running (its event manager context is entered).
    # finished_event: crawler_1.run() has returned (it has exited its contexts).
    started_event = asyncio.Event()
    finished_event = asyncio.Event()

    async def launch_crawler_1() -> None:
        await crawler_1.run(['https://a.placeholder.com'])
        finished_event.set()

    async def launch_crawler_2() -> None:
        # Ensure that crawler_1 is already running and has activated event_manager
        await started_event.wait()
        await crawler_2.run(['https://b.placeholder.com'])

    # Released only once both handlers have reached it, guaranteeing temporal overlap.
    handler_barrier = Barrier(2)

    # Records the event manager's `active` flag as seen from inside each handler.
    handler_call = AsyncMock()

    @crawler_1.router.default_handler
    async def handler_1(context: BasicCrawlingContext) -> None:
        started_event.set()
        # Ensure that both handlers are running at the same time.
        await handler_barrier.wait()
        event_manager = service_locator.get_event_manager()

        await handler_call(event_manager.active)

    @crawler_2.router.default_handler
    async def handler_2(context: BasicCrawlingContext) -> None:
        # Ensure that both handlers are running at the same time.
        await handler_barrier.wait()
        # Ensure that crawler_1 is finished and closed all active contexts.
        await finished_event.wait()
        # Check that event manager is active and can be used in the second crawler.
        event_manager = service_locator.get_event_manager()

        await handler_call(event_manager.active)

    await asyncio.gather(
        launch_crawler_1(),
        launch_crawler_2(),
    )

    assert handler_call.call_count == 2

    first_call = handler_call.call_args_list[0]
    second_call = handler_call.call_args_list[1]

    # The manager must have been active inside both handlers — including handler_2,
    # which ran after crawler_1 had already exited its (shared) event manager context.
    assert first_call[0][0] is True
    assert second_call[0][0] is True

    event_manager = service_locator.get_event_manager()

    # After both crawlers are finished, event manager should be inactive.
    assert event_manager.active is False

    await rq1.drop()
    await rq2.drop()


async def test_global_and_local_event_manager_in_crawler_run() -> None:
    """Test that both global and local event managers are used in crawler run.

    The crawler is given an explicit local event manager distinct from the global one
    registered in the service locator. During the run both must be active (the crawler
    enters both contexts), and after the run both must be inactive again.
    """

    config = service_locator.get_configuration()

    # A dedicated local manager, different from the one in the service locator.
    local_event_manager = LocalEventManager.from_config(config)

    crawler = BasicCrawler(event_manager=local_event_manager)

    # Records the (local, global) `active` flags as observed inside the handler.
    handler_call = AsyncMock()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        global_event_manager = service_locator.get_event_manager()
        # Await the AsyncMock call so no "coroutine was never awaited" warning is
        # emitted (consistent with the other event-manager tests in this module).
        await handler_call(local_event_manager.active, global_event_manager.active)

    await crawler.run(['https://a.placeholder.com'])

    assert handler_call.call_count == 1

    local_em_state, global_em_state = handler_call.call_args_list[0][0]

    # Both event managers should be active.
    assert local_em_state is True
    assert global_em_state is True

    global_event_manager = service_locator.get_event_manager()

    # After crawler is finished, both event managers should be inactive.
    assert local_event_manager.active is False
    assert global_event_manager.active is False
4 changes: 0 additions & 4 deletions tests/unit/events/test_event_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,6 @@ async def test_methods_raise_error_when_not_active(event_system_info_data: Event
with pytest.raises(RuntimeError, match=r'EventManager is not active.'):
await event_manager.wait_for_all_listeners_to_complete()

with pytest.raises(RuntimeError, match=r'EventManager is already active.'):
async with event_manager, event_manager:
pass

async with event_manager:
event_manager.emit(event=Event.SYSTEM_INFO, event_data=event_system_info_data)
await event_manager.wait_for_all_listeners_to_complete()
Expand Down
Loading