Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ name = "pynumaflow_lite"
crate-type = ["cdylib", "rlib"]

[dependencies]
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "44ee3068fcf7088ff265df7ae7ce1881a40694ff" }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", rev = "15c46e8289943a639a46a475b7e0d286e928a8b0" }
pyo3 = { version = "0.27.1", features = ["chrono", "experimental-inspect"] }
tokio = "1.47.1"
tonic = "0.14.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def pending_handler(self) -> sourcer.PendingResponse:
"""
return sourcer.PendingResponse(count=0)

async def partitions_handler(self) -> sourcer.PartitionsResponse:
async def active_partitions_handler(self) -> sourcer.PartitionsResponse:
"""
The simple source always returns default partitions.
"""
Expand Down
23 changes: 20 additions & 3 deletions packages/pynumaflow-lite/pynumaflow_lite/_source_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
NackRequest,
PendingResponse,
PartitionsResponse,
TotalPartitionsResponse,
)


Expand All @@ -19,10 +20,11 @@ class Sourcer(metaclass=ABCMeta):
- read_handler: Read messages from the source
- ack_handler: Acknowledge processed messages
- pending_handler: Return the number of pending messages
- partitions_handler: Return the partitions this source handles
- active_partitions_handler: Return the partitions this source handles

Optionally, you can implement:
- nack_handler: Negatively acknowledge messages (default: no-op)
- total_partitions_handler: Return the total number of partitions in the source
"""

def __call__(self, *args, **kwargs):
Expand Down Expand Up @@ -88,9 +90,9 @@ async def pending_handler(self) -> PendingResponse:
pass

@abstractmethod
async def partitions_handler(self) -> PartitionsResponse:
async def active_partitions_handler(self) -> PartitionsResponse:
"""
Return the partitions associated with this source.
Return the active partitions associated with this source.

This is used by the platform to determine the partitions to which
the watermark should be published. If your source doesn't have the
Expand All @@ -105,6 +107,21 @@ async def partitions_handler(self) -> PartitionsResponse:
"""
pass

async def total_partitions_handler(self) -> int | None:
"""
Optional.

Returns the total number of partitions in the source.
Used by the platform for watermark progression to know when all
processors have reported in.

Returns None by default, indicating the source does not report total partitions.

:return:
TotalPartitionsResponse: Response containing the total number of partitions
"""
return None

async def nack_handler(self, request: NackRequest) -> None:
"""
Negatively acknowledge messages (optional).
Expand Down
42 changes: 35 additions & 7 deletions packages/pynumaflow-lite/src/source/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,27 +145,27 @@ impl numaflow::source::Sourcer for PySourceRunner {
}
}

/// Returns the partitions associated with the source. This will be used by the platform to determine
/// Returns the active partitions associated with the source. This will be used by the platform to determine
/// the partitions to which the watermark should be published. Some sources might not have the concept of partitions.
/// Kafka is an example of source where a reader can read from multiple partitions.
/// If None is returned, Numaflow replica-id will be returned as the partition.
async fn partitions(&self) -> Option<Vec<i32>> {
// Call the Python partitions_handler
async fn active_partitions(&self) -> Option<Vec<i32>> {
// Call the Python active_partitions_handler
let fut = Python::attach(|py| {
let py_handler = self.py_handler.clone();
let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone());

let coro = py_handler
.call_method0(py, "partitions_handler")
.expect("failed to call partitions_handler")
.call_method0(py, "active_partitions_handler")
.expect("failed to call active_partitions_handler")
.into_bound(py);

pyo3_async_runtimes::into_future_with_locals(&locals, coro)
.expect("failed to convert partitions_handler to future")
.expect("failed to convert active_partitions_handler to future")
});

// Await the Python coroutine and extract the result
let result = fut.await.expect("partitions_handler failed");
let result = fut.await.expect("active_partitions_handler failed");

let partitions_response = Python::attach(|py| {
result
Expand All @@ -175,6 +175,34 @@ impl numaflow::source::Sourcer for PySourceRunner {

Some(partitions_response.partitions)
}

/// Returns the total number of partitions in the source. This is used by the platform for
/// watermark progression to know when all processors have reported in.
/// If None is returned, the platform will not use total partitions for watermark tracking.
async fn total_partitions(&self) -> Option<i32> {
// Call the Python total_partitions_handler
let fut = Python::attach(|py| {
let py_handler = self.py_handler.clone();
let locals = pyo3_async_runtimes::TaskLocals::new(self.event_loop.bind(py).clone());

let coro = py_handler
.call_method0(py, "total_partitions_handler")
.expect("failed to call total_partitions_handler")
.into_bound(py);

pyo3_async_runtimes::into_future_with_locals(&locals, coro)
.expect("failed to convert total_partitions_handler to future")
});

// Await the Python coroutine and extract the result
let result = fut.await.expect("total_partitions_handler failed");

Python::attach(|py| {
result
.extract::<Option<i32>>(py)
.expect("failed to extract Option<i32> from total_partitions_handler")
})
}
}

/// Start the source server by spinning up a dedicated Python asyncio loop and wiring shutdown.
Expand Down
8 changes: 7 additions & 1 deletion packages/pynumaflow-lite/tests/examples/source_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,18 @@ async def pending_handler(self) -> sourcer.PendingResponse:
"""
return sourcer.PendingResponse(count=0)

async def partitions_handler(self) -> sourcer.PartitionsResponse:
async def active_partitions_handler(self) -> sourcer.PartitionsResponse:
"""
The simple source always returns default partitions.
"""
return sourcer.PartitionsResponse(partitions=[self.partition_idx])

async def total_partitions_handler(self) -> int:
"""
The simple source has only one partition.
"""
return 1


async def start():
sock_file = "/tmp/var/run/numaflow/source.sock"
Expand Down
Loading