diff --git a/.github/actions/determine-msrv/action.yml b/.github/actions/determine-msrv/action.yml index 3815607a6..41449fc87 100644 --- a/.github/actions/determine-msrv/action.yml +++ b/.github/actions/determine-msrv/action.yml @@ -12,6 +12,10 @@ inputs: description: The MSRV for PulseAudio backend (optional, Linux only) required: false default: '' + pipewire-msrv: + description: The MSRV for Pipewire backend (optional, Linux only) + required: false + default: '' outputs: all-features: @@ -28,11 +32,15 @@ runs: PLATFORM_MSRV="${{ inputs.platform-msrv }}" JACK_MSRV="${{ inputs.jack-msrv }}" PULSEAUDIO_MSRV="${{ inputs.pulseaudio-msrv }}" + PIPEWIRE_MSRV="${{ inputs.pipewire-msrv }}" # Use sort -V to find the maximum version VERSIONS="$PLATFORM_MSRV $JACK_MSRV" if [ -n "$PULSEAUDIO_MSRV" ]; then VERSIONS="$VERSIONS $PULSEAUDIO_MSRV" fi + if [ -n "$PIPEWIRE_MSRV" ]; then + VERSIONS="$VERSIONS $PIPEWIRE_MSRV" + fi MAX_MSRV=$(printf '%s\n' $VERSIONS | sort -V | tail -n1) echo "all-features=$MAX_MSRV" >> $GITHUB_OUTPUT - echo "Platform MSRV: $PLATFORM_MSRV, JACK MSRV: $JACK_MSRV, PulseAudio MSRV: $PULSEAUDIO_MSRV, Using for --all-features: $MAX_MSRV" + echo "Platform MSRV: $PLATFORM_MSRV, JACK MSRV: $JACK_MSRV, PulseAudio MSRV: $PULSEAUDIO_MSRV, PIPEWIRE_MSRV: $PIPEWIRE_MSRV, Using for --all-features: $MAX_MSRV" diff --git a/.github/workflows/platforms.yml b/.github/workflows/platforms.yml index 3354a5551..da3447d05 100644 --- a/.github/workflows/platforms.yml +++ b/.github/workflows/platforms.yml @@ -29,11 +29,12 @@ env: MSRV_COREAUDIO: "1.80" MSRV_JACK: "1.82" MSRV_PULSEAUDIO: "1.88" + MSRV_PIPEWIRE: "1.82" MSRV_WASIP1: "1.78" MSRV_WASM: "1.82" MSRV_WINDOWS: "1.82" - PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + PACKAGES_LINUX: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev ANDROID_COMPILE_SDK: "30" ANDROID_BUILD_TOOLS: "30.0.3" @@ -66,6 +67,7 @@ jobs: platform-msrv: ${{ env.MSRV_ALSA }} jack-msrv: ${{ env.MSRV_JACK }} pulseaudio-msrv: ${{ env.MSRV_PULSEAUDIO }} + pipewire-msrv: ${{ env.MSRV_PIPEWIRE }} - name: Install Rust MSRV (${{ env.MSRV_ALSA }}) uses: dtolnay/rust-toolchain@master @@ -153,10 +155,10 @@ jobs: run: cross +${{ env.MSRV_ALSA }} test --no-default-features --workspace --verbose --target ${{ env.TARGET }} - name: Run tests (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack,pulseaudio --workspace --verbose --target ${{ env.TARGET }} - name: Check examples (all features) - run: cross +${{ steps.msrv.outputs.all-features }} test --all-features --workspace --verbose --target ${{ env.TARGET }} + run: cross +${{ steps.msrv.outputs.all-features }} test --features=jack,pulseaudio --workspace --verbose --target ${{ env.TARGET }} # Windows (x86_64 and i686) windows: diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml index cd470bfd0..ba55ac99c 100644 --- a/.github/workflows/quality.yml +++ b/.github/workflows/quality.yml @@ -92,7 +92,7 @@ jobs: if: runner.os == 'Linux' uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Setup ASIO SDK if: runner.os == 'Windows' @@ -128,7 +128,7 @@ jobs: - name: Cache Linux audio packages uses: awalsh128/cache-apt-pkgs-action@latest with: - packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev + packages: libasound2-dev libjack-jackd2-dev libjack-jackd2-0 libdbus-1-dev libpipewire-0.3-dev - name: Install Rust toolchain uses: dtolnay/rust-toolchain@nightly diff --git a/CHANGELOG.md b/CHANGELOG.md index b09200a95..4ebbe6596 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `DeviceBusy` error variant to `SupportedStreamConfigsError`, `DefaultStreamConfigError`, and `BuildStreamError` for retryable device access errors (EBUSY, EAGAIN). - **PulseAudio**: New host for Linux and some BSDs using the PulseAudio API. +- **Pipewire**: New host for Linux and some BSDs using the Pipewire API. ### Changed diff --git a/Cargo.toml b/Cargo.toml index 33fbaa1eb..624d34d29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,12 @@ pulseaudio = ["dep:pulseaudio", "dep:futures"] # Note: JACK must be installed separately on all platforms jack = ["dep:jack"] +# PipeWire backend +# Provides audio I/O on Linux and some BSDs via the PipeWire multimedia server +# Requires: PipeWire server and client libraries installed on the system +# Platform: Linux, DragonFly BSD, FreeBSD, NetBSD +pipewire = ["dep:pipewire"] + # Audio thread priority elevation # Raises the audio callback thread to real-time priority for lower latency and fewer glitches # Requires: On Linux, either rtkit or appropriate user permissions (e.g. limits.conf or capabilities) @@ -103,6 +109,7 @@ audio_thread_priority = { version = "0.34", optional = true } jack = { version = "0.13", optional = true } pulseaudio = { version = "0.3", optional = true } futures = { version = "0.3", optional = true } +pipewire = { version = "0.9", optional = true, features = ["v0_3_53"] } [target.'cfg(target_vendor = "apple")'.dependencies] mach2 = "0.5" diff --git a/Cross.toml b/Cross.toml index 6d0ad81d1..09b92e9ea 100644 --- a/Cross.toml +++ b/Cross.toml @@ -2,6 +2,4 @@ dockerfile = "Dockerfile" [target.armv7-unknown-linux-gnueabihf.env] -passthrough = [ - "RUSTFLAGS", -] +passthrough = ["RUSTFLAGS"] diff --git a/Dockerfile b/Dockerfile index 8e56a2efd..5405f1417 100644 --- a/Dockerfile +++ b/Dockerfile @@ -7,4 +7,5 @@ ENV PKG_CONFIG_PATH=/usr/lib/arm-linux-gnueabihf/pkgconfig/ RUN dpkg --add-architecture armhf && \ apt-get update && \ apt-get install libasound2-dev:armhf -y && \ - apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y \ + apt-get install libjack-jackd2-dev:armhf libjack-jackd2-0:armhf -y +# TODO: now the cross-rs is based on ubuntu:20.04, so it does not contain pipewire-0.3-dev diff --git a/README.md b/README.md index 63d967b7e..c6a9a7130 100644 --- a/README.md +++ b/README.md @@ -112,6 +112,17 @@ Enables the PulseAudio backend. PulseAudio is a sound server commonly used on Li - **Usage:** See the [beep example](examples/beep.rs) for selecting the PulseAudio host at runtime. +### `pipewire` + +**Platform:** Linux, DragonFly BSD, FreeBSD, NetBSD + +Enables the Pipewire backend. Pipewire is a media server commonly used on Linux desktops. + +**Requirements:** +- Pipewire server and client libraries must be installed on the system +- +**Usage:** See the [beep example](examples/beep.rs) for selecting the Pipewire host at runtime. + ### `wasm-bindgen` **Platform:** WebAssembly (wasm32-unknown-unknown) @@ -198,6 +209,7 @@ If you receive errors about no default input or output device: - **Linux/ALSA:** Ensure your user is in the `audio` group and that ALSA is properly configured - **Linux/PulseAudio:** Check that PulseAudio is running: `pulseaudio --check` +- **Linux/Pipewire:** Check that Pipewire is running: `systemd --user status pipewire` - **Windows:** Verify your audio device is enabled in Sound Settings - **macOS:** Check System Preferences > Sound for available devices - **Mobile (iOS/Android):** Ensure your app has microphone/audio permissions @@ -236,6 +248,8 @@ For platform-specific features, enable the relevant features: ```bash cargo run --example beep --features asio # Windows ASIO cargo run --example beep --features jack # JACK backend +cargo run --example beep --features pulseaudio # PulseAudio backend +cargo run --example beep --features pipewire # Pipewire backend ``` ## Contributing diff --git a/examples/beep.rs b/examples/beep.rs index f11ea6551..74b7d5a36 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -31,6 +31,10 @@ struct Opt { /// Use the PulseAudio host. Requires `--features pulseaudio`. #[arg(long, default_value_t = false)] pulseaudio: bool, + + /// Use the Pipewire host. Requires `--features pipewire` + #[arg(long, default_value_t = false)] + pipewire: bool, } fn main() -> anyhow::Result<()> { @@ -42,7 +46,8 @@ fn main() -> anyhow::Result<()> { let mut jack_host_id = Err(HostUnavailable); #[allow(unused_mut, unused_assignments)] let mut pulseaudio_host_id = Err(HostUnavailable); - + #[allow(unused_mut, unused_assignments)] + let mut pipewire_host_id = Err(HostUnavailable); #[cfg(any( target_os = "linux", target_os = "dragonfly", @@ -59,6 +64,10 @@ fn main() -> anyhow::Result<()> { { pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); } + #[cfg(feature = "pipewire")] + { + pipewire_host_id = Ok(cpal::HostId::PipeWire); + } } // Manually check for flags. Can be passed through cargo with -- e.g. @@ -71,6 +80,10 @@ fn main() -> anyhow::Result<()> { pulseaudio_host_id .and_then(cpal::host_from_id) .expect("make sure `--features pulseaudio` is specified, and the platform is supported") + } else if opt.pipewire { + pipewire_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pipewire` is specified, and the platform is supported") } else { cpal::default_host() }; diff --git a/examples/record_wav.rs b/examples/record_wav.rs index bbe8f86cf..02f8530c2 100644 --- a/examples/record_wav.rs +++ b/examples/record_wav.rs @@ -4,7 +4,7 @@ use clap::Parser; use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; -use cpal::{FromSample, Sample}; +use cpal::{FromSample, HostUnavailable, Sample}; use std::fs::File; use std::io::BufWriter; use std::sync::{Arc, Mutex}; @@ -20,58 +20,70 @@ struct Opt { #[arg(long, default_value_t = 3)] duration: u64, - /// Use the JACK host - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" - ))] - #[arg(short, long)] - #[allow(dead_code)] + /// Use the JACK host. Requires `--features jack`. + #[arg(long, default_value_t = false)] jack: bool, + + /// Use the PulseAudio host. Requires `--features pulseaudio`. + #[arg(long, default_value_t = false)] + pulseaudio: bool, + + /// Use the Pipewire host. Requires `--features pipewire` + #[arg(long, default_value_t = false)] + pipewire: bool, } fn main() -> Result<(), anyhow::Error> { let opt = Opt::parse(); - // Conditionally compile with jack if the feature is specified. - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" + // Jack/PulseAudio support must be enabled at compile time, and is + // only available on some platforms. + #[allow(unused_mut, unused_assignments)] + let mut jack_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pulseaudio_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pipewire_host_id = Err(HostUnavailable); + #[cfg(any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" ))] + { + #[cfg(feature = "jack")] + { + jack_host_id = Ok(cpal::HostId::Jack); + } + + #[cfg(feature = "pulseaudio")] + { + pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); + } + #[cfg(feature = "pipewire")] + { + pipewire_host_id = Ok(cpal::HostId::PipeWire); + } + } + // Manually check for flags. Can be passed through cargo with -- e.g. - // cargo run --release --example beep --features jack -- --jack + // cargo run --release --example record_wav --features jack -- --jack let host = if opt.jack { - cpal::host_from_id(cpal::available_hosts() - .into_iter() - .find(|id| *id == cpal::HostId::Jack) - .expect( - "make sure --features jack is specified. only works on OSes where jack is available", - )).expect("jack host unavailable") + jack_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features jack` is specified, and the platform is supported") + } else if opt.pulseaudio { + pulseaudio_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pulseaudio` is specified, and the platform is supported") + } else if opt.pipewire { + pipewire_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pipewire` is specified, and the platform is supported") } else { cpal::default_host() }; - #[cfg(any( - not(any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - )), - not(feature = "jack") - ))] - let host = cpal::default_host(); - // Set up the input device and stream with the default input config. let device = if let Some(device) = opt.device { let id = &device.parse().expect("failed to parse input device id"); diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index 71d738633..ba1989625 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -20,14 +20,15 @@ use self::alsa::poll::Descriptors; pub use self::enumerate::Devices; use crate::{ + host::fill_with_equilibrium, iter::{SupportedInputConfigs, SupportedOutputConfigs}, traits::{DeviceTrait, HostTrait, StreamTrait}, BackendSpecificError, BufferSize, BuildStreamError, ChannelCount, Data, DefaultStreamConfigError, DeviceDescription, DeviceDescriptionBuilder, DeviceDirection, DeviceId, DeviceIdError, DeviceNameError, DevicesError, FrameCount, InputCallbackInfo, - OutputCallbackInfo, PauseStreamError, PlayStreamError, Sample, SampleFormat, SampleRate, - StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig, - SupportedStreamConfigRange, SupportedStreamConfigsError, I24, U24, + OutputCallbackInfo, PauseStreamError, PlayStreamError, SampleFormat, SampleRate, StreamConfig, + StreamError, SupportedBufferSize, SupportedStreamConfig, SupportedStreamConfigRange, + SupportedStreamConfigsError, }; mod enumerate; @@ -1239,59 +1240,6 @@ fn hw_params_buffer_size_min_max(hw_params: &alsa::pcm::HwParams) -> (FrameCount (min_buf, max_buf) } -// Fill a buffer with equilibrium values for any sample format. -// Works with any buffer size, even if not perfectly aligned to sample boundaries. -fn fill_with_equilibrium(buffer: &mut [u8], sample_format: SampleFormat) { - macro_rules! fill_typed { - ($sample_type:ty) => {{ - let sample_size = std::mem::size_of::<$sample_type>(); - - assert_eq!( - buffer.len() % sample_size, - 0, - "Buffer size must be aligned to sample size for format {:?}", - sample_format - ); - - let num_samples = buffer.len() / sample_size; - let equilibrium = <$sample_type as Sample>::EQUILIBRIUM; - - // Safety: We verified the buffer size is correctly aligned for the sample type - let samples = unsafe { - std::slice::from_raw_parts_mut( - buffer.as_mut_ptr() as *mut $sample_type, - num_samples, - ) - }; - - for sample in samples { - *sample = equilibrium; - } - }}; - } - const DSD_SILENCE_BYTE: u8 = 0x69; - - match sample_format { - SampleFormat::I8 => fill_typed!(i8), - SampleFormat::I16 => fill_typed!(i16), - SampleFormat::I24 => fill_typed!(I24), - SampleFormat::I32 => fill_typed!(i32), - // SampleFormat::I48 => fill_typed!(I48), - SampleFormat::I64 => fill_typed!(i64), - SampleFormat::U8 => fill_typed!(u8), - SampleFormat::U16 => fill_typed!(u16), - SampleFormat::U24 => fill_typed!(U24), - SampleFormat::U32 => fill_typed!(u32), - // SampleFormat::U48 => fill_typed!(U48), - SampleFormat::U64 => fill_typed!(u64), - SampleFormat::F32 => fill_typed!(f32), - SampleFormat::F64 => fill_typed!(f64), - SampleFormat::DsdU8 | SampleFormat::DsdU16 | SampleFormat::DsdU32 => { - buffer.fill(DSD_SILENCE_BYTE) - } - } -} - fn init_hw_params<'a>( pcm_handle: &'a alsa::pcm::PCM, config: &StreamConfig, diff --git a/src/host/mod.rs b/src/host/mod.rs index d4e7a3ec9..39100722d 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -1,3 +1,5 @@ +use crate::{Sample, SampleFormat, I24, U24}; + #[cfg(target_os = "android")] pub(crate) mod aaudio; #[cfg(any( @@ -31,6 +33,16 @@ pub(crate) mod emscripten; ) ))] pub(crate) mod jack; +#[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd", + ), + feature = "pipewire" +))] +pub(crate) mod pipewire; #[cfg(all( any( target_os = "linux", @@ -61,3 +73,57 @@ pub(crate) mod custom; all(target_arch = "wasm32", feature = "wasm-bindgen"), )))] pub(crate) mod null; + +// Fill a buffer with equilibrium values for any sample format. +// Works with any buffer size, even if not perfectly aligned to sample boundaries. +#[allow(unused)] +pub(crate) fn fill_with_equilibrium(buffer: &mut [u8], sample_format: SampleFormat) { + macro_rules! fill_typed { + ($sample_type:ty) => {{ + let sample_size = std::mem::size_of::<$sample_type>(); + + assert_eq!( + buffer.len() % sample_size, + 0, + "Buffer size must be aligned to sample size for format {:?}", + sample_format + ); + + let num_samples = buffer.len() / sample_size; + let equilibrium = <$sample_type as Sample>::EQUILIBRIUM; + + // Safety: We verified the buffer size is correctly aligned for the sample type + let samples = unsafe { + std::slice::from_raw_parts_mut( + buffer.as_mut_ptr() as *mut $sample_type, + num_samples, + ) + }; + + for sample in samples { + *sample = equilibrium; + } + }}; + } + const DSD_SILENCE_BYTE: u8 = 0x69; + + match sample_format { + SampleFormat::I8 => fill_typed!(i8), + SampleFormat::I16 => fill_typed!(i16), + SampleFormat::I24 => fill_typed!(I24), + SampleFormat::I32 => fill_typed!(i32), + // SampleFormat::I48 => fill_typed!(I48), + SampleFormat::I64 => fill_typed!(i64), + SampleFormat::U8 => fill_typed!(u8), + SampleFormat::U16 => fill_typed!(u16), + SampleFormat::U24 => fill_typed!(U24), + SampleFormat::U32 => fill_typed!(u32), + // SampleFormat::U48 => fill_typed!(U48), + SampleFormat::U64 => fill_typed!(u64), + SampleFormat::F32 => fill_typed!(f32), + SampleFormat::F64 => fill_typed!(f64), + SampleFormat::DsdU8 | SampleFormat::DsdU16 | SampleFormat::DsdU32 => { + buffer.fill(DSD_SILENCE_BYTE) + } + } +} diff --git a/src/host/pipewire/device.rs b/src/host/pipewire/device.rs new file mode 100644 index 000000000..163e690a0 --- /dev/null +++ b/src/host/pipewire/device.rs @@ -0,0 +1,767 @@ +use std::time::Duration; +use std::{cell::RefCell, rc::Rc}; + +use crate::host::pipewire::stream::{StreamCommand, StreamData, SUPPORTED_FORMATS}; +use crate::host::pipewire::utils::{ + audio, clock, group, DEVICE_ICON_NAME, METADATA_NAME, PORT_GROUP, +}; +use crate::{traits::DeviceTrait, DeviceDirection, SupportedStreamConfigRange}; +use crate::{ChannelCount, FrameCount, InterfaceType, SampleRate}; + +use crate::iter::{SupportedInputConfigs, SupportedOutputConfigs}; +use pipewire::{ + self as pw, + metadata::{Metadata, MetadataListener}, + node::{Node, NodeListener}, + proxy::ProxyT, + spa::utils::result::AsyncSeq, +}; + +use std::thread; + +use super::stream::Stream; + +pub type Devices = std::vec::IntoIter; + +// This enum record whether it is created by human or just default device +#[derive(Clone, Debug, Default, Copy)] +pub(crate) enum Class { + #[default] + Node, + DefaultSink, + DefaultInput, + DefaultOutput, +} + +#[derive(Clone, Debug, Default, Copy)] +pub enum Role { + Sink, + #[default] + Source, + Duplex, + StreamOutput, + StreamInput, +} + +#[derive(Clone, Debug, Default)] +pub struct Device { + node_name: String, + nick_name: String, + description: String, + direction: DeviceDirection, + channels: ChannelCount, + rate: SampleRate, + allow_rates: Vec, + quantum: FrameCount, + min_quantum: FrameCount, + max_quantum: FrameCount, + class: Class, + role: Role, + icon_name: String, + object_serial: u32, + interface_type: InterfaceType, + address: Option, + driver: Option, +} + +impl Device { + pub(crate) fn class(&self) -> Class { + self.class + } + fn sink_default() -> Self { + Self { + node_name: "sink_default".to_owned(), + nick_name: "sink_default".to_owned(), + description: "default_sink".to_owned(), + direction: DeviceDirection::Duplex, + channels: 2, + class: Class::DefaultSink, + role: Role::Sink, + ..Default::default() + } + } + fn input_default() -> Self { + Self { + node_name: "input_default".to_owned(), + nick_name: "input_default".to_owned(), + description: "default_input".to_owned(), + direction: DeviceDirection::Input, + channels: 2, + class: Class::DefaultInput, + role: Role::Source, + ..Default::default() + } + } + fn output_default() -> Self { + Self { + node_name: "output_default".to_owned(), + nick_name: "output_default".to_owned(), + description: "default_output".to_owned(), + direction: DeviceDirection::Output, + channels: 2, + class: Class::DefaultOutput, + role: Role::Source, + ..Default::default() + } + } + + fn device_type(&self) -> crate::DeviceType { + match self.icon_name.as_str() { + "audio-headphones" => crate::DeviceType::Headphones, + "audio-headset" => crate::DeviceType::Headset, + "audio-input-microphone" => crate::DeviceType::Microphone, + "audio-speakers" => crate::DeviceType::Speaker, + _ => crate::DeviceType::Unknown, + } + } + + pub(crate) fn pw_properties( + &self, + direction: DeviceDirection, + config: &crate::StreamConfig, + ) -> pw::properties::PropertiesBox { + let mut properties = match direction { + DeviceDirection::Output => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Playback", + }, + DeviceDirection::Input => pw::properties::properties! { + *pw::keys::MEDIA_TYPE => "Audio", + *pw::keys::MEDIA_CATEGORY => "Capture", + }, + _ => unreachable!(), + }; + if matches!(self.role, Role::Sink) { + properties.insert(*pw::keys::STREAM_CAPTURE_SINK, "true"); + } + if matches!(self.class, Class::Node) { + properties.insert(*pw::keys::TARGET_OBJECT, self.object_serial.to_string()); + } + if let crate::BufferSize::Fixed(buffer_size) = config.buffer_size { + properties.insert(*pw::keys::NODE_FORCE_QUANTUM, buffer_size.to_string()); + } + properties + } +} +impl DeviceTrait for Device { + type Stream = Stream; + type SupportedInputConfigs = SupportedInputConfigs; + type SupportedOutputConfigs = SupportedOutputConfigs; + + fn id(&self) -> Result { + Ok(crate::DeviceId( + crate::HostId::PipeWire, + self.node_name.clone(), + )) + } + + fn description(&self) -> Result { + let mut builder = crate::DeviceDescriptionBuilder::new(&self.nick_name) + .direction(self.direction) + .device_type(self.device_type()) + .interface_type(self.interface_type); + if let Some(address) = self.address.as_ref() { + builder = builder.address(address); + } + if let Some(driver) = self.driver.as_ref() { + builder = builder.driver(driver); + } + if !self.description.is_empty() && self.description != self.nick_name { + builder = builder.add_extended_line(&self.description); + } + Ok(builder.build()) + } + + fn supports_input(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Input | DeviceDirection::Duplex + ) + } + + fn supports_output(&self) -> bool { + matches!( + self.direction, + DeviceDirection::Output | DeviceDirection::Duplex + ) + } + + fn supported_input_configs( + &self, + ) -> Result { + if !self.supports_input() { + return Ok(vec![].into_iter()); + } + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates + .iter() + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: rate, + max_sample_rate: rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + }) + .collect::>() + .into_iter()) + } + fn supported_output_configs( + &self, + ) -> Result { + if !self.supports_output() { + return Ok(vec![].into_iter()); + } + let rates = if self.allow_rates.is_empty() { + vec![self.rate] + } else { + self.allow_rates.clone() + }; + Ok(rates + .iter() + .flat_map(|&rate| { + SUPPORTED_FORMATS + .iter() + .map(move |sample_format| SupportedStreamConfigRange { + channels: self.channels, + min_sample_rate: rate, + max_sample_rate: rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + sample_format: *sample_format, + }) + }) + .collect::>() + .into_iter()) + } + fn default_input_config( + &self, + ) -> Result { + if !self.supports_input() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::F32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) + } + + fn default_output_config( + &self, + ) -> Result { + if !self.supports_output() { + return Err(crate::DefaultStreamConfigError::StreamTypeNotSupported); + } + Ok(crate::SupportedStreamConfig { + channels: self.channels, + sample_format: crate::SampleFormat::F32, + sample_rate: self.rate, + buffer_size: crate::SupportedBufferSize::Range { + min: self.min_quantum, + max: self.max_quantum, + }, + }) + } + + fn build_input_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&crate::Data, &crate::InputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + let (pw_play_tx, pw_play_rx) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rx) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_in".to_owned()) + .spawn(move || { + let properties = device.pw_properties(DeviceDirection::Input, &config); + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_input( + &config, + properties, + sample_format, + data_callback, + error_callback, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let mainloop_rc1 = mainloop.clone(); + let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play { + StreamCommand::Toggle(state) => { + let _ = stream.set_active(state); + } + StreamCommand::Stop => { + let _ = stream.disconnect(); + mainloop_rc1.quit(); + } + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .map_err(|e| crate::BuildStreamError::BackendSpecific { + err: crate::BackendSpecificError { + description: format!("failed to create thread: {e}"), + }, + })?; + match pw_init_rx.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { + handle: Some(handle), + controller: pw_play_tx, + }), + Ok(false) => Err(crate::BuildStreamError::StreamConfigNotSupported), + Err(_) => Err(crate::BuildStreamError::BackendSpecific { + err: crate::BackendSpecificError { + description: "pipewire timeout".to_owned(), + }, + }), + } + } + + fn build_output_stream_raw( + &self, + config: &crate::StreamConfig, + sample_format: crate::SampleFormat, + data_callback: D, + error_callback: E, + timeout: Option, + ) -> Result + where + D: FnMut(&mut crate::Data, &crate::OutputCallbackInfo) + Send + 'static, + E: FnMut(crate::StreamError) + Send + 'static, + { + let (pw_play_tx, pw_play_rx) = pw::channel::channel::(); + + let (pw_init_tx, pw_init_rx) = std::sync::mpsc::channel::(); + let device = self.clone(); + let config = config.clone(); + let wait_timeout = timeout.unwrap_or(Duration::from_secs(2)); + let handle = thread::Builder::new() + .name("pw_out".to_owned()) + .spawn(move || { + let properties = device.pw_properties(DeviceDirection::Output, &config); + + let Ok(StreamData { + mainloop, + listener, + stream, + context, + }) = super::stream::connect_output( + &config, + properties, + sample_format, + data_callback, + error_callback, + ) + else { + let _ = pw_init_tx.send(false); + return; + }; + + let _ = pw_init_tx.send(true); + let stream = stream.clone(); + let mainloop_rc1 = mainloop.clone(); + let _receiver = pw_play_rx.attach(mainloop.loop_(), move |play| match play { + StreamCommand::Toggle(state) => { + let _ = stream.set_active(state); + } + StreamCommand::Stop => { + let _ = stream.disconnect(); + mainloop_rc1.quit(); + } + }); + mainloop.run(); + drop(listener); + drop(context); + }) + .map_err(|e| crate::BuildStreamError::BackendSpecific { + err: crate::BackendSpecificError { + description: format!("failed to create thread: {e}"), + }, + })?; + match pw_init_rx.recv_timeout(wait_timeout) { + Ok(true) => Ok(Stream { + handle: Some(handle), + controller: pw_play_tx, + }), + Ok(false) => Err(crate::BuildStreamError::StreamConfigNotSupported), + Err(_) => Err(crate::BuildStreamError::BackendSpecific { + err: crate::BackendSpecificError { + description: "pipewire timeout".to_owned(), + }, + }), + } + } +} + +#[derive(Debug, Clone, Default)] +struct Settings { + rate: SampleRate, + allow_rates: Vec, + quantum: FrameCount, + min_quantum: FrameCount, + max_quantum: FrameCount, +} + +// NOTE: it is just used to keep the lifetime +#[allow(dead_code)] +enum Request { + Node(NodeListener), + Meta(MetadataListener), +} + +impl From for Request { + fn from(value: NodeListener) -> Self { + Self::Node(value) + } +} + +impl From for Request { + fn from(value: MetadataListener) -> Self { + Self::Meta(value) + } +} + +pub fn init_devices() -> Option> { + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None).ok()?; + let context = pw::context::ContextRc::new(&mainloop, None).ok()?; + let core = context.connect_rc(None).ok()?; + let registry = core.get_registry_rc().ok()?; + + // To comply with Rust's safety rules, we wrap this variable in an `Rc` and a `Cell`. + let devices: Rc>> = Rc::new(RefCell::new(vec![ + Device::sink_default(), + Device::input_default(), + Device::output_default(), + ])); + let requests = Rc::new(RefCell::new(vec![])); + let settings = Rc::new(RefCell::new(Settings::default())); + let loop_clone = mainloop.clone(); + + // Trigger the sync event. The server's answer won't be processed until we start the main loop, + // so we can safely do this before setting up a callback. This lets us avoid using a Cell. + let pending_events: Rc>> = Rc::new(RefCell::new(vec![])); + let pending = core.sync(0).ok()?; + + pending_events.borrow_mut().push(pending); + + let _listener_core = core + .add_listener_local() + .done({ + let pending_events = pending_events.clone(); + move |id, seq| { + if id != pw::core::PW_ID_CORE { + return; + } + let mut pendinglist = pending_events.borrow_mut(); + let Some(index) = pendinglist.iter().position(|o_seq| *o_seq == seq) else { + return; + }; + pendinglist.remove(index); + if !pendinglist.is_empty() { + return; + } + loop_clone.quit(); + } + }) + .register(); + let _listener_reg = registry + .add_listener_local() + .global({ + let devices = devices.clone(); + let registry = registry.clone(); + let requests = requests.clone(); + let settings = settings.clone(); + move |global| match global.type_ { + pipewire::types::ObjectType::Metadata => { + if !global.props.is_some_and(|props| { + props + .get(METADATA_NAME) + .is_some_and(|name| name == "settings") + }) { + return; + } + let meta_settings: Metadata = match registry.bind(global) { + Ok(meta_settings) => meta_settings, + Err(_) => { + // TODO: do something about this error + // Though it is already checked, but maybe something happened with + // pipewire? + return; + } + }; + let settings = settings.clone(); + let listener = meta_settings + .add_listener_local() + .property(move |_, key, _, value| { + match (key, value) { + (Some(clock::RATE), Some(rate)) => { + let Ok(rate) = rate.parse() else { + return 0; + }; + settings.borrow_mut().rate = rate; + } + (Some(clock::ALLOWED_RATES), Some(list)) => { + let Some(allow_rates) = parse_allow_rates(list) else { + return 0; + }; + + settings.borrow_mut().allow_rates = allow_rates; + } + (Some(clock::QUANTUM), Some(quantum)) => { + let Ok(quantum) = quantum.parse() else { + return 0; + }; + settings.borrow_mut().quantum = quantum; + } + (Some(clock::MIN_QUANTUM), Some(min_quantum)) => { + let Ok(min_quantum) = min_quantum.parse() else { + return 0; + }; + settings.borrow_mut().min_quantum = min_quantum; + } + (Some(clock::MAX_QUANTUM), Some(max_quantum)) => { + let Ok(max_quantum) = max_quantum.parse() else { + return 0; + }; + settings.borrow_mut().max_quantum = max_quantum; + } + _ => {} + } + 0 + }) + .register(); + let Ok(pending) = core.sync(0) else { + // TODO: maybe we should add a log? + return; + }; + pending_events.borrow_mut().push(pending); + requests + .borrow_mut() + .push((meta_settings.upcast(), Request::Meta(listener))); + } + pipewire::types::ObjectType::Node => { + let Some(props) = global.props else { + return; + }; + let Some(media_class) = props.get(*pw::keys::MEDIA_CLASS) else { + return; + }; + if !matches!( + media_class, + audio::SINK + | audio::SOURCE + | audio::DUPLEX + | audio::STREAM_INPUT + | audio::STREAM_OUTPUT + ) { + return; + } + + let node: Node = match registry.bind(global) { + Ok(node) => node, + Err(_) => { + // TODO: do something about this error + // Though it is already checked, but maybe something happened with + // pipewire? + return; + } + }; + + let devices = devices.clone(); + let listener = node + .add_listener_local() + .info(move |info| { + let Some(props) = info.props() else { + return; + }; + let Some(media_class) = props.get(*pw::keys::MEDIA_CLASS) else { + return; + }; + let role = match media_class { + audio::SINK => Role::Sink, + audio::SOURCE => Role::Source, + audio::DUPLEX => Role::Duplex, + audio::STREAM_OUTPUT => Role::StreamOutput, + audio::STREAM_INPUT => Role::StreamInput, + _ => { + return; + } + }; + let Some(group) = props.get(PORT_GROUP) else { + return; + }; + let direction = match (group, role) { + (group::PLAY_BACK, Role::Sink) => DeviceDirection::Duplex, + (group::PLAY_BACK, Role::Source) => DeviceDirection::Output, + (group::CAPTURE, _) => DeviceDirection::Input, + (_, Role::Sink) => DeviceDirection::Output, + (_, Role::Source) => DeviceDirection::Input, + (_, Role::Duplex) => DeviceDirection::Duplex, + // Bluetooth and other non-ALSA devices use generic port group + // names like "stream.0" — derive direction from media.class + (_, Role::StreamOutput) => DeviceDirection::Output, + (_, Role::StreamInput) => DeviceDirection::Input, + }; + let Some(object_serial) = props + .get(*pw::keys::OBJECT_SERIAL) + .and_then(|serial| serial.parse().ok()) + else { + return; + }; + let node_name = props + .get(*pw::keys::NODE_NAME) + .unwrap_or("unknown") + .to_owned(); + let description = props + .get(*pw::keys::NODE_DESCRIPTION) + .unwrap_or("unknown") + .to_owned(); + let nick_name = props + .get(*pw::keys::NODE_NICK) + .unwrap_or(description.as_str()) + .to_owned(); + let channels = props + .get(*pw::keys::AUDIO_CHANNELS) + .and_then(|channels| channels.parse().ok()) + .unwrap_or(2); + + let icon_name = + props.get(DEVICE_ICON_NAME).unwrap_or("default").to_owned(); + + let interface_type = match props.get(*pw::keys::DEVICE_API) { + Some("bluez5") => InterfaceType::Bluetooth, + _ => match props.get("device.bus") { + Some("pci") => InterfaceType::Pci, + Some("usb") => InterfaceType::Usb, + Some("firewire") => InterfaceType::FireWire, + Some("thunderbolt") => InterfaceType::Thunderbolt, + _ => InterfaceType::Unknown, + }, + }; + + let address = props + .get("api.bluez5.address") + .or_else(|| props.get("api.alsa.path")) + .map(|s| s.to_owned()); + + let driver = props.get(*pw::keys::FACTORY_NAME).map(|s| s.to_owned()); + + let device = Device { + node_name, + nick_name, + description, + direction, + role, + channels, + icon_name, + object_serial, + interface_type, + address, + driver, + ..Default::default() + }; + devices.borrow_mut().push(device); + }) + .register(); + let Ok(pending) = core.sync(0) else { + // TODO: maybe we should add a log? + return; + }; + pending_events.borrow_mut().push(pending); + requests + .borrow_mut() + .push((node.upcast(), Request::Node(listener))); + } + _ => {} + } + }) + .register(); + + mainloop.run(); + + let mut devices = devices.take(); + let settings = settings.take(); + for device in devices.iter_mut() { + device.rate = settings.rate; + device.allow_rates = settings.allow_rates.clone(); + device.quantum = settings.quantum; + device.min_quantum = settings.min_quantum; + device.max_quantum = settings.max_quantum; + } + Some(devices) +} + +fn parse_allow_rates(list: &str) -> Option> { + let list: Vec<&str> = list + .trim() + .strip_prefix("[")? + .strip_suffix("]")? + .split(' ') + .flat_map(|s| s.split(',')) + .filter(|s| !s.is_empty()) + .collect(); + let mut allow_rates = vec![]; + for rate in list { + let rate = rate.parse().ok()?; + allow_rates.push(rate); + } + Some(allow_rates) +} + +#[cfg(test)] +mod test { + use super::parse_allow_rates; + #[test] + fn rate_parse() { + // In documents, the rates are separated by space + let rate_str = r#" [ 44100 48000 88200 96000 176400 192000 ] "#; + let rates = parse_allow_rates(rate_str).unwrap(); + assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]); + // ',' is also allowed + let rate_str = r#" [ 44100, 48000, 88200, 96000 ,176400 ,192000 ] "#; + let rates = parse_allow_rates(rate_str).unwrap(); + assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]); + assert_eq!(rates, vec![44100, 48000, 88200, 96000, 176400, 192000]); + // We only use [] to define the list + let rate_str = r#" { 44100, 48000, 88200, 96000 ,176400 ,192000 } "#; + let rates = parse_allow_rates(rate_str); + assert_eq!(rates, None); + } +} diff --git a/src/host/pipewire/mod.rs b/src/host/pipewire/mod.rs new file mode 100644 index 000000000..67fdf0455 --- /dev/null +++ b/src/host/pipewire/mod.rs @@ -0,0 +1,47 @@ +use crate::traits::HostTrait; +use device::{init_devices, Class, Device, Devices}; +mod device; +mod stream; +mod utils; + +#[inline] +fn pipewire_available() -> bool { + let dir = std::env::var("PIPEWIRE_RUNTIME_DIR") + .or_else(|_| std::env::var("XDG_RUNTIME_DIR")) + .unwrap_or_default(); + std::path::Path::new(&dir).join("pipewire-0").exists() +} + +#[derive(Debug)] +pub struct Host(Vec); + +impl Host { + pub fn new() -> Result { + let devices = init_devices().ok_or(crate::HostUnavailable)?; + Ok(Host(devices)) + } +} + +impl HostTrait for Host { + type Devices = Devices; + type Device = Device; + fn is_available() -> bool { + pipewire_available() + } + fn devices(&self) -> Result { + Ok(self.0.clone().into_iter()) + } + + fn default_input_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.class(), Class::DefaultInput)) + .cloned() + } + fn default_output_device(&self) -> Option { + self.0 + .iter() + .find(|device| matches!(device.class(), Class::DefaultOutput)) + .cloned() + } +} diff --git a/src/host/pipewire/stream.rs b/src/host/pipewire/stream.rs new file mode 100644 index 000000000..8def5bd2f --- /dev/null +++ b/src/host/pipewire/stream.rs @@ -0,0 +1,561 @@ +use std::{thread::JoinHandle, time::Instant}; + +use crate::{ + host::fill_with_equilibrium, traits::StreamTrait, BackendSpecificError, InputCallbackInfo, + OutputCallbackInfo, SampleFormat, StreamConfig, StreamError, StreamInstant, +}; +use pipewire::{ + self as pw, + context::ContextRc, + main_loop::MainLoopRc, + spa::{ + param::{ + format::{MediaSubtype, MediaType}, + format_utils, + }, + pod::Pod, + }, + stream::{StreamListener, StreamRc, StreamState}, +}; + +use crate::Data; + +#[derive(Debug, Clone, Copy)] +pub enum StreamCommand { + Toggle(bool), + Stop, +} + +pub struct Stream { + pub(crate) handle: Option>, + pub(crate) controller: pw::channel::Sender, +} + +impl Drop for Stream { + fn drop(&mut self) { + let _ = self.controller.send(StreamCommand::Stop); + let _ = self.handle.take().map(|handle| handle.join()); + } +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), crate::PlayStreamError> { + self.controller + .send(StreamCommand::Toggle(true)) + .map_err(|_| crate::PlayStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Cannot send message".to_owned(), + }, + })?; + Ok(()) + } + fn pause(&self) -> Result<(), crate::PauseStreamError> { + self.controller + .send(StreamCommand::Toggle(false)) + .map_err(|_| crate::PauseStreamError::BackendSpecific { + err: BackendSpecificError { + description: "Cannot send message".to_owned(), + }, + })?; + Ok(()) + } +} + +pub(crate) const SUPPORTED_FORMATS: &[SampleFormat] = &[ + SampleFormat::I8, + SampleFormat::U8, + SampleFormat::I16, + SampleFormat::U16, + SampleFormat::I24, + SampleFormat::U24, + SampleFormat::I32, + SampleFormat::U32, + SampleFormat::I64, + SampleFormat::U64, + SampleFormat::F32, + SampleFormat::F64, +]; + +impl From for pw::spa::param::audio::AudioFormat { + fn from(value: SampleFormat) -> Self { + match value { + SampleFormat::I8 => Self::S8, + SampleFormat::U8 => Self::U8, + + #[cfg(target_endian = "little")] + SampleFormat::I16 => Self::S16LE, + #[cfg(target_endian = "big")] + SampleFormat::I16 => Self::S16BE, + #[cfg(target_endian = "little")] + SampleFormat::U16 => Self::U16LE, + #[cfg(target_endian = "big")] + SampleFormat::U16 => Self::U16BE, + + #[cfg(target_endian = "little")] + SampleFormat::I24 => Self::S24LE, + #[cfg(target_endian = "big")] + SampleFormat::I24 => Self::S24BE, + #[cfg(target_endian = "little")] + SampleFormat::U24 => Self::U24LE, + #[cfg(target_endian = "big")] + SampleFormat::U24 => Self::U24BE, + #[cfg(target_endian = "little")] + SampleFormat::I32 => Self::S32LE, + #[cfg(target_endian = "big")] + SampleFormat::I32 => Self::S32BE, + #[cfg(target_endian = "little")] + SampleFormat::U32 => Self::U32LE, + #[cfg(target_endian = "big")] + SampleFormat::U32 => Self::U32BE, + #[cfg(target_endian = "little")] + SampleFormat::F32 => Self::F32LE, + #[cfg(target_endian = "big")] + SampleFormat::F32 => Self::F32BE, + #[cfg(target_endian = "little")] + SampleFormat::F64 => Self::F64LE, + #[cfg(target_endian = "big")] + SampleFormat::F64 => Self::F64BE, + // NOTE: Seems PipeWire does support U64 and I64, but libspa doesn't yet. + // TODO: Maybe add the support in the future + _ => Self::Unknown, + } + } +} + +pub struct UserData { + data_callback: D, + error_callback: E, + sample_format: SampleFormat, + format: pw::spa::param::audio::AudioInfoRaw, + created_instance: Instant, +} +impl UserData +where + E: FnMut(StreamError) + Send + 'static, +{ + fn state_changed(&mut self, new: StreamState) { + match new { + pipewire::stream::StreamState::Error(e) => { + (self.error_callback)(StreamError::BackendSpecific { + err: BackendSpecificError { description: e }, + }) + } + // TODO: maybe we need to log information when every new state comes? + pipewire::stream::StreamState::Paused => {} + pipewire::stream::StreamState::Streaming => {} + pipewire::stream::StreamState::Connecting => {} + pipewire::stream::StreamState::Unconnected => {} + } + } +} + +/// Hardware timestamp from a PipeWire graph cycle. +struct PwTime { + /// CLOCK_MONOTONIC nanoseconds, stamped at the start of the graph cycle. + now_ns: i64, + /// Pipeline delay converted to nanoseconds. + /// For output: how far ahead of the driver our next sample will be played. + /// For input: how long ago the data in the buffer was captured. + delay_ns: i64, +} + +/// Returns a hardware timestamp for the current graph cycle, or `None` if +/// the driver has not started yet or the rate is unavailable. +fn pw_stream_time(stream: &pw::stream::Stream) -> Option<(StreamInstant, PwTime)> { + use pw::sys as pw_sys; + use std::mem; + let mut t: pw_sys::pw_time = unsafe { mem::zeroed() }; + let rc = unsafe { + pw_sys::pw_stream_get_time_n( + stream.as_raw_ptr(), + &mut t, + mem::size_of::(), + ) + }; + if rc != 0 || t.now == 0 || t.rate.denom == 0 { + return None; + } + debug_assert_eq!(t.rate.num, 1, "unexpected pw_time rate.num"); + let delay_ns = t.delay * 1_000_000_000i64 / t.rate.denom as i64; + let callback = crate::StreamInstant::from_nanos(t.now); + Some(( + callback, + PwTime { + now_ns: t.now, + delay_ns, + }, + )) +} + +impl UserData +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_in( + &mut self, + stream: &pw::stream::Stream, + frames: usize, + data: &Data, + ) -> Result<(), BackendSpecificError> { + let (callback, capture) = match pw_stream_time(stream) { + Some((cb, PwTime { now_ns, delay_ns })) => { + (cb, crate::StreamInstant::from_nanos(now_ns - delay_ns)) + } + None => { + let cb = stream_timestamp_fallback(self.created_instance)?; + let pl = cb + .sub(frames_to_duration(frames, self.format.rate())) + .ok_or_else(|| BackendSpecificError { + description: + "`capture` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + (cb, pl) + } + }; + let timestamp = crate::InputStreamTimestamp { callback, capture }; + let info = crate::InputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } +} +impl UserData +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + fn publish_data_out( + &mut self, + stream: &pw::stream::Stream, + frames: usize, + data: &mut Data, + ) -> Result<(), BackendSpecificError> { + let (callback, playback) = match pw_stream_time(stream) { + Some((cb, PwTime { now_ns, delay_ns })) => { + (cb, crate::StreamInstant::from_nanos(now_ns + delay_ns)) + } + None => { + let cb = stream_timestamp_fallback(self.created_instance)?; + let pl = cb + .add(frames_to_duration(frames, self.format.rate())) + .ok_or_else(|| BackendSpecificError { + description: + "`playback` occurs beyond representation supported by `StreamInstant`" + .to_string(), + })?; + (cb, pl) + } + }; + let timestamp = crate::OutputStreamTimestamp { callback, playback }; + let info = crate::OutputCallbackInfo { timestamp }; + (self.data_callback)(data, &info); + Ok(()) + } +} +pub struct StreamData { + pub mainloop: MainLoopRc, + pub listener: StreamListener>, + pub stream: StreamRc, + pub context: ContextRc, +} + +// Use elapsed duration since stream creation as fallback when hardware timestamps are unavailable. +// +// This ensures positive values that are compatible with our `StreamInstant` representation. +#[inline] +fn stream_timestamp_fallback( + creation: std::time::Instant, +) -> Result { + let now = std::time::Instant::now(); + let duration = now.duration_since(creation); + crate::StreamInstant::from_nanos_i128(duration.as_nanos() as i128).ok_or(BackendSpecificError { + description: "stream duration has exceeded `StreamInstant` representation".to_string(), + }) +} + +// Convert the given duration in frames at the given sample rate to a `std::time::Duration`. +#[inline] +fn frames_to_duration(frames: usize, rate: crate::SampleRate) -> std::time::Duration { + let secsf = frames as f64 / rate as f64; + let secs = secsf as u64; + let nanos = ((secsf - secs as f64) * 1_000_000_000.0) as u32; + std::time::Duration::new(secs, nanos) +} + +pub fn connect_output( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, +) -> Result, pw::Error> +where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; + + let data = UserData { + data_callback, + error_callback, + sample_format, + format: Default::default(), + created_instance: Instant::now(), + }; + let channels = config.channels as _; + let rate = config.sample_rate as _; + let stream = pw::stream::StreamRc::new(core, "cpal-playback", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(move|stream, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + // call a helper function to parse the format for us. + // When the format update, we check the format first, in case it does not fit what we + // set + if user_data.format.parse(param).is_ok() { + let current_channels = user_data.format.channels(); + let current_rate = user_data.format.rate(); + if current_channels != channels || rate != current_rate { + (user_data.error_callback)(StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!("channels or rate is not fit, current channels: {current_channels}, current rate: {current_rate}"), + }, + }); + // if the channels and rate do not match, we stop the stream + if let Err(e) = stream.set_active(false) { + (user_data.error_callback)(StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!("failed to stop the stream, reason: {e}"), + }, + }); + } + } + + } + }) + .state_changed(|_stream, user_data, _old, new| { + user_data.state_changed(new); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + // Read the requested frame count before mutably borrowing datas_mut(). + let requested = buffer.requested() as usize; + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let buf_data = &mut datas[0]; + let n_channels = user_data.format.channels(); + + let stride = user_data.sample_format.sample_size() * n_channels as usize; + // frames = samples / channels or frames = data_len / stride + // Honor the frame count PipeWire requests this cycle, capped by the + // mapped buffer capacity to guard against any mismatch. + let frames = requested.min(buf_data.as_raw().maxsize as usize / stride); + let Some(samples) = buf_data.data() else { + return; + }; + + // set buffers to zero + fill_with_equilibrium(samples, user_data.sample_format); + + // samples = frames * channels or samples = data_len / sample_size + let n_samples = frames * n_channels as usize; + + let data = samples.as_mut_ptr() as *mut (); + let mut data = + unsafe { Data::from_parts(data, n_samples, user_data.sample_format) }; + if let Err(err) = user_data.publish_data_out(stream, frames, &mut data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } + let chunk = buf_data.chunk_mut(); + *chunk.offset_mut() = 0; + *chunk.stride_mut() = stride as i32; + *chunk.size_mut() = (frames * stride) as u32; + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(sample_format.into()); + audio_info.set_rate(rate); + audio_info.set_channels(channels); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + // TODO: what about RT_PROCESS? + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Output, + None, + pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} +pub fn connect_input( + config: &StreamConfig, + properties: pw::properties::PropertiesBox, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, +) -> Result, pw::Error> +where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, +{ + pw::init(); + let mainloop = pw::main_loop::MainLoopRc::new(None)?; + let context = pw::context::ContextRc::new(&mainloop, None)?; + let core = context.connect_rc(None)?; + + let data = UserData { + data_callback, + error_callback, + sample_format, + format: Default::default(), + created_instance: Instant::now(), + }; + + let channels = config.channels as _; + let rate = config.sample_rate as _; + + let stream = pw::stream::StreamRc::new(core, "cpal-capture", properties)?; + let listener = stream + .add_local_listener_with_user_data(data) + .param_changed(move |_, user_data, id, param| { + let Some(param) = param else { + return; + }; + if id != pw::spa::param::ParamType::Format.as_raw() { + return; + } + + let (media_type, media_subtype) = match format_utils::parse_format(param) { + Ok(v) => v, + Err(_) => return, + }; + + // only accept raw audio + if media_type != MediaType::Audio || media_subtype != MediaSubtype::Raw { + return; + } + + // call a helper function to parse the format for us. + // When the format update, we check the format first, in case it does not fit what we + // set + if user_data.format.parse(param).is_ok() { + let current_channels = user_data.format.channels(); + let current_rate = user_data.format.rate(); + if current_channels != channels || rate != current_rate { + (user_data.error_callback)(StreamError::BackendSpecific { + err: BackendSpecificError { + description: format!("channels or rate is not fit, current channels: {current_channels}, current rate: {current_rate}"), + }, + }); + } + } + }) + .state_changed(|_stream, user_data, _old, new| { + user_data.state_changed(new); + }) + .process(|stream, user_data| match stream.dequeue_buffer() { + None => (user_data.error_callback)(StreamError::BufferUnderrun), + Some(mut buffer) => { + let datas = buffer.datas_mut(); + if datas.is_empty() { + return; + } + let data = &mut datas[0]; + let n_channels = user_data.format.channels(); + let n_samples = data.chunk().size() / user_data.sample_format.sample_size() as u32; + let frames = n_samples / n_channels; + + let Some(samples) = data.data() else { + return; + }; + let data = samples.as_mut_ptr() as *mut (); + let data = + unsafe { Data::from_parts(data, n_samples as usize, user_data.sample_format) }; + if let Err(err) = user_data.publish_data_in(stream, frames as usize, &data) { + (user_data.error_callback)(StreamError::BackendSpecific { err }); + } + } + }) + .register()?; + let mut audio_info = pw::spa::param::audio::AudioInfoRaw::new(); + audio_info.set_format(sample_format.into()); + audio_info.set_rate(rate); + audio_info.set_channels(channels); + + let obj = pw::spa::pod::Object { + type_: pw::spa::utils::SpaTypes::ObjectParamFormat.as_raw(), + id: pw::spa::param::ParamType::EnumFormat.as_raw(), + properties: audio_info.into(), + }; + let values: Vec = pw::spa::pod::serialize::PodSerializer::serialize( + std::io::Cursor::new(Vec::new()), + &pw::spa::pod::Value::Object(obj), + ) + .unwrap() + .0 + .into_inner(); + + let mut params = [Pod::from_bytes(&values).unwrap()]; + + // TODO: what about RT_PROCESS? + /* Now connect this stream. We ask that our process function is + * called in a realtime thread. */ + stream.connect( + pw::spa::utils::Direction::Input, + None, + pw::stream::StreamFlags::AUTOCONNECT | pw::stream::StreamFlags::MAP_BUFFERS, + &mut params, + )?; + + Ok(StreamData { + mainloop, + listener, + stream, + context, + }) +} diff --git a/src/host/pipewire/utils.rs b/src/host/pipewire/utils.rs new file mode 100644 index 000000000..12bea593b --- /dev/null +++ b/src/host/pipewire/utils.rs @@ -0,0 +1,30 @@ +pub const METADATA_NAME: &str = "metadata.name"; +pub const PORT_GROUP: &str = "port.group"; + +// NOTE: the icon name contains bluetooth and etc, not icon-name, but icon_name +// I have tried to get the information, and get +// "device.icon-name": "audio-card-analog", +// "device.icon_name": "video-display", +// So seems the `icon_name` is usable +pub const DEVICE_ICON_NAME: &str = "device.icon_name"; + +pub mod clock { + pub const RATE: &str = "clock.rate"; + pub const ALLOWED_RATES: &str = "clock.allowed-rates"; + pub const QUANTUM: &str = "clock.quantum"; + pub const MIN_QUANTUM: &str = "clock.min-quantum"; + pub const MAX_QUANTUM: &str = "clock.max-quantum"; +} + +pub mod audio { + pub const SINK: &str = "Audio/Sink"; + pub const SOURCE: &str = "Audio/Source"; + pub const DUPLEX: &str = "Audio/Duplex"; + pub const STREAM_OUTPUT: &str = "Stream/Output/Audio"; + pub const STREAM_INPUT: &str = "Stream/Input/Audio"; +} + +pub mod group { + pub const PLAY_BACK: &str = "playback"; + pub const CAPTURE: &str = "capture"; +} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index da00b89fc..2d4715a4b 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -716,11 +716,26 @@ mod platform_impl { #[cfg(feature = "pulseaudio")] pub use crate::host::pulseaudio::Host as PulseAudioHost; + #[cfg(feature = "pipewire")] + #[cfg_attr( + docsrs, + doc(cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pipewire" + ))) + )] + pub use crate::host::pipewire::Host as PipeWireHost; impl_platform_host!( #[cfg(feature = "pulseaudio")] PulseAudio => PulseAudioHost, #[cfg(feature = "jack")] Jack => JackHost, Alsa => AlsaHost, - #[cfg(feature = "custom")] Custom => super::CustomHost + #[cfg(feature = "custom")] Custom => super::CustomHost, + #[cfg(feature = "pipewire")] PipeWire => super::PipeWireHost, ); /// The default host for the current compilation target platform.