diff --git a/doc/examples/ex_owon_sds1104.py b/doc/examples/ex_owon_sds1104.py new file mode 100644 index 00000000..e064ec7c --- /dev/null +++ b/doc/examples/ex_owon_sds1104.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python +""" +Minimal example for connecting to an OWON SDS1104-family oscilloscope. +""" + +import instruments as ik + + +def main(): + """ + Open the scope over raw USB, print a few stable values, and read the + current CH1 screen waveform. + """ + scope = ik.owon.OWONSDS1104.open_usb() + + print(f"Identity: {scope.name}") + print(f"Timebase scale: {scope.timebase_scale}") + print(f"CH1 displayed: {scope.channel[0].display}") + print(f"CH1 coupling: {scope.channel[0].coupling}") + time_s, voltage_v = scope.channel[0].read_waveform() + print(f"CH1 waveform samples: {len(voltage_v)}") + print(f"First sample: t={time_s[0]!r}, v={voltage_v[0]!r}") + + +if __name__ == "__main__": + main() diff --git a/doc/source/apiref/index.rst b/doc/source/apiref/index.rst index 4ad370ae..92654c1e 100644 --- a/doc/source/apiref/index.rst +++ b/doc/source/apiref/index.rst @@ -30,6 +30,7 @@ Contents: newport ondax oxford + owon pfeiffer phasematrix picowatt diff --git a/doc/source/apiref/owon.rst b/doc/source/apiref/owon.rst new file mode 100644 index 00000000..aedc077d --- /dev/null +++ b/doc/source/apiref/owon.rst @@ -0,0 +1,15 @@ +.. + TODO: put documentation license header here. + +.. currentmodule:: instruments.owon + +==== +OWON +==== + +:class:`OWONSDS1104` Oscilloscope +================================= + +.. autoclass:: OWONSDS1104 + :members: + :undoc-members: diff --git a/src/instruments/__init__.py b/src/instruments/__init__.py index 660326bf..499cc4ec 100644 --- a/src/instruments/__init__.py +++ b/src/instruments/__init__.py @@ -29,6 +29,7 @@ from . import minghe from . import newport from . import oxford +from . import owon from . import phasematrix from . import pfeiffer from . import picowatt diff --git a/src/instruments/abstract_instruments/comm/usb_communicator.py b/src/instruments/abstract_instruments/comm/usb_communicator.py index 05c882d4..b4cab32d 100644 --- a/src/instruments/abstract_instruments/comm/usb_communicator.py +++ b/src/instruments/abstract_instruments/comm/usb_communicator.py @@ -112,7 +112,7 @@ def timeout(self): @timeout.setter def timeout(self, newval): newval = assume_units(newval, u.second).to(u.ms).magnitude - self._dev.default_timeout = newval + self._dev.default_timeout = int(round(newval)) # FILE-LIKE METHODS # @@ -136,7 +136,7 @@ def read_raw(self, size=-1): if size == -1: size = self._max_packet_size term = self._terminator.encode("utf-8") - read_val = bytes(self._ep_in.read(size)) + read_val = self.read_packet(size) if term not in read_val: raise OSError( f"Did not find the terminator in the returned string. " @@ -144,6 +144,75 @@ def read_raw(self, size=-1): ) return read_val.rstrip(term) + def read_packet(self, size=-1): + """ + Read a single raw USB packet without interpreting terminators. + + :param int size: Number of bytes requested from the USB endpoint. + A value of ``-1`` reads one full endpoint packet. + :rtype: `bytes` + """ + if size == -1: + size = self._max_packet_size + return bytes(self._ep_in.read(size)) + + def read_exact(self, size, chunk_size=None): + """ + Read exactly ``size`` raw bytes from the USB endpoint. + + :param int size: Total number of bytes to read. + :param int chunk_size: Optional packet request size to use for each + underlying endpoint read. + :rtype: `bytes` + """ + if size < 0: + raise ValueError("Size must be non-negative.") + if chunk_size is None: + chunk_size = self._max_packet_size + if chunk_size <= 0: + raise ValueError("Chunk size must be positive.") + + result = bytearray() + while len(result) < size: + packet = self.read_packet(min(chunk_size, size - len(result))) + result.extend(packet) + return bytes(result) + + def read_binary(self, size=-1): + """ + Read raw binary data without looking for a terminator. + + If ``size`` is negative, this reads packets until a short packet or a + USB timeout indicates the transfer has completed. If ``size`` is + non-negative, this reads exactly that many bytes. + + :param int size: Number of bytes to read, or ``-1`` to read until the + transfer completes. + :rtype: `bytes` + """ + if size >= 0: + return self.read_exact(size) + + result = bytearray() + while True: + try: + packet = self.read_packet() + except usb.core.USBTimeoutError: + if result: + break + raise + except usb.core.USBError as exc: + if result and "timeout" in str(exc).lower(): + break + raise + + if not packet: + break + result.extend(packet) + if len(packet) < self._max_packet_size: + break + return bytes(result) + def write_raw(self, msg): """Write bytes to the raw usb connection object. @@ -152,10 +221,10 @@ def write_raw(self, msg): """ self._ep_out.write(msg) - def seek(self, offset): # pylint: disable=unused-argument,no-self-use + def seek(self, offset): # pylint: disable=unused-argument raise NotImplementedError - def tell(self): # pylint: disable=no-self-use + def tell(self): raise NotImplementedError def flush_input(self): @@ -163,7 +232,22 @@ def flush_input(self): Instruct the communicator to flush the input buffer, discarding the entirety of its contents. """ - self._ep_in.read(self._max_packet_size) + original_timeout = self._dev.default_timeout + self._dev.default_timeout = 50 + try: + while True: + try: + packet = self.read_packet() + except usb.core.USBTimeoutError: + break + except usb.core.USBError as exc: + if "timeout" in str(exc).lower(): + break + raise + if not packet: + break + finally: + self._dev.default_timeout = original_timeout # METHODS # diff --git a/src/instruments/owon/__init__.py b/src/instruments/owon/__init__.py new file mode 100644 index 00000000..cc18e775 --- /dev/null +++ b/src/instruments/owon/__init__.py @@ -0,0 +1,10 @@ +#!/usr/bin/env python +""" +Module containing OWON instruments. +""" + +from .sds1104 import ( + OWONSDS1104, + SDS1104DeepMemoryCapture, + SDS1104SavedWaveformEntry, +) diff --git a/src/instruments/owon/sds1104.py b/src/instruments/owon/sds1104.py new file mode 100644 index 00000000..3594c168 --- /dev/null +++ b/src/instruments/owon/sds1104.py @@ -0,0 +1,1816 @@ +#!/usr/bin/env python +""" +Provides support for the OWON SDS1104 oscilloscope family. +""" + +# pylint: disable=too-many-lines + +# IMPORTS ##################################################################### + + +from dataclasses import dataclass +from enum import Enum +import json +import re +import struct +from typing import Any + +import usb.core + +from instruments.abstract_instruments import Oscilloscope +from instruments.abstract_instruments.comm import USBCommunicator +from instruments.generic_scpi import SCPIInstrument +from instruments.optional_dep_finder import numpy +from instruments.units import ureg as u +from instruments.util_fns import ProxyList, assume_units + +# HELPERS ##################################################################### + + +_TIME_UNITS = { + "ns": 1e-9, + "us": 1e-6, + "ms": 1e-3, + "s": 1.0, +} + +_VERTICAL_UNITS = { + "mv": 1e-3, + "v": 1.0, + "kv": 1e3, +} + +_MEASUREMENT_UNITS = { + "uv": 1e-6, + "mv": 1e-3, + "v": 1.0, + "kv": 1e3, + "uvs": 1e-6, + "mvs": 1e-3, + "vs": 1.0, + "ns": 1e-9, + "us": 1e-6, + "ms": 1e-3, + "s": 1.0, + "hz": 1.0, + "khz": 1e3, + "mhz": 1e6, + "ghz": 1e9, + "%": 1.0, +} + +_TIMEBASE_TOKENS = { + 1.0e-9: "1.0ns", + 2.0e-9: "2.0ns", + 5.0e-9: "5.0ns", + 10e-9: "10ns", + 20e-9: "20ns", + 50e-9: "50ns", + 100e-9: "100ns", + 200e-9: "200ns", + 500e-9: "500ns", + 1e-6: "1us", + 2e-6: "2us", + 5e-6: "5us", + 10e-6: "10us", + 20e-6: "20us", + 50e-6: "50us", + 100e-6: "100us", + 200e-6: "200us", + 500e-6: "500us", + 1e-3: "1ms", + 2e-3: "2ms", + 5e-3: "5ms", + 10e-3: "10ms", + 20e-3: "20ms", + 50e-3: "50ms", + 100e-3: "100ms", + 200e-3: "200ms", + 500e-3: "500ms", + 1.0: "1s", + 2.0: "2s", + 5.0: "5s", + 10.0: "10s", + 20.0: "20s", + 50.0: "50s", + 100.0: "100s", +} + +_VERTICAL_SCALE_TOKENS = { + 2e-3: "2mv", + 5e-3: "5mv", + 10e-3: "10mv", + 20e-3: "20mv", + 50e-3: "50mv", + 100e-3: "100mv", + 200e-3: "200mv", + 500e-3: "500mv", + 1.0: "1v", + 2.0: "2v", + 5.0: "5v", + 10.0: "10v", +} + +_MEMORY_DEPTH_TOKENS = { + 1_000: "1K", + 5_000: "5K", + 10_000: "10K", + 100_000: "100K", + 1_000_000: "1M", + 10_000_000: "10M", +} + +_MEASUREMENT_VALUE_RE = re.compile( + r"(?P[-+]?\d+(?:\.\d*)?(?:[eE][-+]?\d+)?)\s*(?P[A-Za-z%*]+)?\s*$" +) +_MEASUREMENT_KV_RE = re.compile(r'"(?P[A-Za-z0-9]+)"\s*:\s*"(?P[^"\r\n]*)"') +_MEASUREMENT_CHANNEL_BLOCK_RE = re.compile( + r'"CH(?P\d+)"\s*:\s*\{(?P.*?)\}(?=,\s*"CH\d+"\s*:|\s*\}\s*$)', + re.DOTALL, +) + + +def _clean_reply(reply): + """ + Normalizes a DOS1104 text reply. + """ + text = reply.strip() + if text.endswith("->"): + text = text[:-2].rstrip() + return text + + +def _strip_packet_prefix(payload, field_name): + """ + Strips the four-byte SDS1104 binary packet prefix. + """ + if len(payload) < 4: + raise ValueError(f"{field_name} payload is too short.") + return payload[4:] + + +def _parse_bool(reply, field_name): + """ + Parses a boolean-like reply. + """ + cleaned = _clean_reply(reply).upper() + if cleaned in {"ON", "1"}: + return True + if cleaned in {"OFF", "0"}: + return False + raise ValueError(f"Invalid {field_name} reply: {reply!r}") + + +def _parse_float(reply, field_name): + """ + Parses a float reply. + """ + cleaned = _clean_reply(reply) + try: + return float(cleaned) + except ValueError as exc: + raise ValueError(f"Invalid {field_name} reply: {reply!r}") from exc + + +def _parse_quantity_token(token, units_map, field_name): + """ + Parses a quantity token like ``100mV`` or ``1ms``. + """ + cleaned = _clean_reply(token).strip().lower() + for suffix, scale in units_map.items(): + if cleaned.endswith(suffix): + magnitude = cleaned[: -len(suffix)] + try: + return float(magnitude) * scale + except ValueError as exc: + raise ValueError(f"Invalid {field_name} reply: {token!r}") from exc + raise ValueError(f"Invalid {field_name} reply: {token!r}") + + +def _parse_timebase_token(token): + """ + Parses a timebase token to seconds per division. + """ + return _parse_quantity_token(token, _TIME_UNITS, "timebase") + + +def _parse_vertical_scale_token(token): + """ + Parses a vertical scale token to volts per division. + """ + return _parse_quantity_token(token, _VERTICAL_UNITS, "vertical scale") + + +def _parse_probe_token(token): + """ + Parses a probe token such as ``10X`` or ``X10``. + """ + cleaned = _clean_reply(token).upper() + if cleaned.startswith("X"): + cleaned = cleaned[1:] + elif cleaned.endswith("X"): + cleaned = cleaned[:-1] + try: + value = int(cleaned) + except ValueError as exc: + raise ValueError(f"Invalid probe reply: {token!r}") from exc + if value not in {1, 10, 100, 1000}: + raise ValueError(f"Invalid probe reply: {token!r}") + return value + + +def _format_probe_token(value): + """ + Formats a probe attenuation token. + """ + if isinstance(value, str): + value = _parse_probe_token(value) + if value not in {1, 10, 100, 1000}: + raise ValueError("Probe attenuation must be one of 1, 10, 100, or 1000.") + return f"{value}X" + + +def _parse_memory_depth_token(token): + """ + Parses a memory depth token. + """ + cleaned = _clean_reply(token).upper() + if cleaned.endswith("K"): + return int(float(cleaned[:-1]) * 1000) + if cleaned.endswith("M"): + return int(float(cleaned[:-1]) * 1_000_000) + return int(cleaned) + + +def _parse_measurement_token(token, field_name): + """ + Parses a scalar measurement token. + """ + cleaned = _clean_reply(token).strip() + if not cleaned or cleaned == "?": + raise ValueError(f"Invalid {field_name} reply: {token!r}") + + match = _MEASUREMENT_VALUE_RE.search(cleaned) + if match is None: + raise ValueError(f"Invalid {field_name} reply: {token!r}") + + value = float(match.group("value")) + unit = (match.group("unit") or "").replace("*", "").lower() + if not unit: + return value + if unit not in _MEASUREMENT_UNITS: + raise ValueError(f"Invalid {field_name} reply: {token!r}") + return value * _MEASUREMENT_UNITS[unit] + + +def _parse_measurement_count(token, field_name): + """ + Parses a count-like measurement reply. + """ + return int(round(_parse_measurement_token(token, field_name))) + + +def _parse_json_payload(payload, field_name): + """ + Parses an SDS1104 binary JSON payload. + """ + text = _strip_packet_prefix(payload, field_name).decode("utf-8", errors="replace") + try: + parsed = json.loads(text.strip()) + except json.JSONDecodeError as exc: + raise ValueError(f"Invalid {field_name} payload: {text!r}") from exc + if not isinstance(parsed, dict): + raise ValueError(f"Invalid {field_name} payload: {text!r}") + return parsed + + +def _sanitize_json_text(text): + """ + Replaces control characters with spaces before JSON parsing. + """ + return "".join(character if ord(character) >= 0x20 else " " for character in text) + + +def _parse_json_array_payload(payload, field_name): + """ + Parses a length-prefixed JSON array payload. + """ + text = _strip_packet_prefix(payload, field_name).decode("utf-8", errors="replace") + text = _sanitize_json_text(text).strip() + if text == "[?]": + return [] + text = re.sub(r",\s*}", "}", text) + text = re.sub(r",\s*]", "]", text) + try: + parsed = json.loads(text) + except json.JSONDecodeError as exc: + raise ValueError(f"Invalid {field_name} payload: {text!r}") from exc + if not isinstance(parsed, list): + raise ValueError(f"Invalid {field_name} payload: {text!r}") + for item in parsed: + if not isinstance(item, dict): + raise ValueError(f"Invalid {field_name} entry: {item!r}") + return parsed + + +def _parse_measurement_payload(payload, channel): + """ + Parses a single-channel measurement blob payload. + """ + text = _strip_packet_prefix(payload, "measurement data").decode( + "utf-8", errors="replace" + ) + text = _sanitize_json_text(text).strip() + try: + parsed = json.loads(text) + except json.JSONDecodeError as exc: + pairs = { + match.group("key"): match.group("value") + for match in _MEASUREMENT_KV_RE.finditer(text) + if match.group("key") != f"CH{channel}" + } + if not pairs: + raise ValueError( + f"Invalid CH{channel} measurement data payload: {text!r}" + ) from exc + return pairs + + nested = parsed.get(f"CH{channel}") + if isinstance(nested, dict): + return { + str(key): "" if value is None else str(value) + for key, value in nested.items() + } + if not isinstance(parsed, dict): + raise ValueError(f"Invalid CH{channel} measurement data payload: {text!r}") + return { + str(key): "" if value is None else str(value) for key, value in parsed.items() + } + + +def _parse_measurement_map_payload(payload): + """ + Parses an all-channel measurement blob payload. + """ + text = _strip_packet_prefix(payload, "all-channel measurement data").decode( + "utf-8", errors="replace" + ) + text = _sanitize_json_text(text).strip() + try: + parsed = json.loads(text) + except json.JSONDecodeError as exc: + channel_map = {} + for match in _MEASUREMENT_CHANNEL_BLOCK_RE.finditer(text): + channel = int(match.group("channel")) + channel_map[channel] = { + kv_match.group("key"): kv_match.group("value") + for kv_match in _MEASUREMENT_KV_RE.finditer(match.group("body")) + } + if not channel_map: + raise ValueError( + f"Invalid all-channel measurement payload: {text!r}" + ) from exc + return channel_map + + channel_map = {} + if not isinstance(parsed, dict): + raise ValueError(f"Invalid all-channel measurement payload: {text!r}") + for key, value in parsed.items(): + if ( + not isinstance(key, str) + or not key.startswith("CH") + or not isinstance(value, dict) + ): + continue + try: + channel = int(key[2:]) + except ValueError: + continue + channel_map[channel] = { + str(item_key): "" if item_value is None else str(item_value) + for item_key, item_value in value.items() + } + if not channel_map: + raise ValueError(f"Invalid all-channel measurement payload: {text!r}") + return channel_map + + +def _parse_sample_rate(token): + """ + Parses a sample rate token such as ``1MS/s``. + """ + cleaned = _clean_reply(token).strip().lower() + units = { + "ks/s": 1e3, + "ms/s": 1e6, + "gs/s": 1e9, + } + for suffix, scale in units.items(): + if cleaned.endswith(suffix): + magnitude = cleaned[: -len(suffix)] + try: + return float(magnitude) * scale + except ValueError as exc: + raise ValueError(f"Invalid sample rate reply: {token!r}") from exc + raise ValueError(f"Invalid sample rate reply: {token!r}") + + +def _parse_waveform_adc(raw_bytes, field_name): + """ + Parses little-endian signed 16-bit ADC samples. + """ + if len(raw_bytes) % 2 != 0: + raise ValueError( + f"{field_name} payload length is not 16-bit aligned: {len(raw_bytes)}" + ) + if numpy is not None: + return numpy.frombuffer(raw_bytes, dtype="`` prompt. + """ + + def __init__(self, dev): + super().__init__(dev) + self._terminator = "" + + def _sendcmd(self, msg): + self.write(msg, encoding="ascii") + + def _query(self, msg, size=-1): + self._sendcmd(msg) + return self.read_binary(size).decode("utf-8") + + +@dataclass(frozen=True) +class SDS1104DeepMemoryCapture: + """ + Parsed deep-memory bundle returned by ``:DATA:WAVE:DEPMem:All?``. + """ + + metadata: dict[str, Any] + raw_channels: dict[int, Any] + + +@dataclass(frozen=True) +class SDS1104SavedWaveformEntry: + """ + Saved-waveform index entry returned by ``:SAVE:READ:HEAD?``. + """ + + index: str + raw: dict[str, Any] + + +class OWONSDS1104( + SCPIInstrument, Oscilloscope +): # pylint: disable=too-many-public-methods + """ + Conservative driver for the OWON SDS1104 oscilloscope family. + + This driver targets the text-based raw-USB control surface shared by the + OWON SDS1104 and compatible HANMATEK DOS1104 units. The public API covers + stable control, scalar measurements, measurement blobs, screen-waveform + retrieval, BMP capture, deep-memory capture, and saved-waveform access. + + Example usage: + + >>> import instruments as ik + >>> scope = ik.owon.OWONSDS1104.open_usb() + >>> scope.name + 'OWON,SDS1104,...' + >>> scope.channel[0].display + True + """ + + DEFAULT_USB_VID = 0x5345 + DEFAULT_USB_PID = 0x1234 + + class AcquisitionMode(Enum): + """ + Acquisition modes supported by the SDS1104 family. + """ + + # pylint: disable=invalid-name + + sample = "SAMPle" + average = "AVERage" + peak_detect = "PEAK" + + class Coupling(Enum): + """ + Input coupling modes for SDS1104 channels. + """ + + # pylint: disable=invalid-name + + ac = "AC" + dc = "DC" + ground = "GND" + + class TriggerStatus(Enum): + """ + Acquisition / trigger status values reported by ``:TRIGger:STATUS?``. + """ + + # pylint: disable=invalid-name + + auto = "AUTO" + ready = "READY" + trig = "TRIG" + scan = "SCAN" + stop = "STOP" + + class TriggerMode(Enum): + """ + General trigger modes supported by the verified SDS1104 API surface. + """ + + # pylint: disable=invalid-name + + edge = "EDGE" + video = "VIDEO" + + class TriggerSource(Enum): + """ + Edge-trigger sources. + """ + + # pylint: disable=invalid-name + + ch1 = "CH1" + ch2 = "CH2" + ch3 = "CH3" + ch4 = "CH4" + + class TriggerCoupling(Enum): + """ + Edge-trigger coupling modes verified on hardware. + """ + + # pylint: disable=invalid-name + + ac = "AC" + dc = "DC" + + class TriggerSlope(Enum): + """ + Edge-trigger slope modes verified on hardware. + """ + + # pylint: disable=invalid-name + + rise = "RISE" + fall = "FALL" + + class DataSource(Oscilloscope.DataSource): + """ + Represents a non-waveform SDS1104 data source. + + Only physical channels support waveform transfer in the initial + driver. + """ + + @property + def name(self): + return self._name + + def read_waveform(self, bin_format=True): # pylint: disable=unused-argument + raise NotImplementedError( + "Waveform transfer is only supported for physical SDS1104 channels." + ) + + class Channel(DataSource, Oscilloscope.Channel): + """ + Class representing a physical channel on the SDS1104. + """ + + def __init__(self, parent, idx): + self._parent = parent + self._idx = idx + 1 + super().__init__(parent, f"CH{self._idx}") + + def sendcmd(self, cmd): + """ + Sends a channel-scoped command. + """ + self._parent.sendcmd(f":CH{self._idx}:{cmd}") + + def query(self, cmd): + """ + Queries a channel-scoped command. + """ + return self._parent.query(f":CH{self._idx}:{cmd}") + + @property + def display(self): + """ + Gets/sets whether the channel is displayed. + + :type: `bool` + """ + return _parse_bool(self.query("DISP?"), "channel display state") + + @display.setter + def display(self, newval): + if not isinstance(newval, bool): + raise TypeError("Display state must be specified with a boolean value.") + self.sendcmd(f"DISP {'ON' if newval else 'OFF'}") + + @property + def coupling(self): + """ + Gets/sets the channel coupling mode. + + :type: `OWONSDS1104.Coupling` + """ + return OWONSDS1104.Coupling(_clean_reply(self.query("COUP?")).upper()) + + @coupling.setter + def coupling(self, newval): + if not isinstance(newval, OWONSDS1104.Coupling): + raise TypeError( + "Coupling setting must be a `OWONSDS1104.Coupling` value." + ) + self.sendcmd(f"COUP {newval.value}") + + @property + def probe_attenuation(self): + """ + Gets/sets the configured probe attenuation. + + :type: `int` + """ + return _parse_probe_token(self.query("PROB?")) + + @probe_attenuation.setter + def probe_attenuation(self, newval): + self.sendcmd(f"PROB {_format_probe_token(newval)}") + + @property + def scale(self): + """ + Gets/sets the vertical scale in volts per division. + + :type: `~pint.Quantity` + """ + return u.Quantity(_parse_vertical_scale_token(self.query("SCAL?")), u.volt) + + @scale.setter + def scale(self, newval): + token = _format_discrete_quantity( + newval, u.volt, _VERTICAL_SCALE_TOKENS, "vertical scale" + ) + self.sendcmd(f"SCAL {token}") + + @property + def offset(self): + """ + Gets/sets the vertical offset in volts. + + :type: `~pint.Quantity` + """ + return u.Quantity(_parse_float(self.query("OFFS?"), "offset"), u.volt) + + @offset.setter + def offset(self, newval): + newval = assume_units(newval, u.volt).to(u.volt) + self.sendcmd(f"OFFS {newval.magnitude}") + + @property + def position(self): + """ + Gets/sets the vertical channel position in divisions. + + :type: `float` + """ + return _parse_float(self.query("POS?"), "position") + + @position.setter + def position(self, newval): + self.sendcmd(f"POS {float(newval)}") + + @property + def invert(self): + """ + Gets/sets whether the channel waveform is inverted. + + :type: `bool` + """ + return _parse_bool(self.query("INVErse?"), "channel invert state") + + @invert.setter + def invert(self, newval): + if not isinstance(newval, bool): + raise TypeError("Invert state must be specified with a boolean value.") + self.sendcmd(f"INVErse {'ON' if newval else 'OFF'}") + + def measure_frequency(self): + """ + Measures the channel frequency. + + :rtype: `~pint.Quantity` + """ + return self._parent.measure_frequency(self._idx) + + def measure_period(self): + """ + Measures the channel period. + + :rtype: `~pint.Quantity` + """ + return self._parent.measure_period(self._idx) + + def measure_peak_to_peak(self): + """ + Measures the channel peak-to-peak voltage. + + :rtype: `~pint.Quantity` + """ + return self._parent.measure_peak_to_peak(self._idx) + + def measure_rms(self): + """ + Measures the channel cycle RMS voltage. + + :rtype: `~pint.Quantity` + """ + return self._parent.measure_rms(self._idx) + + def measure_average(self): + """ + Measures the channel average voltage. + """ + return self._parent.measure_average(self._idx) + + def measure_maximum(self): + """ + Measures the channel maximum voltage. + """ + return self._parent.measure_maximum(self._idx) + + def measure_minimum(self): + """ + Measures the channel minimum voltage. + """ + return self._parent.measure_minimum(self._idx) + + def read_measurement_data(self, long_form=False): + """ + Reads the measurement blob for this channel. + """ + return self._parent.read_measurement_data(self._idx, long_form=long_form) + + def read_waveform(self, bin_format=True): + """ + Reads the current screen waveform for this channel. + """ + if not bin_format: + raise NotImplementedError( + "The OWON SDS1104 driver currently supports binary " + "waveform transfer only." + ) + return self._parent.read_waveform(self._idx) + + def read_deep_memory(self): + """ + Reads the deep-memory waveform for this channel. + """ + return self._parent.read_deep_memory_channel(self._idx) + + def __init__(self, filelike): + super().__init__(filelike) + self._file.timeout = 1 * u.second + + @classmethod + def open_usb(cls, vid=DEFAULT_USB_VID, pid=DEFAULT_USB_PID, timeout=1 * u.second): + """ + Opens an SDS1104-family scope using the default raw USB VID/PID. + + A best-effort OWON-family SCPI enable handshake is attempted after the + communicator is opened. + """ + dev = usb.core.find(idVendor=vid, idProduct=pid) + if dev is None: + raise OSError("No such device found.") + + inst = cls(_OWONPromptUSBCommunicator(dev)) + inst.timeout = assume_units(timeout, u.second) + inst._enable_scpi_mode() + return inst + + def _enable_scpi_mode(self): + """ + Best-effort OWON-family SCPI enable handshake. + """ + try: + self._file.write_raw(b":SDSLSCPI#") + return _clean_reply(self._file.read()) == ":SCPION" + except OSError: + return False + + def _binary_query(self, command): + """ + Sends a raw USB command and reads a binary reply. + """ + self._file.write_raw(command.encode("ascii")) + if not hasattr(self._file, "read_binary"): + raise NotImplementedError( + "Binary waveform support requires a communicator that " + "implements read_binary()." + ) + return self._file.read_binary() + + def _binary_query_exact(self, command, size): + """ + Sends a raw USB command and reads an exact-size binary reply. + """ + self._file.write_raw(command.encode("ascii")) + if not hasattr(self._file, "read_exact"): + raise NotImplementedError( + "Binary waveform support requires a communicator that " + "implements read_exact()." + ) + return self._file.read_exact(size) + + def _query_length_prefixed_binary(self, command, max_body_size=20_000_000): + """ + Sends a raw USB command and reads a little-endian length-prefixed body. + """ + self._file.write_raw(command.encode("ascii")) + if not hasattr(self._file, "read_exact"): + raise NotImplementedError( + "Length-prefixed binary support requires a communicator that " + "implements read_exact()." + ) + + header = self._file.read_exact(4) + if len(header) < 4: + raise ValueError(f"Length-prefixed reply too short for {command!r}.") + + body_size = int.from_bytes(header, byteorder="little", signed=False) + if body_size <= 0: + raise ValueError( + f"Invalid length-prefixed body size for {command!r}: {body_size}" + ) + if body_size > max_body_size: + raise ValueError( + f"Length-prefixed body for {command!r} exceeds safety limit: " + f"{body_size}" + ) + return header + self._file.read_exact(body_size) + + def _waveform_metadata(self): + """ + Reads the screen-waveform metadata block. + """ + return _parse_json_payload( + self._binary_query(":DATA:WAVE:SCREen:HEAD?"), "waveform metadata" + ) + + def read_waveform_metadata(self): + """ + Reads the screen-waveform metadata JSON. + + :rtype: `dict` + """ + return self._waveform_metadata() + + def _extract_channel_metadata(self, metadata, channel): + channels = metadata.get("CHANNEL") + if not isinstance(channels, list) or channel - 1 >= len(channels): + raise ValueError( + f"Metadata does not contain channel {channel}: {metadata!r}" + ) + channel_metadata = channels[channel - 1] + if not isinstance(channel_metadata, dict): + raise ValueError( + f"Invalid channel metadata for CH{channel}: {channel_metadata!r}" + ) + return channel_metadata + + def _sample_rate_hz(self, metadata): + sample = metadata.get("SAMPLE") + if not isinstance(sample, dict): + raise ValueError(f"Metadata missing SAMPLE block: {metadata!r}") + return _parse_sample_rate(str(sample["SAMPLERATE"])) + + def _waveform_point_count(self, metadata): + sample = metadata.get("SAMPLE") + if not isinstance(sample, dict): + raise ValueError(f"Metadata missing SAMPLE block: {metadata!r}") + return int(sample["DATALEN"]) + + def _horizontal_offset_pixels(self, metadata): + timebase = metadata.get("TIMEBASE") + if not isinstance(timebase, dict): + raise ValueError(f"Metadata missing TIMEBASE block: {metadata!r}") + return int(timebase["HOFFSET"]) + + def _vertical_scale_v_div(self, metadata, channel): + channel_metadata = self._extract_channel_metadata(metadata, channel) + return _parse_vertical_scale_token(str(channel_metadata["SCALE"])) + + def _vertical_offset_pixels(self, metadata, channel): + channel_metadata = self._extract_channel_metadata(metadata, channel) + return int(channel_metadata["OFFSET"]) + + def _probe_attenuation(self, metadata, channel): + channel_metadata = self._extract_channel_metadata(metadata, channel) + return _parse_probe_token(str(channel_metadata["PROBE"])) + + def _waveform_time_axis(self, metadata, point_count): + sample_rate = self._sample_rate_hz(metadata) + sample_time = 5.0 / sample_rate + horizontal_offset = self._horizontal_offset_pixels(metadata) + time_offset = -1.0 * horizontal_offset * 2.0 * sample_time + if numpy is not None: + indices = numpy.arange(point_count, dtype=float) + return (indices - point_count / 2.0) * sample_time - time_offset + return tuple( + (index - point_count / 2.0) * sample_time - time_offset + for index in range(point_count) + ) + + def _waveform_voltage_axis(self, metadata, channel, raw_adc): + vertical_offset = self._vertical_offset_pixels(metadata, channel) + volts_per_div = self._vertical_scale_v_div(metadata, channel) + probe = self._probe_attenuation(metadata, channel) + if numpy is not None and isinstance(raw_adc, numpy.ndarray): + return ( + volts_per_div + * probe + * (raw_adc.astype(float) - vertical_offset * 8.25) + / 410.0 + ) + return tuple( + volts_per_div * probe * (sample - vertical_offset * 8.25) / 410.0 + for sample in raw_adc + ) + + @property + def name(self): + """ + The cleaned instrument identity string reported by ``*IDN?``. + """ + return _clean_reply(super().name) + + @property + def channel(self): + """ + Gets the SDS1104 channel proxy list. + """ + return ProxyList(self, self.Channel, range(4)) + + @property + def ref(self): + """ + Gets reference data-source objects. + """ + return ProxyList( + self, lambda scope, idx: self.DataSource(scope, f"REF{idx + 1}"), range(4) + ) + + @property + def math(self): + """ + Gets the math data-source object. + """ + return self.DataSource(self, "MATH") + + @property + def acquire_mode(self): + """ + Gets/sets the acquisition mode. + + :type: `OWONSDS1104.AcquisitionMode` + """ + reply = _clean_reply(self.query(":ACQUire:Mode?")).upper() + if reply.startswith("SAMP"): + return self.AcquisitionMode.sample + if reply.startswith("AVER"): + return self.AcquisitionMode.average + if reply.startswith("PEAK"): + return self.AcquisitionMode.peak_detect + raise ValueError(f"Invalid acquisition mode reply: {reply!r}") + + @acquire_mode.setter + def acquire_mode(self, newval): + if not isinstance(newval, self.AcquisitionMode): + raise TypeError( + 'Acquisition mode must be one of "SAMPle", "AVERage", or "PEAK".' + ) + self.sendcmd(f":ACQUire:Mode {newval.value}") + + @property + def acquire_averages(self): + """ + Gets/sets the acquisition average count. + + :type: `int` + """ + return int(_clean_reply(self.query(":ACQUire:average:num?"))) + + @acquire_averages.setter + def acquire_averages(self, newval): + if newval not in {4, 16, 64, 128}: + raise ValueError( + "Average count not supported by instrument; must be one of " + "{4, 16, 64, 128}." + ) + self.sendcmd(f":ACQUire:average:num {int(newval)}") + + @property + def memory_depth(self): + """ + Gets/sets the acquisition memory depth. + + :type: `int` + """ + return _parse_memory_depth_token(self.query(":ACQUIRE:DEPMEM?")) + + @memory_depth.setter + def memory_depth(self, newval): + if newval not in _MEMORY_DEPTH_TOKENS: + raise ValueError( + "Memory depth must be one of 1K, 5K, 10K, 100K, 1M, or 10M. " + "20M and 40M are documented, but are not yet verified in this driver." + ) + self.sendcmd(f":ACQUIRE:DEPMEM {_MEMORY_DEPTH_TOKENS[int(newval)]}") + + @property + def timebase_scale(self): + """ + Gets/sets the horizontal scale in seconds per division. + + :type: `~pint.Quantity` + """ + seconds = _parse_timebase_token(self.query(":HORIzontal:Scale?")) + return u.Quantity(seconds, u.second) + + @timebase_scale.setter + def timebase_scale(self, newval): + token = _format_discrete_quantity( + newval, u.second, _TIMEBASE_TOKENS, "timebase scale" + ) + self.sendcmd(f":HORIzontal:Scale {token}") + + @property + def trigger_status(self): + """ + Gets the current trigger / acquisition status. + + :type: `OWONSDS1104.TriggerStatus` + """ + reply = _clean_reply(self.query(":TRIGger:STATUS?")).upper() + try: + return self.TriggerStatus(reply) + except ValueError as exc: + raise ValueError(f"Invalid trigger status reply: {reply!r}") from exc + + @property + def trigger_mode(self): + """ + Gets/sets the current trigger mode. + + :type: `OWONSDS1104.TriggerMode` + """ + reply = _clean_reply(self.query(":TRIGger:SINGle:MODE?")).upper() + try: + return self.TriggerMode(reply) + except ValueError as exc: + raise ValueError(f"Invalid trigger mode reply: {reply!r}") from exc + + @trigger_mode.setter + def trigger_mode(self, newval): + if not isinstance(newval, self.TriggerMode): + raise TypeError( + "Trigger mode must be specified with a " + "`OWONSDS1104.TriggerMode` value." + ) + self.sendcmd(f":TRIGger:SINGle:MODE {newval.value}") + + def _require_edge_trigger_mode(self): + mode = self.trigger_mode + if mode != self.TriggerMode.edge: + raise NotImplementedError( + "Trigger source, coupling, slope, and level are only exposed " + "for EDGE trigger mode in this driver." + ) + + @property + def trigger_source(self): + """ + Gets/sets the edge-trigger source. + + This property is only available when ``trigger_mode`` is ``EDGE``. + + :type: `OWONSDS1104.TriggerSource` + """ + self._require_edge_trigger_mode() + reply = _clean_reply(self.query(":TRIGger:SINGle:EDGE:SOURce?")).upper() + try: + return self.TriggerSource(reply) + except ValueError as exc: + raise ValueError(f"Invalid trigger source reply: {reply!r}") from exc + + @trigger_source.setter + def trigger_source(self, newval): + self._require_edge_trigger_mode() + if not isinstance(newval, self.TriggerSource): + raise TypeError( + "Trigger source must be specified with a " + "`OWONSDS1104.TriggerSource` value." + ) + self.sendcmd(f":TRIGger:SINGle:EDGE:SOURce {newval.value}") + + @property + def trigger_coupling(self): + """ + Gets/sets the edge-trigger coupling. + + This property is only available when ``trigger_mode`` is ``EDGE``. + + :type: `OWONSDS1104.TriggerCoupling` + """ + self._require_edge_trigger_mode() + reply = _clean_reply(self.query(":TRIGger:SINGle:EDGE:COUPling?")).upper() + try: + return self.TriggerCoupling(reply) + except ValueError as exc: + raise ValueError(f"Invalid trigger coupling reply: {reply!r}") from exc + + @trigger_coupling.setter + def trigger_coupling(self, newval): + self._require_edge_trigger_mode() + if not isinstance(newval, self.TriggerCoupling): + raise TypeError( + "Trigger coupling must be specified with a " + "`OWONSDS1104.TriggerCoupling` value." + ) + self.sendcmd(f":TRIGger:SINGle:EDGE:COUPling {newval.value}") + + @property + def trigger_slope(self): + """ + Gets/sets the edge-trigger slope. + + This property is only available when ``trigger_mode`` is ``EDGE``. + + :type: `OWONSDS1104.TriggerSlope` + """ + self._require_edge_trigger_mode() + reply = _clean_reply(self.query(":TRIGger:SINGle:EDGE:SLOPe?")).upper() + try: + return self.TriggerSlope(reply) + except ValueError as exc: + raise ValueError(f"Invalid trigger slope reply: {reply!r}") from exc + + @trigger_slope.setter + def trigger_slope(self, newval): + self._require_edge_trigger_mode() + if not isinstance(newval, self.TriggerSlope): + raise TypeError( + "Trigger slope must be specified with a " + "`OWONSDS1104.TriggerSlope` value." + ) + self.sendcmd(f":TRIGger:SINGle:EDGE:SLOPe {newval.value}") + + @property + def trigger_level(self): + """ + Gets/sets the edge-trigger level. + + This property is only available when ``trigger_mode`` is ``EDGE``. + + :type: `~pint.Quantity` + """ + self._require_edge_trigger_mode() + value = _parse_measurement_token( + self.query(":TRIGger:SINGle:EDGE:LEVel?"), "trigger level" + ) + return u.Quantity(value, u.volt) + + @trigger_level.setter + def trigger_level(self, newval): + self._require_edge_trigger_mode() + newval = assume_units(newval, u.volt).to(u.volt) + self.sendcmd(f":TRIGger:SINGle:EDGE:LEVel {newval.magnitude}V") + + @property + def horizontal_offset(self): + """ + Gets/sets the horizontal offset in the instrument's division units. + + :type: `float` + """ + return _parse_float(self.query(":HORIzontal:OFFSET?"), "horizontal offset") + + @horizontal_offset.setter + def horizontal_offset(self, newval): + self.sendcmd(f":HORIzontal:OFFSET {float(newval)}") + + @property + def measurement_display_enabled(self): + """ + Gets/sets whether the on-screen measurement table is displayed. + + :type: `bool` + """ + return _parse_bool( + self.query(":MEASUrement:DISPlay?"), "measurement display state" + ) + + @measurement_display_enabled.setter + def measurement_display_enabled(self, newval): + if not isinstance(newval, bool): + raise TypeError( + "Measurement display state must be specified with a boolean value." + ) + self.sendcmd(f":MEASUrement:DISPlay {'ON' if newval else 'OFF'}") + + def run(self): + """ + Starts acquisition. + """ + self.sendcmd(":RUN") + + def stop(self): + """ + Stops acquisition. + """ + self.sendcmd(":STOP") + + def autoscale(self): + """ + Executes the scope autoscale action. + + This is an action-like command, not a persistent boolean setting. It + may reconfigure acquisition, timebase, and channel settings. + """ + self.sendcmd(":AUTOscale ON") + + def read_waveform(self, channel): + """ + Reads the current screen waveform for a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `tuple` + :return: Pair ``(x, y)`` of time and voltage samples. + """ + self._validate_channel(channel) + metadata = self._waveform_metadata() + point_count = self._waveform_point_count(metadata) + payload = self._binary_query_exact( + f":DATA:WAVE:SCREEN:CH{channel}?", 4 + 2 * point_count + ) + raw_adc = _parse_waveform_adc( + _strip_packet_prefix(payload, f"screen waveform CH{channel}"), + f"screen waveform CH{channel}", + ) + if len(raw_adc) != point_count: + raise ValueError( + f"Screen waveform point count mismatch for CH{channel}: " + f"metadata={point_count}, payload={len(raw_adc)}" + ) + return ( + self._waveform_time_axis(metadata, point_count), + self._waveform_voltage_axis(metadata, channel, raw_adc), + ) + + def force_trigger(self): + raise NotImplementedError( + "The initial OWON SDS1104 driver does not expose trigger control." + ) + + def _validate_channel(self, channel): + if channel not in {1, 2, 3, 4}: + raise ValueError("Channel index must be between 1 and 4.") + + def _measure(self, channel, item, field_name, units): + self._validate_channel(channel) + token = self.query(f":MEASUrement:CH{channel}:{item}?") + value = _parse_measurement_token(token, field_name) + return u.Quantity(value, units) + + def _measure_short(self, channel, item, field_name, units): + self._validate_channel(channel) + token = self.query(f":MEAS:CH{channel}:{item}?") + value = _parse_measurement_token(token, field_name) + return u.Quantity(value, units) + + def _measure_short_count(self, channel, item, field_name): + self._validate_channel(channel) + token = self.query(f":MEAS:CH{channel}:{item}?") + return _parse_measurement_count(token, field_name) + + def read_measurement_data(self, channel, long_form=False): + """ + Reads the wrapper-style measurement blob for a single channel. + + :param int channel: One-based channel number from 1 to 4. + :param bool long_form: Use ``:MEASUrement:CH?`` instead of + ``:MEAS:CH?``. + :rtype: `dict` + """ + self._validate_channel(channel) + command = f":MEASUrement:CH{channel}?" if long_form else f":MEAS:CH{channel}?" + return _parse_measurement_payload(self._binary_query(command), channel) + + def read_all_measurement_data(self, long_form=False): + """ + Reads the wrapper-style all-channel measurement blob. + + :param bool long_form: Use ``:MEASUrement:ALL?`` instead of ``:MEAS?``. + :rtype: `dict` + """ + command = ":MEASUrement:ALL?" if long_form else ":MEAS?" + return _parse_measurement_map_payload(self._binary_query(command)) + + def measure_frequency(self, channel): + """ + Measures the frequency of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "FREQuency", "frequency", u.hertz) + + def measure_period(self, channel): + """ + Measures the period of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "PERiod", "period", u.second) + + def measure_peak_to_peak(self, channel): + """ + Measures the peak-to-peak voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "PKPK", "peak-to-peak voltage", u.volt) + + def measure_rms(self, channel): + """ + Measures the cycle RMS voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "CYCRms", "RMS voltage", u.volt) + + def measure_average(self, channel): + """ + Measures the average voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "AVERage", "average voltage", u.volt) + + def measure_maximum(self, channel): + """ + Measures the maximum voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "MAX", "maximum voltage", u.volt) + + def measure_minimum(self, channel): + """ + Measures the minimum voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "MIN", "minimum voltage", u.volt) + + def measure_top(self, channel): + """ + Measures the top voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "VTOP", "top voltage", u.volt) + + def measure_base(self, channel): + """ + Measures the base voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "VBASe", "base voltage", u.volt) + + def measure_amplitude(self, channel): + """ + Measures the amplitude voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "VAMP", "amplitude voltage", u.volt) + + def measure_rise_time(self, channel): + """ + Measures the rise time of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "RTime", "rise time", u.second) + + def measure_fall_time(self, channel): + """ + Measures the fall time of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "FTime", "fall time", u.second) + + def measure_positive_width(self, channel): + """ + Measures the positive pulse width of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "PWIDth", "positive pulse width", u.second) + + def measure_negative_width(self, channel): + """ + Measures the negative pulse width of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "NWIDth", "negative pulse width", u.second) + + def measure_positive_duty(self, channel): + """ + Measures the positive duty cycle of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "PDUTy", "positive duty cycle", u.percent) + + def measure_negative_duty(self, channel): + """ + Measures the negative duty cycle of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "NDUTy", "negative duty cycle", u.percent) + + def measure_overshoot(self, channel): + """ + Measures the overshoot percentage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "OVERshoot", "overshoot", u.percent) + + def measure_preshoot(self, channel): + """ + Measures the preshoot percentage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "PREShoot", "preshoot", u.percent) + + def measure_square_sum(self, channel): + """ + Measures the short-form ``SQUAresum`` quantity for a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure_short(channel, "SQUARESUM", "square sum", u.volt) + + def measure_cursor_rms(self, channel): + """ + Measures the cursor RMS voltage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure_short(channel, "CURSorrms", "cursor RMS voltage", u.volt) + + def measure_screen_duty(self, channel): + """ + Measures the screen duty percentage of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure_short(channel, "SCREenduty", "screen duty", u.percent) + + def measure_positive_pulse_count(self, channel): + """ + Measures the positive pulse count of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `int` + """ + return self._measure_short_count(channel, "PPULSENUM", "positive pulse count") + + def measure_negative_pulse_count(self, channel): + """ + Measures the negative pulse count of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `int` + """ + return self._measure_short_count(channel, "NPULSENUM", "negative pulse count") + + def measure_rise_edge_count(self, channel): + """ + Measures the rising-edge count of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `int` + """ + self._validate_channel(channel) + return _parse_measurement_count( + self.query(f":MEASUrement:CH{channel}:RISEedgenum?"), "rising-edge count" + ) + + def measure_fall_edge_count(self, channel): + """ + Measures the falling-edge count of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `int` + """ + self._validate_channel(channel) + return _parse_measurement_count( + self.query(f":MEASUrement:CH{channel}:FALLedgenum?"), "falling-edge count" + ) + + def measure_area(self, channel): + """ + Measures the area of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure_short(channel, "AREA", "area", u.volt * u.second) + + def measure_cycle_area(self, channel): + """ + Measures the cycle area of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure_short( + channel, "CYCLEAREA", "cycle area", u.volt * u.second + ) + + def measure_hard_frequency(self, channel): + """ + Measures the hard frequency of a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `~pint.Quantity` + """ + return self._measure(channel, "HARDfrequency", "hard frequency", u.hertz) + + def _validate_waveform_point_count( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, + metadata, + point_count, + channel, + field_name, + allow_metadata_short_by_one=False, + ): + expected = self._waveform_point_count(metadata) + if point_count == expected: + return + if allow_metadata_short_by_one and point_count + 1 == expected: + return + raise ValueError( + f"{field_name} point count mismatch for CH{channel}: " + f"metadata={expected}, payload={point_count}" + ) + + def _build_waveform_axes( # pylint: disable=too-many-arguments,too-many-positional-arguments + self, metadata, channel, raw_adc, field_name, allow_metadata_short_by_one=False + ): + self._validate_waveform_point_count( + metadata, + len(raw_adc), + channel, + field_name, + allow_metadata_short_by_one=allow_metadata_short_by_one, + ) + point_count = len(raw_adc) + return ( + self._waveform_time_axis(metadata, point_count), + self._waveform_voltage_axis(metadata, channel, raw_adc), + ) + + def read_screen_bmp(self): + """ + Reads the current screen image as BMP bytes. + + :rtype: `bytes` + """ + payload = self._query_length_prefixed_binary(":DATA:WAVE:SCREen:BMP?") + bmp_data = payload[4:] + if len(bmp_data) < 14 or not bmp_data.startswith(b"BM"): + raise ValueError( + "SDS1104 screen BMP payload does not contain a valid BMP header." + ) + bmp_size = int.from_bytes(bmp_data[2:6], byteorder="little", signed=False) + if bmp_size != len(bmp_data): + raise ValueError( + f"SDS1104 BMP length mismatch: header says {bmp_size} bytes, " + f"received {len(bmp_data)}." + ) + return bmp_data + + def read_deep_memory_metadata(self): + """ + Reads the deep-memory metadata JSON. + + :rtype: `dict` + """ + return _parse_json_payload( + self._query_length_prefixed_binary(":DATA:WAVE:DEPMEM:HEAD?"), + "deep-memory metadata", + ) + + def read_deep_memory_channel(self, channel): + """ + Reads the deep-memory waveform for a channel. + + :param int channel: One-based channel number from 1 to 4. + :rtype: `tuple` + """ + self._validate_channel(channel) + metadata = self.read_deep_memory_metadata() + payload = self._query_length_prefixed_binary(f":DATA:WAVE:DEPMEM:CH{channel}?") + raw_adc = _parse_waveform_adc( + _strip_packet_prefix(payload, f"deep-memory waveform CH{channel}"), + f"deep-memory waveform CH{channel}", + ) + return self._build_waveform_axes( + metadata, + channel, + raw_adc, + "Deep-memory waveform", + allow_metadata_short_by_one=True, + ) + + def read_deep_memory_all(self): # pylint: disable=too-many-locals,too-many-branches + """ + Reads the bundled deep-memory capture as metadata plus raw channel data. + + :rtype: `SDS1104DeepMemoryCapture` + """ + payload = self._query_length_prefixed_binary( + ":DATA:WAVE:DEPMem:All?", max_body_size=100_000_000 + ) + body = payload[4:] + if len(body) < 4: + raise ValueError( + "Deep-memory bundle payload is too short for metadata length." + ) + + metadata_size = int.from_bytes(body[:4], byteorder="little", signed=False) + if metadata_size <= 0: + raise ValueError( + f"Invalid deep-memory bundle metadata length: {metadata_size}" + ) + if 4 + metadata_size > len(body): + raise ValueError( + "Deep-memory bundle metadata length exceeds the received payload size." + ) + + metadata_text = body[4 : 4 + metadata_size].decode("utf-8", errors="replace") + metadata = json.loads(metadata_text.strip()) + if not isinstance(metadata, dict): + raise ValueError(f"Invalid deep-memory bundle metadata: {metadata_text!r}") + + offset = 4 + metadata_size + raw_blocks = [] + block_channel = 1 + while offset < len(body): + if offset + 4 > len(body): + raise ValueError( + "Deep-memory bundle is truncated before a channel block length." + ) + block_size = int.from_bytes( + body[offset : offset + 4], byteorder="little", signed=False + ) + offset += 4 + if offset + block_size > len(body): + raise ValueError( + f"Deep-memory bundle CH{block_channel} block exceeds the " + "received payload size." + ) + raw_adc = _parse_waveform_adc( + body[offset : offset + block_size], + f"deep-memory bundle CH{block_channel}", + ) + self._validate_waveform_point_count( + metadata, + len(raw_adc), + block_channel, + "Deep-memory bundle waveform", + allow_metadata_short_by_one=True, + ) + raw_blocks.append(raw_adc) + offset += block_size + block_channel += 1 + + if not raw_blocks: + raise ValueError("Deep-memory bundle did not contain any channel blocks.") + + raw_channels = {} + metadata_channels = metadata.get("CHANNEL") + displayed_channel_ids = [] + if isinstance(metadata_channels, list): + for index, channel_metadata in enumerate(metadata_channels, start=1): + if ( + isinstance(channel_metadata, dict) + and str(channel_metadata.get("DISPLAY", "")).upper() == "ON" + ): + displayed_channel_ids.append(index) + + if displayed_channel_ids and len(displayed_channel_ids) == len(raw_blocks): + channel_ids = displayed_channel_ids + else: + channel_ids = list(range(1, len(raw_blocks) + 1)) + + for channel_id, raw_adc in zip(channel_ids, raw_blocks, strict=True): + raw_channels[channel_id] = raw_adc + + return SDS1104DeepMemoryCapture( + metadata=metadata, + raw_channels=raw_channels, + ) + + def list_saved_waveforms(self): + """ + Lists saved-waveform entries exposed by ``:SAVE:READ:HEAD?``. + + :rtype: `list` + """ + payload = self._query_length_prefixed_binary(":SAVE:READ:HEAD?") + return [ + SDS1104SavedWaveformEntry(index=str(item["Index"]), raw=dict(item)) + for item in _parse_json_array_payload(payload, "saved waveform head") + ] + + def read_saved_waveform_data(self, index): + """ + Reads the raw ADC payload for a saved-waveform entry. + + :param index: Saved-waveform index token. + :rtype: `numpy.ndarray` or `tuple` + """ + cleaned_index = str(index).strip() + if not cleaned_index: + raise ValueError("Saved waveform index must not be empty.") + payload = self._query_length_prefixed_binary(f":SAVE:READ:DATA {cleaned_index}") + return _parse_waveform_adc( + _strip_packet_prefix( + payload, f"saved waveform data for index {cleaned_index}" + ), + f"saved waveform data for index {cleaned_index}", + ) diff --git a/src/instruments/thorlabs/_abstract.py b/src/instruments/thorlabs/_abstract.py index 7959760d..68c6b636 100644 --- a/src/instruments/thorlabs/_abstract.py +++ b/src/instruments/thorlabs/_abstract.py @@ -84,7 +84,7 @@ def querypacket(self, packet, expect=None, timeout=None, expect_data_len=None): break else: tic = time.time() - if tic - t_start > timeout: + if tic - t_start >= timeout: break if not resp: diff --git a/tests/test_comm/test_usb_communicator.py b/tests/test_comm/test_usb_communicator.py index 8969302c..fa580b71 100644 --- a/tests/test_comm/test_usb_communicator.py +++ b/tests/test_comm/test_usb_communicator.py @@ -127,6 +127,7 @@ def test_timeout_set_unitless(inst): set_val = inst._dev.default_timeout exp_val = 1000 * val assert set_val == exp_val + assert isinstance(set_val, int) def test_timeout_set_minutes(inst): @@ -157,6 +158,14 @@ def test_read_raw(inst): assert inst.read_raw() == msg_exp +def test_read_packet(inst): + """Read a single raw USB packet.""" + msg = b"\x01\x02\x03" + inst._ep_in.read.return_value = msg + + assert inst.read_packet() == msg + + def test_read_raw_size(inst): """If size is -1, read 1000 bytes.""" msg = b"message\n" @@ -170,6 +179,46 @@ def test_read_raw_size(inst): inst._ep_in.read.assert_called_with(max_size) +def test_read_exact(inst): + """Read an exact number of raw USB bytes.""" + inst.read_packet = mock.MagicMock(side_effect=[b"\x01\x02", b"\x03\x04"]) + + assert inst.read_exact(4, chunk_size=2) == b"\x01\x02\x03\x04" + + +def test_read_exact_negative_size(inst): + """Reject a negative exact read size.""" + with pytest.raises(ValueError) as err: + inst.read_exact(-1) + assert err.value.args[0] == "Size must be non-negative." + + +def test_read_binary_exact(inst): + """Read a binary reply with explicit size.""" + inst.read_exact = mock.MagicMock(return_value=b"\x00\x01") + + assert inst.read_binary(2) == b"\x00\x01" + inst.read_exact.assert_called_with(2) + + +def test_read_binary_until_short_packet(inst): + """Read a binary reply until the device sends a short packet.""" + inst._max_packet_size = 4 + inst.read_packet = mock.MagicMock(side_effect=[b"1234", b"56"]) + + assert inst.read_binary() == b"123456" + + +def test_read_binary_until_timeout(inst): + """Read a binary reply until a timeout after receiving data.""" + inst._max_packet_size = 4 + inst.read_packet = mock.MagicMock( + side_effect=[b"1234", usb.core.USBTimeoutError("timeout")] + ) + + assert inst.read_binary() == b"1234" + + def test_read_raw_termination_char_not_found(inst): """Raise IOError if termination character not found.""" msg = b"message" @@ -209,9 +258,9 @@ def test_tell(inst): def test_flush_input(inst): """Flush the input out by trying to read until no more available.""" - inst._ep_in.read.side_effect = [b"message\n", usb.core.USBTimeoutError] + inst._ep_in.read.side_effect = [b"message\n", usb.core.USBTimeoutError("timeout")] inst.flush_input() - inst._ep_in.read.assert_called() + assert inst._ep_in.read.call_count == 2 def test_sendcmd(inst): diff --git a/tests/test_owon/test_sds1104.py b/tests/test_owon/test_sds1104.py new file mode 100644 index 00000000..f356bed2 --- /dev/null +++ b/tests/test_owon/test_sds1104.py @@ -0,0 +1,803 @@ +#!/usr/bin/env python +""" +Tests for the OWON SDS1104 oscilloscope driver. +""" + +# IMPORTS #################################################################### + + +from io import BytesIO +import json +import struct + +import pytest + +import instruments as ik +from instruments.abstract_instruments.comm import LoopbackCommunicator +from instruments.units import ureg as u +from tests import expected_protocol, unit_eq + +# TESTS ###################################################################### + + +def _make_length_prefixed_payload(body): + return struct.pack(""] + ) as scope: + assert scope.name == "OWON,SDS1104,1234,V1.0" + + +def test_channel_proxy(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + assert scope.channel[0].name == "CH1" + assert scope.channel[3].name == "CH4" + + +def test_channel_display(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:DISP?", ":CH2:DISP ON"], + ["1"], + ) as scope: + assert scope.channel[0].display is True + scope.channel[1].display = True + + +def test_channel_display_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.channel[0].display = "ON" + assert ( + err.value.args[0] == "Display state must be specified with a boolean value." + ) + + +def test_channel_coupling(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:COUP?", ":CH2:COUP AC"], + ["DC"], + ) as scope: + assert scope.channel[0].coupling == scope.Coupling.dc + scope.channel[1].coupling = scope.Coupling.ac + + +def test_channel_coupling_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.channel[0].coupling = "DC" + assert ( + err.value.args[0] + == "Coupling setting must be a `OWONSDS1104.Coupling` value." + ) + + +def test_channel_probe(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:PROB?", ":CH2:PROB 100X"], + ["10X"], + ) as scope: + assert scope.channel[0].probe_attenuation == 10 + scope.channel[1].probe_attenuation = 100 + + +def test_channel_probe_value_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.channel[0].probe_attenuation = 3 + assert ( + err.value.args[0] == "Probe attenuation must be one of 1, 10, 100, or 1000." + ) + + +def test_channel_scale(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:SCAL?", ":CH2:SCAL 500mv"], + ["100mV"], + ) as scope: + unit_eq(scope.channel[0].scale, 0.1 * u.volt) + scope.channel[1].scale = 0.5 * u.volt + + +def test_channel_scale_value_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.channel[0].scale = 3.0 * u.volt + assert ( + err.value.args[0] + == "Unsupported vertical scale. Must be one of the documented discrete values." + ) + + +def test_channel_offset(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:OFFS?", ":CH2:OFFS 0.25"], + ["-0.5"], + ) as scope: + unit_eq(scope.channel[0].offset, -0.5 * u.volt) + scope.channel[1].offset = 0.25 * u.volt + + +def test_channel_position(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:POS?", ":CH2:POS -1.5"], + ["0.25"], + ) as scope: + assert scope.channel[0].position == pytest.approx(0.25) + scope.channel[1].position = -1.5 + + +def test_channel_invert(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":CH1:INVErse?", ":CH2:INVErse OFF"], + ["ON"], + ) as scope: + assert scope.channel[0].invert is True + scope.channel[1].invert = False + + +def test_channel_invert_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.channel[0].invert = 1 + assert ( + err.value.args[0] == "Invert state must be specified with a boolean value." + ) + + +def test_acquire_mode(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":ACQUire:Mode?", ":ACQUire:Mode AVERage"], + ["SAMP"], + ) as scope: + assert scope.acquire_mode == scope.AcquisitionMode.sample + scope.acquire_mode = scope.AcquisitionMode.average + + +def test_acquire_mode_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.acquire_mode = "SAMPle" + assert ( + err.value.args[0] + == 'Acquisition mode must be one of "SAMPle", "AVERage", or "PEAK".' + ) + + +def test_acquire_averages(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":ACQUire:average:num?", ":ACQUire:average:num 64"], + ["16"], + ) as scope: + assert scope.acquire_averages == 16 + scope.acquire_averages = 64 + + +def test_acquire_averages_value_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.acquire_averages = 8 + assert ( + err.value.args[0] + == "Average count not supported by instrument; must be one of {4, 16, 64, 128}." + ) + + +def test_memory_depth(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":ACQUIRE:DEPMEM?", ":ACQUIRE:DEPMEM 100K"], + ["10K"], + ) as scope: + assert scope.memory_depth == 10_000 + scope.memory_depth = 100_000 + + +def test_memory_depth_value_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.memory_depth = 42 + assert ( + err.value.args[0] + == "Memory depth must be one of 1K, 5K, 10K, 100K, 1M, or 10M. 20M and 40M are documented, but are not yet verified in this driver." + ) + + +def test_timebase_scale(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":HORIzontal:Scale?", ":HORIzontal:Scale 10ms"], + ["1ms"], + ) as scope: + unit_eq(scope.timebase_scale, 1e-3 * u.second) + scope.timebase_scale = 10e-3 * u.second + + +def test_timebase_scale_value_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.timebase_scale = 3e-3 * u.second + assert ( + err.value.args[0] + == "Unsupported timebase scale. Must be one of the documented discrete values." + ) + + +def test_horizontal_offset(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":HORIzontal:OFFSET?", ":HORIzontal:OFFSET 1.5"], + ["0.25"], + ) as scope: + assert scope.horizontal_offset == pytest.approx(0.25) + scope.horizontal_offset = 1.5 + + +def test_measurement_display_enabled(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":MEASUrement:DISPlay?", ":MEASUrement:DISPlay OFF"], + ["ON"], + ) as scope: + assert scope.measurement_display_enabled is True + scope.measurement_display_enabled = False + + +def test_measurement_display_enabled_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.measurement_display_enabled = "OFF" + assert ( + err.value.args[0] + == "Measurement display state must be specified with a boolean value." + ) + + +def test_run_stop(): + with expected_protocol(ik.owon.OWONSDS1104, [":RUN", ":STOP"], []) as scope: + scope.run() + scope.stop() + + +def test_trigger_status(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":TRIGger:STATUS?"], + ["READy"], + ) as scope: + assert scope.trigger_status == scope.TriggerStatus.ready + + +def test_trigger_mode(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":TRIGger:SINGle:MODE?", ":TRIGger:SINGle:MODE VIDEO"], + ["EDGE"], + ) as scope: + assert scope.trigger_mode == scope.TriggerMode.edge + scope.trigger_mode = scope.TriggerMode.video + + +def test_trigger_mode_type_error(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(TypeError) as err: + scope.trigger_mode = "EDGE" + assert ( + err.value.args[0] + == "Trigger mode must be specified with a `OWONSDS1104.TriggerMode` value." + ) + + +def test_edge_trigger_properties(): + with expected_protocol( + ik.owon.OWONSDS1104, + [ + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:SOURce?", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:SOURce CH2", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:COUPling?", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:COUPling AC", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:SLOPe?", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:SLOPe FALL", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:LEVel?", + ":TRIGger:SINGle:MODE?", + ":TRIGger:SINGle:EDGE:LEVel 0V", + ], + [ + "EDGE", + "CH1", + "EDGE", + "EDGE", + "DC", + "EDGE", + "EDGE", + "RISE", + "EDGE", + "EDGE", + "4.00mV", + "EDGE", + ], + ) as scope: + assert scope.trigger_source == scope.TriggerSource.ch1 + scope.trigger_source = scope.TriggerSource.ch2 + assert scope.trigger_coupling == scope.TriggerCoupling.dc + scope.trigger_coupling = scope.TriggerCoupling.ac + assert scope.trigger_slope == scope.TriggerSlope.rise + scope.trigger_slope = scope.TriggerSlope.fall + unit_eq(scope.trigger_level, 4e-3 * u.volt) + scope.trigger_level = 0 * u.volt + + +def test_edge_trigger_properties_require_edge_mode(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":TRIGger:SINGle:MODE?"], + ["VIDEO"], + ) as scope: + with pytest.raises(NotImplementedError) as err: + _ = scope.trigger_source + assert ( + err.value.args[0] + == "Trigger source, coupling, slope, and level are only exposed for EDGE trigger mode in this driver." + ) + + +def test_edge_trigger_type_errors(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":TRIGger:SINGle:MODE?", ":TRIGger:SINGle:MODE?", ":TRIGger:SINGle:MODE?"], + ["EDGE", "EDGE", "EDGE"], + ) as scope: + with pytest.raises(TypeError) as err: + scope.trigger_source = "CH1" + assert ( + err.value.args[0] + == "Trigger source must be specified with a `OWONSDS1104.TriggerSource` value." + ) + + with pytest.raises(TypeError) as err: + scope.trigger_coupling = "DC" + assert ( + err.value.args[0] + == "Trigger coupling must be specified with a `OWONSDS1104.TriggerCoupling` value." + ) + + with pytest.raises(TypeError) as err: + scope.trigger_slope = "RISE" + assert ( + err.value.args[0] + == "Trigger slope must be specified with a `OWONSDS1104.TriggerSlope` value." + ) + + +def test_autoscale(): + with expected_protocol(ik.owon.OWONSDS1104, [":AUTOscale ON"], []) as scope: + scope.autoscale() + + +def test_measure_frequency(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":MEASUrement:CH1:FREQuency?"], + ["12.5kHz"], + ) as scope: + unit_eq(scope.measure_frequency(1), 12_500 * u.hertz) + + +def test_measure_period(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":MEASUrement:CH2:PERiod?"], + ["5ms"], + ) as scope: + unit_eq(scope.measure_period(2), 5e-3 * u.second) + + +def test_measure_peak_to_peak(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":MEASUrement:CH3:PKPK?"], + ["2.5V"], + ) as scope: + unit_eq(scope.measure_peak_to_peak(3), 2.5 * u.volt) + + +def test_measure_rms(): + with expected_protocol( + ik.owon.OWONSDS1104, + [":MEASUrement:CH4:CYCRms?"], + ["125mV"], + ) as scope: + unit_eq(scope.measure_rms(4), 0.125 * u.volt) + + +def test_extended_long_form_measurements(): + with expected_protocol( + ik.owon.OWONSDS1104, + [ + ":MEASUrement:CH1:AVERage?", + ":MEASUrement:CH1:MAX?", + ":MEASUrement:CH1:MIN?", + ":MEASUrement:CH1:VTOP?", + ":MEASUrement:CH1:VBASe?", + ":MEASUrement:CH1:VAMP?", + ":MEASUrement:CH1:RTime?", + ":MEASUrement:CH1:FTime?", + ":MEASUrement:CH1:PWIDth?", + ":MEASUrement:CH1:NWIDth?", + ":MEASUrement:CH1:PDUTy?", + ":MEASUrement:CH1:NDUTy?", + ":MEASUrement:CH1:OVERshoot?", + ":MEASUrement:CH1:PREShoot?", + ], + [ + "V : 10.00mV", + "Vmax : 80.00mV", + "Vmin : -10.00mV", + "Vtop : 70.00mV", + "Vbase : 5.00mV", + "Vamp : 65.00mV", + "Rt : 400us", + "Ft : 500us", + "PW : 2.00ms", + "NW : 3.00ms", + "PD : 60.0%", + "ND : 40.0%", + "OS : 5.0%", + "PS : 2.5%", + ], + ) as scope: + unit_eq(scope.measure_average(1), 10e-3 * u.volt) + unit_eq(scope.measure_maximum(1), 80e-3 * u.volt) + unit_eq(scope.measure_minimum(1), -10e-3 * u.volt) + unit_eq(scope.measure_top(1), 70e-3 * u.volt) + unit_eq(scope.measure_base(1), 5e-3 * u.volt) + unit_eq(scope.measure_amplitude(1), 65e-3 * u.volt) + unit_eq(scope.measure_rise_time(1), 400e-6 * u.second) + unit_eq(scope.measure_fall_time(1), 500e-6 * u.second) + unit_eq(scope.measure_positive_width(1), 2e-3 * u.second) + unit_eq(scope.measure_negative_width(1), 3e-3 * u.second) + unit_eq(scope.measure_positive_duty(1), 60 * u.percent) + unit_eq(scope.measure_negative_duty(1), 40 * u.percent) + unit_eq(scope.measure_overshoot(1), 5 * u.percent) + unit_eq(scope.measure_preshoot(1), 2.5 * u.percent) + + +def test_extended_short_form_measurements(): + with expected_protocol( + ik.owon.OWONSDS1104, + [ + ":MEAS:CH1:SQUARESUM?", + ":MEAS:CH1:CURSorrms?", + ":MEAS:CH1:SCREenduty?", + ":MEAS:CH1:PPULSENUM?", + ":MEAS:CH1:NPULSENUM?", + ":MEAS:CH1:AREA?", + ":MEAS:CH1:CYCLEAREA?", + ], + [ + "Vr : 20.00mV", + "CR : 5.000mV", + "WP : 40.0%", + "+PC : 4", + "-PC : 3", + "AR : 1.5mV*s", + "CA : 500uV*s", + ], + ) as scope: + unit_eq(scope.measure_square_sum(1), 20e-3 * u.volt) + unit_eq(scope.measure_cursor_rms(1), 5e-3 * u.volt) + unit_eq(scope.measure_screen_duty(1), 40 * u.percent) + assert scope.measure_positive_pulse_count(1) == 4 + assert scope.measure_negative_pulse_count(1) == 3 + unit_eq(scope.measure_area(1), 1.5e-3 * u.volt * u.second) + unit_eq(scope.measure_cycle_area(1), 500e-6 * u.volt * u.second) + + +def test_edge_count_and_hard_frequency_measurements(): + with expected_protocol( + ik.owon.OWONSDS1104, + [ + ":MEASUrement:CH1:RISEedgenum?", + ":MEASUrement:CH1:FALLedgenum?", + ":MEASUrement:CH1:HARDfrequency?", + ], + [ + "+E : 4", + "-E : 3", + "<2Hz", + ], + ) as scope: + assert scope.measure_rise_edge_count(1) == 4 + assert scope.measure_fall_edge_count(1) == 3 + unit_eq(scope.measure_hard_frequency(1), 2 * u.hertz) + + +def test_channel_measurement_helpers(): + with expected_protocol( + ik.owon.OWONSDS1104, + [ + ":MEASUrement:CH1:FREQuency?", + ":MEASUrement:CH1:PERiod?", + ":MEASUrement:CH1:PKPK?", + ":MEASUrement:CH1:CYCRms?", + ":MEASUrement:CH1:AVERage?", + ":MEASUrement:CH1:MAX?", + ":MEASUrement:CH1:MIN?", + ], + ["100Hz", "10ms", "800mV", "200mV", "100mV", "900mV", "-50mV"], + ) as scope: + unit_eq(scope.channel[0].measure_frequency(), 100 * u.hertz) + unit_eq(scope.channel[0].measure_period(), 10e-3 * u.second) + unit_eq(scope.channel[0].measure_peak_to_peak(), 0.8 * u.volt) + unit_eq(scope.channel[0].measure_rms(), 0.2 * u.volt) + unit_eq(scope.channel[0].measure_average(), 0.1 * u.volt) + unit_eq(scope.channel[0].measure_maximum(), 0.9 * u.volt) + unit_eq(scope.channel[0].measure_minimum(), -0.05 * u.volt) + + +def test_measurement_channel_validation(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(ValueError) as err: + scope.measure_frequency(0) + assert err.value.args[0] == "Channel index must be between 1 and 4." + + +def test_force_trigger_not_implemented(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(NotImplementedError): + scope.force_trigger() + + +def test_channel_read_waveform_binary(): + metadata = { + "CHANNEL": [ + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + ], + "SAMPLE": {"SAMPLERATE": "1MS/s", "DATALEN": "4"}, + "TIMEBASE": {"HOFFSET": "0"}, + } + metadata_payload = b"\x00\x00\x00\x00" + json.dumps(metadata).encode("ascii") + waveform_payload = b"\x00\x00\x00\x00" + struct.pack("<4h", 0, 82, -82, 410) + + scope, stdout = _make_binary_scope( + binary_replies=[metadata_payload], + exact_replies=[waveform_payload], + ) + + x, y = scope.channel[0].read_waveform() + + assert stdout.getvalue() == (b":DATA:WAVE:SCREen:HEAD?" b":DATA:WAVE:SCREEN:CH1?") + assert tuple(float(value) for value in x) == pytest.approx( + (-10e-6, -5e-6, 0.0, 5e-6) + ) + assert tuple(float(value) for value in y) == pytest.approx((0.0, 0.2, -0.2, 1.0)) + + +def test_read_waveform_metadata(): + metadata = { + "CHANNEL": [ + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + {"SCALE": "100mV", "PROBE": "10X", "OFFSET": "0"}, + ], + "SAMPLE": {"SAMPLERATE": "1MS/s", "DATALEN": "4"}, + "TIMEBASE": {"HOFFSET": "0"}, + } + metadata_payload = b"\x00\x00\x00\x00" + json.dumps(metadata).encode("ascii") + + scope, stdout = _make_binary_scope(binary_replies=[metadata_payload]) + + result = scope.read_waveform_metadata() + + assert result["SAMPLE"]["DATALEN"] == "4" + assert stdout.getvalue() == b":DATA:WAVE:SCREen:HEAD?" + + +def test_channel_read_waveform_ascii_not_implemented(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(NotImplementedError): + scope.channel[0].read_waveform(bin_format=False) + + +def test_math_read_waveform_not_implemented(): + with expected_protocol(ik.owon.OWONSDS1104, [], []) as scope: + with pytest.raises(NotImplementedError): + scope.math.read_waveform() + + +def test_read_measurement_data_short_and_long(): + short_payload = ( + b"\x00\x00\x00\x00" + b'{"CH1":{"FREQuency":"1.00kHz","PKPK":"2.00V"}}' + ) + long_payload = ( + b"\x00\x00\x00\x00" + b'{"CH1":{"FREQuency":"1.00kHz","PKPK":"2.00V"}}' + ) + scope, stdout = _make_binary_scope( + binary_replies=[short_payload, long_payload], + ) + + assert scope.read_measurement_data(1) == {"FREQuency": "1.00kHz", "PKPK": "2.00V"} + assert scope.channel[0].read_measurement_data(long_form=True) == { + "FREQuency": "1.00kHz", + "PKPK": "2.00V", + } + assert stdout.getvalue() == (b":MEAS:CH1?" b":MEASUrement:CH1?") + + +def test_read_all_measurement_data_short_and_long(): + short_payload = ( + b"\x00\x00\x00\x00" + b'{"CH1":{"FREQuency":"1.00kHz"},"CH2":{"PKPK":"2.00V"}}' + ) + long_payload = ( + b"\x00\x00\x00\x00" + b'{"CH1":{"FREQuency":"1.00kHz"},"CH2":{"PKPK":"2.00V"}}' + ) + scope, stdout = _make_binary_scope( + binary_replies=[short_payload, long_payload], + ) + + assert scope.read_all_measurement_data() == { + 1: {"FREQuency": "1.00kHz"}, + 2: {"PKPK": "2.00V"}, + } + assert scope.read_all_measurement_data(long_form=True) == { + 1: {"FREQuency": "1.00kHz"}, + 2: {"PKPK": "2.00V"}, + } + assert stdout.getvalue() == (b":MEAS?" b":MEASUrement:ALL?") + + +def test_read_screen_bmp(): + bmp_body = b"BM" + struct.pack("