Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions packages/pynumaflow-lite/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,17 @@ test-rust:

clean:
cargo clean

fmt:
cargo fmt --all

.PHONY: lint
lint: test-fmt clippy

.PHONY: test-fmt
test-fmt:
cargo fmt --all --check

.PHONY: clippy
clippy:
cargo clippy --workspace --all-targets --all-features -- -D warnings -A clippy::module_inception
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/accumulate/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/batchmap/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
22 changes: 11 additions & 11 deletions packages/pynumaflow-lite/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.batchmapper` as well
let binding = m.getattr("batchmapper")?;
Expand All @@ -123,7 +123,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.mapstreamer` as well
let binding = m.getattr("mapstreamer")?;
Expand All @@ -132,7 +132,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.reducer` as well
let binding = m.getattr("reducer")?;
Expand All @@ -141,7 +141,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.session_reducer` as well
let binding = m.getattr("session_reducer")?;
Expand All @@ -150,7 +150,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.reducestreamer` as well
let binding = m.getattr("reducestreamer")?;
Expand All @@ -159,7 +159,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.accumulator` as well
let binding = m.getattr("accumulator")?;
Expand All @@ -168,7 +168,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.sinker` as well
let binding = m.getattr("sinker")?;
Expand All @@ -177,7 +177,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.sourcer` as well
let binding = m.getattr("sourcer")?;
Expand All @@ -186,7 +186,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.sourcetransformer` as well
let binding = m.getattr("sourcetransformer")?;
Expand All @@ -195,7 +195,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

// Ensure it's importable as `pynumaflow_lite.sideinputer` as well
let binding = m.getattr("sideinputer")?;
Expand All @@ -204,7 +204,7 @@ fn pynumaflow_lite(py: Python, m: &Bound<PyModule>) -> PyResult<()> {
sub.setattr("__name__", fullname)?;
py.import("sys")?
.getattr("modules")?
.set_item(fullname, &sub)?;
.set_item(fullname, sub)?;

Ok(())
}
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/map/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/mapstream/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
4 changes: 3 additions & 1 deletion packages/pynumaflow-lite/src/pyiterables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use pyo3::{PyClass, exceptions::PyStopAsyncIteration, prelude::*};
use tokio::sync::Mutex as AsyncMutex;
use tokio::sync::mpsc;

type PyFuture = Pin<Box<dyn Future<Output = PyResult<Py<PyAny>>> + Send + 'static>>;

/// Stream over a Python AsyncIterator, yielding `M` as soon as each value is produced.
/// `M` must be extractable from the Python object.
///
Expand All @@ -24,7 +26,7 @@ pub struct PyAsyncIterStream<M> {
event_loop: Arc<Py<PyAny>>,
// In-flight future for the next item (converted from Python awaitable).
#[pin]
next_fut: Option<Pin<Box<dyn Future<Output = PyResult<Py<PyAny>>> + Send + 'static>>>,
next_fut: Option<PyFuture>,
// Phantom: we yield M
_marker: std::marker::PhantomData<M>,
}
Expand Down
32 changes: 14 additions & 18 deletions packages/pynumaflow-lite/src/reduce/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,21 +107,19 @@ impl reduce::Reducer for PyReduceRunner {
// Ensure forwarder completes
let _ = forwarder.await;

let messages = Python::attach(|py| {
Python::attach(|py| {
// Expect Messages; also allow a single Message for convenience
if let Ok(msgs) = result.extract::<PyMessages>(py) {
msgs.messages
.into_iter()
.map(|m| reduce::Message::from(m))
.map(reduce::Message::from)
.collect::<Vec<reduce::Message>>()
} else if let Ok(single) = result.extract::<PyMessage>(py) {
vec![single.into()]
} else {
vec![]
}
});

messages
})
}
}

Expand All @@ -144,20 +142,18 @@ pub(super) async fn start(
let obj = py_creator.bind(py);
// Check if it's a function or coroutine function using inspect module
let inspect = py.import("inspect").ok()?;
if let Ok(is_func) = inspect.call_method1("isfunction", (obj,)) {
if let Ok(result) = is_func.extract::<bool>() {
if result {
return Some(true);
}
}
if let Ok(is_func) = inspect.call_method1("isfunction", (obj,))
&& let Ok(result) = is_func.extract::<bool>()
&& result
{
return Some(true);
}
// Also check for coroutine function
if let Ok(is_coro) = inspect.call_method1("iscoroutinefunction", (obj,)) {
if let Ok(result) = is_coro.extract::<bool>() {
if result {
return Some(true);
}
}
if let Ok(is_coro) = inspect.call_method1("iscoroutinefunction", (obj,))
&& let Ok(result) = is_coro.extract::<bool>()
&& result
{
return Some(true);
}
Some(false)
})
Expand Down Expand Up @@ -194,7 +190,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/reducestream/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/session_reduce/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
5 changes: 1 addition & 4 deletions packages/pynumaflow-lite/src/sideinput/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ impl sideinput::SideInputer for PySideInputRunner {

let result = fut.await.unwrap();

let response = Python::attach(|py| {
let response: Response = result.extract(py).unwrap();
response
});
let response: Response = Python::attach(|py| result.extract(py).unwrap());

if response.broadcast {
Some(response.value)
Expand Down
1 change: 1 addition & 0 deletions packages/pynumaflow-lite/src/sink/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,7 @@ pub struct Datum {
}

impl Datum {
#[allow(clippy::too_many_arguments)]
fn new(
keys: Vec<String>,
value: Vec<u8>,
Expand Down
5 changes: 1 addition & 4 deletions packages/pynumaflow-lite/src/sink/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ impl sink::Sinker for PySinkRunner {
// Ensure forwarder completes
let _ = forwarder.await;

let responses = Python::attach(|py| {
let x: crate::sink::Responses = result.extract(py).unwrap();
x
});
let responses: crate::sink::Responses = Python::attach(|py| result.extract(py).unwrap());

responses
.responses
Expand Down
6 changes: 2 additions & 4 deletions packages/pynumaflow-lite/src/source/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,9 @@ impl numaflow::source::Sourcer for PySourceRunner {
let py_handler = self.py_handler.clone();

// Call read_handler(request) -> AsyncIterator[Message]
let result = py_handler
py_handler
.call_method1(py, "read_handler", (read_request,))
.expect("failed to call read_handler");

result
.expect("failed to call read_handler")
});

// Create a stream from the Python AsyncIterator
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/src/sourcetransform/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ pub(super) async fn start(
// if not finished, abort it
if !sig_handle.is_finished() {
println!("Aborting signal handler");
let _ = sig_handle.abort();
sig_handle.abort();
}

result
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/tests/bin/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// We'll take the first result's value.
let value = r
.results
.get(0)
.first()
.map(|res| res.value.clone())
.unwrap_or_default();
let id = r.id.clone();
Expand Down
2 changes: 1 addition & 1 deletion packages/pynumaflow-lite/tests/bin/mapstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
assert!(maybe.is_some());
let resp = maybe.unwrap();
// Each MapResponse carries results; we take the first
if let Some(first) = resp.results.get(0) {
if let Some(first) = resp.results.first() {
got.push(first.value.clone());
}
}
Expand Down
30 changes: 13 additions & 17 deletions packages/pynumaflow-lite/tests/bin/reducestream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut message_count = 0;
let mut found_eof = false;

loop {
if let Some(r) = resp.message().await? {
if let Some(res) = r.result {
assert!(!res.value.is_empty());
message_count += 1;
println!(
"Received message {}: {:?}",
message_count,
String::from_utf8_lossy(&res.value)
);
}
if r.eof {
found_eof = true;
println!("Received EOF");
break;
}
} else {
while let Some(r) = resp.message().await? {
if let Some(res) = r.result {
assert!(!res.value.is_empty());
message_count += 1;
println!(
"Received message {}: {:?}",
message_count,
String::from_utf8_lossy(&res.value)
);
}
if r.eof {
found_eof = true;
println!("Received EOF");
break;
}
}
Expand Down
8 changes: 4 additions & 4 deletions packages/pynumaflow-lite/tests/bin/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ async fn read_messages(

let mut messages = Vec::new();
while let Some(response) = response_stream.message().await? {
if let Some(status) = response.status {
if status.eot {
break;
}
if let Some(status) = response.status
&& status.eot
{
break;
}
if let Some(result) = response.result {
messages.push(result);
Expand Down
Loading