diff --git a/packages/pynumaflow-lite/Makefile b/packages/pynumaflow-lite/Makefile index ae7d02a0..8f0f8c58 100644 --- a/packages/pynumaflow-lite/Makefile +++ b/packages/pynumaflow-lite/Makefile @@ -42,3 +42,17 @@ test-rust: clean: cargo clean + +fmt: + cargo fmt --all + +.PHONY: lint +lint: test-fmt clippy + +.PHONY: test-fmt +test-fmt: + cargo fmt --all --check + +.PHONY: clippy +clippy: + cargo clippy --workspace --all-targets --all-features -- -D warnings -A clippy::module_inception \ No newline at end of file diff --git a/packages/pynumaflow-lite/src/accumulate/server.rs b/packages/pynumaflow-lite/src/accumulate/server.rs index 7a39054c..ecd8e13d 100644 --- a/packages/pynumaflow-lite/src/accumulate/server.rs +++ b/packages/pynumaflow-lite/src/accumulate/server.rs @@ -155,7 +155,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/batchmap/server.rs b/packages/pynumaflow-lite/src/batchmap/server.rs index 65a5d879..25c75210 100644 --- a/packages/pynumaflow-lite/src/batchmap/server.rs +++ b/packages/pynumaflow-lite/src/batchmap/server.rs @@ -99,7 +99,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/lib.rs b/packages/pynumaflow-lite/src/lib.rs index 0f5a7c2d..1e25b912 100644 --- a/packages/pynumaflow-lite/src/lib.rs +++ b/packages/pynumaflow-lite/src/lib.rs @@ -114,7 +114,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.batchmapper` as well let binding = m.getattr("batchmapper")?; @@ -123,7 +123,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.mapstreamer` as well let binding = m.getattr("mapstreamer")?; @@ -132,7 +132,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.reducer` as well let binding = m.getattr("reducer")?; @@ -141,7 +141,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.session_reducer` as well let binding = m.getattr("session_reducer")?; @@ -150,7 +150,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.reducestreamer` as well let binding = m.getattr("reducestreamer")?; @@ -159,7 +159,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.accumulator` as well let binding = m.getattr("accumulator")?; @@ -168,7 +168,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.sinker` as well let binding = m.getattr("sinker")?; @@ -177,7 +177,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.sourcer` as well let binding = m.getattr("sourcer")?; @@ -186,7 +186,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.sourcetransformer` as well let binding = m.getattr("sourcetransformer")?; @@ -195,7 +195,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; // Ensure it's importable as `pynumaflow_lite.sideinputer` as well let binding = m.getattr("sideinputer")?; @@ -204,7 +204,7 @@ fn pynumaflow_lite(py: Python, m: &Bound) -> PyResult<()> { sub.setattr("__name__", fullname)?; py.import("sys")? .getattr("modules")? - .set_item(fullname, &sub)?; + .set_item(fullname, sub)?; Ok(()) } diff --git a/packages/pynumaflow-lite/src/map/server.rs b/packages/pynumaflow-lite/src/map/server.rs index 8817e6ff..1e220994 100644 --- a/packages/pynumaflow-lite/src/map/server.rs +++ b/packages/pynumaflow-lite/src/map/server.rs @@ -80,7 +80,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/mapstream/server.rs b/packages/pynumaflow-lite/src/mapstream/server.rs index a90eda27..7dbcaee1 100644 --- a/packages/pynumaflow-lite/src/mapstream/server.rs +++ b/packages/pynumaflow-lite/src/mapstream/server.rs @@ -95,7 +95,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/pyiterables.rs b/packages/pynumaflow-lite/src/pyiterables.rs index fc4e33c5..d85721d0 100644 --- a/packages/pynumaflow-lite/src/pyiterables.rs +++ b/packages/pynumaflow-lite/src/pyiterables.rs @@ -11,6 +11,8 @@ use pyo3::{PyClass, exceptions::PyStopAsyncIteration, prelude::*}; use tokio::sync::Mutex as AsyncMutex; use tokio::sync::mpsc; +type PyFuture = Pin>> + Send + 'static>>; + /// Stream over a Python AsyncIterator, yielding `M` as soon as each value is produced. /// `M` must be extractable from the Python object. /// @@ -24,7 +26,7 @@ pub struct PyAsyncIterStream { event_loop: Arc>, // In-flight future for the next item (converted from Python awaitable). #[pin] - next_fut: Option>> + Send + 'static>>>, + next_fut: Option, // Phantom: we yield M _marker: std::marker::PhantomData, } diff --git a/packages/pynumaflow-lite/src/reduce/server.rs b/packages/pynumaflow-lite/src/reduce/server.rs index dc19de02..68d35762 100644 --- a/packages/pynumaflow-lite/src/reduce/server.rs +++ b/packages/pynumaflow-lite/src/reduce/server.rs @@ -107,21 +107,19 @@ impl reduce::Reducer for PyReduceRunner { // Ensure forwarder completes let _ = forwarder.await; - let messages = Python::attach(|py| { + Python::attach(|py| { // Expect Messages; also allow a single Message for convenience if let Ok(msgs) = result.extract::(py) { msgs.messages .into_iter() - .map(|m| reduce::Message::from(m)) + .map(reduce::Message::from) .collect::>() } else if let Ok(single) = result.extract::(py) { vec![single.into()] } else { vec![] } - }); - - messages + }) } } @@ -144,20 +142,18 @@ pub(super) async fn start( let obj = py_creator.bind(py); // Check if it's a function or coroutine function using inspect module let inspect = py.import("inspect").ok()?; - if let Ok(is_func) = inspect.call_method1("isfunction", (obj,)) { - if let Ok(result) = is_func.extract::() { - if result { - return Some(true); - } - } + if let Ok(is_func) = inspect.call_method1("isfunction", (obj,)) + && let Ok(result) = is_func.extract::() + && result + { + return Some(true); } // Also check for coroutine function - if let Ok(is_coro) = inspect.call_method1("iscoroutinefunction", (obj,)) { - if let Ok(result) = is_coro.extract::() { - if result { - return Some(true); - } - } + if let Ok(is_coro) = inspect.call_method1("iscoroutinefunction", (obj,)) + && let Ok(result) = is_coro.extract::() + && result + { + return Some(true); } Some(false) }) @@ -194,7 +190,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/reducestream/server.rs b/packages/pynumaflow-lite/src/reducestream/server.rs index 9e406179..3d11b23f 100644 --- a/packages/pynumaflow-lite/src/reducestream/server.rs +++ b/packages/pynumaflow-lite/src/reducestream/server.rs @@ -164,7 +164,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/session_reduce/server.rs b/packages/pynumaflow-lite/src/session_reduce/server.rs index 51438eae..e6e3266f 100644 --- a/packages/pynumaflow-lite/src/session_reduce/server.rs +++ b/packages/pynumaflow-lite/src/session_reduce/server.rs @@ -198,7 +198,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/src/sideinput/server.rs b/packages/pynumaflow-lite/src/sideinput/server.rs index 7d6f3cfb..ca130bbf 100644 --- a/packages/pynumaflow-lite/src/sideinput/server.rs +++ b/packages/pynumaflow-lite/src/sideinput/server.rs @@ -29,10 +29,7 @@ impl sideinput::SideInputer for PySideInputRunner { let result = fut.await.unwrap(); - let response = Python::attach(|py| { - let response: Response = result.extract(py).unwrap(); - response - }); + let response: Response = Python::attach(|py| result.extract(py).unwrap()); if response.broadcast { Some(response.value) diff --git a/packages/pynumaflow-lite/src/sink/mod.rs b/packages/pynumaflow-lite/src/sink/mod.rs index 6ccf5fb0..5ee344cb 100644 --- a/packages/pynumaflow-lite/src/sink/mod.rs +++ b/packages/pynumaflow-lite/src/sink/mod.rs @@ -383,6 +383,7 @@ pub struct Datum { } impl Datum { + #[allow(clippy::too_many_arguments)] fn new( keys: Vec, value: Vec, diff --git a/packages/pynumaflow-lite/src/sink/server.rs b/packages/pynumaflow-lite/src/sink/server.rs index d971d514..34466d2d 100644 --- a/packages/pynumaflow-lite/src/sink/server.rs +++ b/packages/pynumaflow-lite/src/sink/server.rs @@ -43,10 +43,7 @@ impl sink::Sinker for PySinkRunner { // Ensure forwarder completes let _ = forwarder.await; - let responses = Python::attach(|py| { - let x: crate::sink::Responses = result.extract(py).unwrap(); - x - }); + let responses: crate::sink::Responses = Python::attach(|py| result.extract(py).unwrap()); responses .responses diff --git a/packages/pynumaflow-lite/src/source/server.rs b/packages/pynumaflow-lite/src/source/server.rs index f47540e9..103105c3 100644 --- a/packages/pynumaflow-lite/src/source/server.rs +++ b/packages/pynumaflow-lite/src/source/server.rs @@ -25,11 +25,9 @@ impl numaflow::source::Sourcer for PySourceRunner { let py_handler = self.py_handler.clone(); // Call read_handler(request) -> AsyncIterator[Message] - let result = py_handler + py_handler .call_method1(py, "read_handler", (read_request,)) - .expect("failed to call read_handler"); - - result + .expect("failed to call read_handler") }); // Create a stream from the Python AsyncIterator diff --git a/packages/pynumaflow-lite/src/sourcetransform/server.rs b/packages/pynumaflow-lite/src/sourcetransform/server.rs index c554074b..31ee6b7e 100644 --- a/packages/pynumaflow-lite/src/sourcetransform/server.rs +++ b/packages/pynumaflow-lite/src/sourcetransform/server.rs @@ -83,7 +83,7 @@ pub(super) async fn start( // if not finished, abort it if !sig_handle.is_finished() { println!("Aborting signal handler"); - let _ = sig_handle.abort(); + sig_handle.abort(); } result diff --git a/packages/pynumaflow-lite/tests/bin/batchmap.rs b/packages/pynumaflow-lite/tests/bin/batchmap.rs index 88fb697e..5eeb1ac4 100644 --- a/packages/pynumaflow-lite/tests/bin/batchmap.rs +++ b/packages/pynumaflow-lite/tests/bin/batchmap.rs @@ -102,7 +102,7 @@ async fn main() -> Result<(), Box> { // We'll take the first result's value. let value = r .results - .get(0) + .first() .map(|res| res.value.clone()) .unwrap_or_default(); let id = r.id.clone(); diff --git a/packages/pynumaflow-lite/tests/bin/mapstream.rs b/packages/pynumaflow-lite/tests/bin/mapstream.rs index 0d0ea9ed..5929b8c8 100644 --- a/packages/pynumaflow-lite/tests/bin/mapstream.rs +++ b/packages/pynumaflow-lite/tests/bin/mapstream.rs @@ -75,7 +75,7 @@ async fn main() -> Result<(), Box> { assert!(maybe.is_some()); let resp = maybe.unwrap(); // Each MapResponse carries results; we take the first - if let Some(first) = resp.results.get(0) { + if let Some(first) = resp.results.first() { got.push(first.value.clone()); } } diff --git a/packages/pynumaflow-lite/tests/bin/reducestream.rs b/packages/pynumaflow-lite/tests/bin/reducestream.rs index b8d606db..8868a123 100644 --- a/packages/pynumaflow-lite/tests/bin/reducestream.rs +++ b/packages/pynumaflow-lite/tests/bin/reducestream.rs @@ -118,23 +118,19 @@ async fn main() -> Result<(), Box> { let mut message_count = 0; let mut found_eof = false; - loop { - if let Some(r) = resp.message().await? { - if let Some(res) = r.result { - assert!(!res.value.is_empty()); - message_count += 1; - println!( - "Received message {}: {:?}", - message_count, - String::from_utf8_lossy(&res.value) - ); - } - if r.eof { - found_eof = true; - println!("Received EOF"); - break; - } - } else { + while let Some(r) = resp.message().await? { + if let Some(res) = r.result { + assert!(!res.value.is_empty()); + message_count += 1; + println!( + "Received message {}: {:?}", + message_count, + String::from_utf8_lossy(&res.value) + ); + } + if r.eof { + found_eof = true; + println!("Received EOF"); break; } } diff --git a/packages/pynumaflow-lite/tests/bin/source.rs b/packages/pynumaflow-lite/tests/bin/source.rs index 460fb4c1..65afaa53 100644 --- a/packages/pynumaflow-lite/tests/bin/source.rs +++ b/packages/pynumaflow-lite/tests/bin/source.rs @@ -122,10 +122,10 @@ async fn read_messages( let mut messages = Vec::new(); while let Some(response) = response_stream.message().await? { - if let Some(status) = response.status { - if status.eot { - break; - } + if let Some(status) = response.status + && status.eot + { + break; } if let Some(result) = response.result { messages.push(result);