From f7ccd6b2928b1e5cfa0f4bf02371c91728f2a3b0 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sat, 28 Mar 2026 00:07:18 -0400 Subject: [PATCH 1/2] chore: remove unused SyncSource classes Signed-off-by: Vaibhav Tiwari --- packages/pynumaflow/tests/source/utils.py | 41 +---------------------- 1 file changed, 1 insertion(+), 40 deletions(-) diff --git a/packages/pynumaflow/tests/source/utils.py b/packages/pynumaflow/tests/source/utils.py index 04daaa61..9733745d 100644 --- a/packages/pynumaflow/tests/source/utils.py +++ b/packages/pynumaflow/tests/source/utils.py @@ -56,28 +56,6 @@ async def partitions_handler(self) -> PartitionsResponse: return PartitionsResponse(partitions=mock_partitions()) -class SyncSource(Sourcer): - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: - payload = b"payload:test_mock_message" - keys = ["test_key"] - offset = mock_offset() - event_time = mock_event_time() - for i in range(10): - yield Message(payload=payload, keys=keys, offset=offset, event_time=event_time) - - def ack_handler(self, ack_request: AckRequest): - return - - def nack_handler(self, nack_request: NackRequest): - return - - def pending_handler(self) -> PendingResponse: - return PendingResponse(count=10) - - def partitions_handler(self) -> PartitionsResponse: - return PartitionsResponse(partitions=mock_partitions()) - - def read_req_source_fn() -> ReadRequest: request = source_pb2.ReadRequest.Request( num_records=10, @@ -127,21 +105,4 @@ async def pending_handler(self) -> PendingResponse: raise RuntimeError("Got a runtime error from pending handler.") async def partitions_handler(self) -> PartitionsResponse: - raise RuntimeError("Got a runtime error from partition handler.") - - -class SyncSourceError(Sourcer): - def read_handler(self, datum: ReadRequest) -> Iterable[Message]: - raise RuntimeError("Got a runtime error from read handler.") - - def ack_handler(self, ack_request: AckRequest): - raise RuntimeError("Got a runtime error from ack handler.") - - def nack_handler(self, nack_request: NackRequest): - raise RuntimeError("Got a runtime error from nack handler.") - - def pending_handler(self) -> PendingResponse: - raise RuntimeError("Got a runtime error from pending handler.") - - def partitions_handler(self) -> PartitionsResponse: - raise RuntimeError("Got a runtime error from partition handler.") + raise RuntimeError("Got a runtime error from partition handler.") \ No newline at end of file From bd70217aed021e43f5772c5ab1fb636c752ae896 Mon Sep 17 00:00:00 2001 From: Vaibhav Tiwari Date: Sat, 28 Mar 2026 00:12:00 -0400 Subject: [PATCH 2/2] chore: fix linting Signed-off-by: Vaibhav Tiwari --- packages/pynumaflow/tests/source/utils.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/pynumaflow/tests/source/utils.py b/packages/pynumaflow/tests/source/utils.py index 9733745d..0c1c3de7 100644 --- a/packages/pynumaflow/tests/source/utils.py +++ b/packages/pynumaflow/tests/source/utils.py @@ -1,5 +1,3 @@ -from collections.abc import Iterable - from pynumaflow.shared.asynciter import NonBlockingIterator from pynumaflow.sourcer import ReadRequest, Message, UserMetadata from pynumaflow.sourcer import ( @@ -105,4 +103,4 @@ async def pending_handler(self) -> PendingResponse: raise RuntimeError("Got a runtime error from pending handler.") async def partitions_handler(self) -> PartitionsResponse: - raise RuntimeError("Got a runtime error from partition handler.") \ No newline at end of file + raise RuntimeError("Got a runtime error from partition handler.")