From 300621028b14f9571452ec7726340a6e48207950 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:02:52 +0100 Subject: [PATCH 01/12] feat: add support for custom tag order --- README.md | 22 ++++++++++++++ .../write_client/client/_base.py | 3 +- .../write_client/client/write/point.py | 29 ++++++++++++++++--- .../write_client/client/write_api.py | 25 +++++++++++++++- tests/test_influxdb_client_3.py | 7 ++++- tests/test_point.py | 11 +++++++ 6 files changed, 90 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 0be7419..9e78230 100644 --- a/README.md +++ b/README.md @@ -77,6 +77,28 @@ You can write data using the Point class, or supplying line protocol. point = Point("measurement").tag("location", "london").field("temperature", 42) client.write(point) ``` + +### Control tag order for first-write column order (InfluxDB 3 Enterprise) +```python +point = Point("cpu") \ + .tag("host", "server-a") \ + .tag("region", "us-east") \ + .tag("rack", "r1") \ + .field("usage", 0.42) + +write_options = WriteOptions( + write_type=WriteType.synchronous, + tag_order=["region", "host"], +) + +client = InfluxDBClient3( + token="your-token", + host="your-host", + database="your-database", + write_client_options=write_client_options(write_options=write_options), +) +client.write(point) +``` ### Using Line Protocol ```python point = "measurement fieldname=0" diff --git a/influxdb_client_3/write_client/client/_base.py b/influxdb_client_3/write_client/client/_base.py index 783eb3f..34ff05d 100644 --- a/influxdb_client_3/write_client/client/_base.py +++ b/influxdb_client_3/write_client/client/_base.py @@ -246,7 +246,8 @@ def _serialize(self, record, write_precision, payload, **kwargs): elif isinstance(record, Point): precision_from_point = kwargs.get('precision_from_point', True) precision = record.write_precision if precision_from_point else write_precision - self._serialize(record.to_line_protocol(precision=precision), precision, payload, **kwargs) + self._serialize(record.to_line_protocol(precision=precision, tag_order=kwargs.get('tag_order')), + precision, payload, **kwargs) elif isinstance(record, dict): self._serialize(Point.from_dict(record, write_precision=write_precision, **kwargs), diff --git a/influxdb_client_3/write_client/client/write/point.py b/influxdb_client_3/write_client/client/write/point.py index bc7211d..b379305 100644 --- a/influxdb_client_3/write_client/client/write/point.py +++ b/influxdb_client_3/write_client/client/write/point.py @@ -215,11 +215,12 @@ def field(self, field, value): self._fields[field] = value return self - def to_line_protocol(self, precision=None): + def to_line_protocol(self, precision=None, tag_order=None): """ Create LineProtocol. :param precision: required precision of LineProtocol. If it's not set then use the precision from ``Point``. + :param tag_order: optional list of tag names to prioritize in serialized output """ _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT) if _measurement.startswith("#"): @@ -229,7 +230,7 @@ def to_line_protocol(self, precision=None): - https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/#comments """ warnings.warn(message, SyntaxWarning) - _tags = _append_tags(self._tags) + _tags = _append_tags(self._tags, tag_order) _fields = _append_fields(self._fields, self._field_types) if not _fields: return "" @@ -252,9 +253,11 @@ def __str__(self): return self.to_line_protocol() -def _append_tags(tags): +def _append_tags(tags, tag_order=None): _return = [] - for tag_key, tag_value in sorted(tags.items()): + ordered_tag_keys = _ordered_tag_keys(tags, tag_order) + for tag_key in ordered_tag_keys: + tag_value = tags.get(tag_key) if tag_value is None: continue @@ -267,6 +270,24 @@ def _append_tags(tags): return f"{',' if _return else ''}{','.join(_return)} " +def _ordered_tag_keys(tags, tag_order=None): + sorted_keys = sorted(tags.keys()) + if not tag_order: + return sorted_keys + + remaining = set(sorted_keys) + ordered = [] + for tag_key in tag_order: + if not tag_key: + continue + if tag_key in remaining: + remaining.remove(tag_key) + ordered.append(tag_key) + + ordered.extend(sorted(remaining)) + return ordered + + def _append_fields(fields, field_types): _return = [] diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index b071856..7db8a3d 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -43,6 +43,8 @@ 'record_time_key', 'record_tag_keys', 'record_field_keys', + # Point serialization-specific kwargs + 'tag_order', } logger = logging.getLogger('influxdb_client_3.write_client.client.write_api') @@ -67,6 +69,20 @@ class DefaultWriteOptions(Enum): timeout = DEFAULT_WRITE_TIMEOUT +def _sanitize_tag_order(tag_order): + if not tag_order: + return [] + + sanitized = [] + seen = set() + for tag in tag_order: + if not tag or tag in seen: + continue + seen.add(tag) + sanitized.append(tag) + return sanitized + + class WriteOptions(object): """Write configuration.""" @@ -81,6 +97,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, max_close_wait=300_000, write_precision=DEFAULT_WRITE_PRECISION, no_sync=DEFAULT_WRITE_NO_SYNC, + tag_order=None, timeout=DEFAULT_WRITE_TIMEOUT, write_scheduler=ThreadPoolScheduler(max_workers=1)) -> None: """ @@ -100,6 +117,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, :param max_close_wait: the maximum time to wait for writes to be flushed if close() is called :param write_precision: precision to use when writing points to InfluxDB :param no_sync: skip waiting for WAL persistence on write + :param tag_order: optional list of tag names used to prioritize tag serialization order :param timeout: timeout to use when writing to the database in milliseconds. Default is 10_000 :param write_scheduler: """ @@ -117,6 +135,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_precision = write_precision self.timeout = timeout self.no_sync = no_sync + self.tag_order = _sanitize_tag_order(tag_order) def to_retry_strategy(self, **kwargs): """ @@ -380,6 +399,8 @@ def write(self, bucket: str, org: str = None, if write_precision is None: write_precision = self._write_options.write_precision + kwargs.setdefault('tag_order', self._write_options.tag_order) + if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, write_precision, **kwargs) @@ -520,7 +541,9 @@ def _write_batching(self, bucket, org, data, precision, **kwargs) elif isinstance(data, Point): - self._write_batching(bucket, org, data.to_line_protocol(), data.write_precision, **kwargs) + self._write_batching(bucket, org, + data.to_line_protocol(tag_order=kwargs.get('tag_order')), + data.write_precision, **kwargs) elif isinstance(data, dict): self._write_batching(bucket, org, Point.from_dict(data, write_precision=precision, **kwargs), diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 4b489d6..197a8bb 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -90,7 +90,8 @@ def test_write_options(self): max_retry_time=0, max_retry_delay=0, timeout=30_000, - flush_interval=500,)) + flush_interval=500, + tag_order=["region", "", "host", "region"])) ) self.assertIsInstance(client._write_client_options["write_options"], WriteOptions) @@ -103,6 +104,7 @@ def test_write_options(self): self.assertEqual(0, client._write_client_options["write_options"].max_retry_delay) self.assertEqual(WriteType.synchronous, client._write_client_options["write_options"].write_type) self.assertEqual(500, client._write_client_options["write_options"].flush_interval) + self.assertEqual(["region", "host"], client._write_client_options["write_options"].tag_order) def test_default_write_options(self): client = InfluxDBClient3( @@ -118,6 +120,7 @@ def test_default_write_options(self): self.assertEqual(DefaultWriteOptions.write_precision.value, client._write_client_options["write_options"].write_precision) self.assertEqual(DefaultWriteOptions.timeout.value, client._write_client_options["write_options"].timeout) + self.assertEqual([], client._write_client_options["write_options"].tag_order) @asyncio_run async def test_query_async(self): @@ -181,10 +184,12 @@ def verify_client_write_options(c): self.assertEqual(write_options.write_precision, expected_precision) self.assertEqual(write_options.write_type, expected_write_type) self.assertEqual(write_options.no_sync, expected_no_sync) + self.assertEqual(write_options.tag_order, []) self.assertEqual(c._write_api._write_options.write_precision, expected_precision) self.assertEqual(c._write_api._write_options.write_type, expected_write_type) self.assertEqual(c._write_api._write_options.no_sync, expected_no_sync) + self.assertEqual(c._write_api._write_options.tag_order, []) env_client = InfluxDBClient3.from_env() verify_client_write_options(env_client) diff --git a/tests/test_point.py b/tests/test_point.py index 1559b62..bc07461 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -12,3 +12,14 @@ def test_epoch(self): def test_point(self): point = Point.measurement("h2o").tag("location", "europe").field("level", 2.2).time(1_000_000) self.assertEqual('h2o,location=europe level=2.2 1000000', point.to_line_protocol()) + + def test_point_tag_order(self): + point = Point.measurement("h2o") \ + .tag("rack", "r1") \ + .tag("host", "h1") \ + .tag("region", "us-east") \ + .field("level", 2) + + self.assertEqual('h2o,host=h1,rack=r1,region=us-east level=2i', point.to_line_protocol()) + self.assertEqual('h2o,region=us-east,host=h1,rack=r1 level=2i', + point.to_line_protocol(tag_order=["region", "", "host", "region", "missing"])) From e6dacb829fead7cb7151740c4401da829e97ae7d Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:08:30 +0100 Subject: [PATCH 02/12] docs: update CHANGELOG --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1770318..839b845 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## 0.19.0 [unreleased] +### Features + +1. [#198](https://github.com/InfluxCommunity/influxdb3-python/pull/198): Support custom tag order via `tag_order` write option. + See [Sort tags by priority](https://docs.influxdata.com/influxdb3/enterprise/write-data/best-practices/schema-design/#sort-tags-by-query-priority) for more. + ## 0.18.0 [2026-02-19] ### Features From 261949df04d01f81e0b2bdb249ff7d8908231deb Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:13:07 +0100 Subject: [PATCH 03/12] test: more coverage --- tests/test_influxdb_client_3.py | 4 ++++ tests/test_point.py | 1 + 2 files changed, 5 insertions(+) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 197a8bb..3577f7d 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -152,6 +152,10 @@ def test_write_api_custom_options_no_error(self): client = InfluxDBClient3(write_client_options=write_client_option) try: client._write_api._write_batching("bucket", "org", Point.measurement("test"), None) + client._write_api._write_batching("bucket", "org", { + "measurement": "test", + "fields": {"value": 1} + }, None) self.assertTrue(True) except Exception as e: self.fail(f"Write API with default options raised an exception: {str(e)}") diff --git a/tests/test_point.py b/tests/test_point.py index bc07461..bbb2a87 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -15,6 +15,7 @@ def test_point(self): def test_point_tag_order(self): point = Point.measurement("h2o") \ + .tag("drop", None) \ .tag("rack", "r1") \ .tag("host", "h1") \ .tag("region", "us-east") \ From 6d09c309ca865e0fa9cfd34c8cb0ffc7cc376ce2 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:35:14 +0100 Subject: [PATCH 04/12] fix: mark never used generated methods for exclusion from coverage --- .../write_client/domain/write_precision.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/influxdb_client_3/write_client/domain/write_precision.py b/influxdb_client_3/write_client/domain/write_precision.py index 4917201..cf175dd 100644 --- a/influxdb_client_3/write_client/domain/write_precision.py +++ b/influxdb_client_3/write_client/domain/write_precision.py @@ -30,7 +30,7 @@ class WritePrecision(object): def __init__(self): # noqa: E501,D401,D403 """WritePrecision - a model defined in OpenAPI.""" # noqa: E501 self.discriminator = None - def to_dict(self): + def to_dict(self): # pragma: no cover """Return the model properties as a dict.""" result = {} @@ -54,21 +54,21 @@ def to_dict(self): return result - def to_str(self): + def to_str(self): # pragma: no cover """Return the string representation of the model.""" return pprint.pformat(self.to_dict()) - def __repr__(self): + def __repr__(self): # pragma: no cover """For `print` and `pprint`.""" return self.to_str() - def __eq__(self, other): + def __eq__(self, other): # pragma: no cover """Return true if both objects are equal.""" if not isinstance(other, WritePrecision): return False return self.__dict__ == other.__dict__ - def __ne__(self, other): + def __ne__(self, other): # pragma: no cover """Return true if both objects are not equal.""" return not self == other From b1d07e72c5ecb6ad9ef93712e23928a3f3ed666f Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:36:07 +0100 Subject: [PATCH 05/12] test: more coverage --- tests/test_point.py | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/tests/test_point.py b/tests/test_point.py index bbb2a87..42320af 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -1,6 +1,7 @@ import datetime import unittest +from influxdb_client_3 import WritePrecision from influxdb_client_3.write_client.client.write.point import EPOCH, Point @@ -24,3 +25,33 @@ def test_point_tag_order(self): self.assertEqual('h2o,host=h1,rack=r1,region=us-east level=2i', point.to_line_protocol()) self.assertEqual('h2o,region=us-east,host=h1,rack=r1 level=2i', point.to_line_protocol(tag_order=["region", "", "host", "region", "missing"])) + + def test_point_field_types_and_time_conversion(self): + point = Point.measurement("m") \ + .field("drop", None) \ + .field("flag", True) \ + .field("name", "abc") \ + .field("value", 1) + + self.assertEqual('m flag=true,name="abc",value=1i', point.to_line_protocol()) + + dt = datetime.datetime(1970, 1, 1, 0, 0, 1, tzinfo=datetime.timezone.utc) + self.assertEqual('m value=1i 1000000000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.NS).to_line_protocol()) + self.assertEqual('m value=1i 1000000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.US).to_line_protocol()) + self.assertEqual('m value=1i 1000', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.MS).to_line_protocol()) + self.assertEqual('m value=1i 1', + Point.measurement("m").field("value", 1).time(dt, WritePrecision.S).to_line_protocol()) + self.assertEqual('m value=1i 1000000', + Point.measurement("m").field("value", 1) + .time(datetime.timedelta(seconds=1), WritePrecision.US).to_line_protocol()) + self.assertEqual('m value=1i 1', + Point.measurement("m").field("value", 1) + .time("1970-01-01T00:00:01Z", WritePrecision.S).to_line_protocol()) + + with self.assertRaisesRegex(ValueError, 'not supported'): + Point.measurement("m").field("bad", object()).to_line_protocol() + with self.assertRaises(ValueError): + Point.measurement("m").field("value", 1).time([]).to_line_protocol() From b4784daa3ae6e311e78e55835f6342cc2aec6c42 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:50:11 +0100 Subject: [PATCH 06/12] docs: fix example --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 9e78230..522ee1c 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,8 @@ client.write(point) ### Control tag order for first-write column order (InfluxDB 3 Enterprise) ```python +from influxdb_client_3 import InfluxDBClient3, Point, WriteOptions, WriteType, write_client_options + point = Point("cpu") \ .tag("host", "server-a") \ .tag("region", "us-east") \ From e20491ba7f823970b971d174b7e6442044db758d Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:51:44 +0100 Subject: [PATCH 07/12] fix: increases robustness of tag_order handling --- influxdb_client_3/write_client/client/write_api.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 7db8a3d..0d139b9 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -70,13 +70,23 @@ class DefaultWriteOptions(Enum): def _sanitize_tag_order(tag_order): - if not tag_order: + if tag_order is None: return [] + if isinstance(tag_order, (str, bytes)): + raise TypeError("tag_order must be an iterable of strings, not str/bytes") + + if not isinstance(tag_order, Iterable): + raise TypeError("tag_order must be an iterable of strings") + sanitized = [] seen = set() for tag in tag_order: - if not tag or tag in seen: + if tag is None or tag == "": + continue + if not isinstance(tag, str): + raise TypeError("tag_order entries must be strings") + if tag in seen: continue seen.add(tag) sanitized.append(tag) From 30a51c04c20aec5a9ce76c42311be5d311ba0757 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 15:51:54 +0100 Subject: [PATCH 08/12] fix: increases robustness of tag_order handling --- tests/test_influxdb_client_3.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 3577f7d..21eede6 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -106,6 +106,12 @@ def test_write_options(self): self.assertEqual(500, client._write_client_options["write_options"].flush_interval) self.assertEqual(["region", "host"], client._write_client_options["write_options"].tag_order) + with self.assertRaisesRegex(TypeError, "tag_order must be an iterable of strings, not str/bytes"): + WriteOptions(tag_order="region,host") + + with self.assertRaisesRegex(TypeError, "tag_order entries must be strings"): + WriteOptions(tag_order=["region", 1]) + def test_default_write_options(self): client = InfluxDBClient3( host="localhost", From 81108bde91104ff997e9535427df1adf1cad326a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 16:10:01 +0100 Subject: [PATCH 09/12] refactor: tag order shared across all serializers --- .../client/write/dataframe_serializer.py | 11 +++-- .../write_client/client/write/point.py | 48 ++++++++++++++----- .../write/polars_dataframe_serializer.py | 40 +++++++++------- .../write_client/client/write_api.py | 33 +++---------- tests/test_dataframe_serializer.py | 9 ++++ tests/test_polars.py | 18 +++++-- 6 files changed, 97 insertions(+), 62 deletions(-) diff --git a/influxdb_client_3/write_client/client/write/dataframe_serializer.py b/influxdb_client_3/write_client/client/write/dataframe_serializer.py index 245d980..394e19f 100644 --- a/influxdb_client_3/write_client/client/write/dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/dataframe_serializer.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \ - DEFAULT_WRITE_PRECISION + DEFAULT_WRITE_PRECISION, ordered_tag_keys_for_serialization logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer') @@ -130,8 +130,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # keys holds a list of string keys. keys = [] - # tags holds a list of tag f-string segments ordered alphabetically by tag key. - tags = [] + # tags holds map of tag key -> tag f-string segment. + tags = {} # fields holds a list of field f-string segments ordered alphabetically by field key fields = [] # field_indexes holds the index into each row of all the fields. @@ -188,7 +188,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION }}""" else: key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' - tags.append(key_value) + tags[key] = key_value continue elif timestamp_column is not None and key in timestamp_column: timestamp_index = field_index @@ -225,7 +225,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT) - tags = ''.join(tags) + tag_keys = ordered_tag_keys_for_serialization(list(tags.keys()), kwargs.get('tag_order')) + tags = ''.join(tags[tag_key] for tag_key in tag_keys) fields = ''.join(fields) timestamp = '{p[%s].value}' % timestamp_index if precision == WritePrecision.US: diff --git a/influxdb_client_3/write_client/client/write/point.py b/influxdb_client_3/write_client/client/write/point.py index b379305..2dccc65 100644 --- a/influxdb_client_3/write_client/client/write/point.py +++ b/influxdb_client_3/write_client/client/write/point.py @@ -3,6 +3,7 @@ import math import warnings from builtins import int +from collections.abc import Iterable from datetime import datetime, timedelta, timezone from decimal import Decimal from numbers import Integral @@ -255,7 +256,7 @@ def __str__(self): def _append_tags(tags, tag_order=None): _return = [] - ordered_tag_keys = _ordered_tag_keys(tags, tag_order) + ordered_tag_keys = ordered_tag_keys_for_serialization(sorted(tags.keys()), tag_order) for tag_key in ordered_tag_keys: tag_value = tags.get(tag_key) @@ -270,22 +271,47 @@ def _append_tags(tags, tag_order=None): return f"{',' if _return else ''}{','.join(_return)} " -def _ordered_tag_keys(tags, tag_order=None): - sorted_keys = sorted(tags.keys()) +def sanitize_tag_order(tag_order): + if tag_order is None: + return [] + + if isinstance(tag_order, (str, bytes)): + raise TypeError("tag_order must be an iterable of strings, not str/bytes") + + if not isinstance(tag_order, Iterable): + raise TypeError("tag_order must be an iterable of strings") + + sanitized = [] + seen = set() + for tag in tag_order: + if tag is None or tag == "": + continue + if not isinstance(tag, str): + raise TypeError("tag_order entries must be strings") + if tag in seen: + continue + seen.add(tag) + sanitized.append(tag) + return sanitized + + +def ordered_tag_keys_for_serialization(existing_keys, tag_order=None): + ordered_keys = list(existing_keys) if not tag_order: - return sorted_keys + return ordered_keys - remaining = set(sorted_keys) - ordered = [] + remaining = set(ordered_keys) + prioritized = [] for tag_key in tag_order: if not tag_key: continue - if tag_key in remaining: - remaining.remove(tag_key) - ordered.append(tag_key) + if tag_key not in remaining: + continue + remaining.remove(tag_key) + prioritized.append(tag_key) - ordered.extend(sorted(remaining)) - return ordered + prioritized.extend([tag_key for tag_key in ordered_keys if tag_key in remaining]) + return prioritized def _append_fields(fields, field_types): diff --git a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py index 2156cf7..1b88a32 100644 --- a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py @@ -7,7 +7,8 @@ import logging import math -from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION +from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION, \ + ordered_tag_keys_for_serialization logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer') @@ -36,6 +37,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION self.chunk_size = chunk_size self.measurement_name = kwargs.get("data_frame_measurement_name", "measurement") self.tag_columns = kwargs.get("data_frame_tag_columns", []) + self.tag_order = kwargs.get("tag_order", None) self.timestamp_column = kwargs.get("data_frame_timestamp_column", None) self.timestamp_timezone = kwargs.get("data_frame_timestamp_timezone", None) @@ -62,25 +64,31 @@ def escape_value(self, value): return str(value).translate(_ESCAPE_STRING) def to_line_protocol(self, row): - # Filter out None or empty values for tags - tags = "" + tag_values = {} + tag_keys = [] + for col in self.tag_columns: + value = row[self.column_indices[col]] + if value is None or value == "": + continue + if col not in tag_values: + tag_keys.append(col) + tag_values[col] = value + if self.point_settings.defaultTags: + for key, value in self.point_settings.defaultTags.items(): + if value is None or value == "": + continue + if key in tag_values: + continue + tag_keys.append(key) + tag_values[key] = value + + final_tag_keys = ordered_tag_keys_for_serialization(tag_keys, self.tag_order) tags = ",".join( - f'{self.escape_key(col)}={self.escape_key(row[self.column_indices[col]])}' - for col in self.tag_columns - if row[self.column_indices[col]] is not None and row[self.column_indices[col]] != "" + f'{self.escape_key(key)}={self.escape_key(tag_values[key])}' + for key in final_tag_keys ) - if self.point_settings.defaultTags: - default_tags = ",".join( - f'{self.escape_key(key)}={self.escape_key(value)}' - for key, value in self.point_settings.defaultTags.items() - ) - # Ensure there's a comma between existing tags and default tags if both are present - if tags and default_tags: - tags += "," - tags += default_tags - # add escape symbols for special characters to tags fields = ",".join( diff --git a/influxdb_client_3/write_client/client/write_api.py b/influxdb_client_3/write_client/client/write_api.py index 0d139b9..250a07e 100644 --- a/influxdb_client_3/write_client/client/write_api.py +++ b/influxdb_client_3/write_client/client/write_api.py @@ -22,7 +22,7 @@ from influxdb_client_3.write_client.client._base import _BaseWriteApi, _HAS_DATACLASS from influxdb_client_3.write_client.client.util.helpers import get_org_query_param from influxdb_client_3.write_client.client.write.dataframe_serializer import DataframeSerializer -from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION +from influxdb_client_3.write_client.client.write.point import Point, DEFAULT_WRITE_PRECISION, sanitize_tag_order from influxdb_client_3.write_client.client.write.retry import WritesRetry from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.rest import _UTF_8_encoding @@ -69,30 +69,6 @@ class DefaultWriteOptions(Enum): timeout = DEFAULT_WRITE_TIMEOUT -def _sanitize_tag_order(tag_order): - if tag_order is None: - return [] - - if isinstance(tag_order, (str, bytes)): - raise TypeError("tag_order must be an iterable of strings, not str/bytes") - - if not isinstance(tag_order, Iterable): - raise TypeError("tag_order must be an iterable of strings") - - sanitized = [] - seen = set() - for tag in tag_order: - if tag is None or tag == "": - continue - if not isinstance(tag, str): - raise TypeError("tag_order entries must be strings") - if tag in seen: - continue - seen.add(tag) - sanitized.append(tag) - return sanitized - - class WriteOptions(object): """Write configuration.""" @@ -145,7 +121,7 @@ def __init__(self, write_type: WriteType = WriteType.batching, self.write_precision = write_precision self.timeout = timeout self.no_sync = no_sync - self.tag_order = _sanitize_tag_order(tag_order) + self.tag_order = sanitize_tag_order(tag_order) def to_retry_strategy(self, **kwargs): """ @@ -409,7 +385,10 @@ def write(self, bucket: str, org: str = None, if write_precision is None: write_precision = self._write_options.write_precision - kwargs.setdefault('tag_order', self._write_options.tag_order) + if 'tag_order' in kwargs: + kwargs['tag_order'] = sanitize_tag_order(kwargs.get('tag_order')) + else: + kwargs['tag_order'] = self._write_options.tag_order if self._write_options.write_type is WriteType.batching: return self._write_batching(bucket, org, record, diff --git a/tests/test_dataframe_serializer.py b/tests/test_dataframe_serializer.py index 7cb67e6..fc761e8 100644 --- a/tests/test_dataframe_serializer.py +++ b/tests/test_dataframe_serializer.py @@ -276,6 +276,15 @@ def test_tags_order(self): self.assertEqual(1, len(points)) self.assertEqual("h2o,a=a,b=b,c=c level=2i 1586048400000000000", points[0]) + points = data_frame_to_list_of_points(data_frame=data_frame, + point_settings=PointSettings(), + data_frame_measurement_name='h2o', + data_frame_tag_columns={"c", "a", "b"}, + tag_order=["c", "a"]) + + self.assertEqual(1, len(points)) + self.assertEqual("h2o,c=c,a=a,b=b level=2i 1586048400000000000", points[0]) + def test_escape_text_value(self): now = pd.Timestamp('2020-04-05 00:00+00:00') an_hour_ago = now - timedelta(hours=1) diff --git a/tests/test_polars.py b/tests/test_polars.py index 70571c9..d607db5 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -17,6 +17,7 @@ def test_to_list_of_points(self): df = pl.DataFrame(data={ "name": ['iot-devices', 'iot-devices', 'iot-devices'], "building": ['5a', '5a', '5a'], + "region": ['us-east', 'us-east', 'us-east'], "temperature": [72.3, 72.1, 72.2], "time": pl.Series(["2022-10-01T12:01:00Z", "2022-10-02T12:01:00Z", "2022-10-03T12:01:00Z"]) .str.to_datetime(time_unit='ns') @@ -25,13 +26,24 @@ def test_to_list_of_points(self): data_frame_measurement_name='iot-devices', data_frame_tag_columns=['building'], data_frame_timestamp_column='time') + actual_with_order = polars_data_frame_to_list_of_points(df, ps, + data_frame_measurement_name='iot-devices', + data_frame_tag_columns=['building', 'region'], + data_frame_timestamp_column='time', + tag_order=['region', 'building']) expected = [ - 'iot-devices,building=5a name="iot-devices",temperature=72.3 1664625660000000000', - 'iot-devices,building=5a name="iot-devices",temperature=72.1 1664712060000000000', - 'iot-devices,building=5a name="iot-devices",temperature=72.2 1664798460000000000' + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.3 1664625660000000000', + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.1 1664712060000000000', + 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.2 1664798460000000000' + ] + expected_with_order = [ + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.3 1664625660000000000', + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.1 1664712060000000000', + 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.2 1664798460000000000' ] self.assertEqual(expected, actual) + self.assertEqual(expected_with_order, actual_with_order) @unittest.skipIf(importlib.util.find_spec("polars") is None, 'Polars package not installed') From 0c6aa3cf76d4cd20b032ecb83da3213b255c40ad Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 16:14:21 +0100 Subject: [PATCH 10/12] refactor: improve readability --- .../client/write/dataframe_serializer.py | 14 +++++----- .../write_client/client/write/point.py | 5 ++-- .../write/polars_dataframe_serializer.py | 4 +-- tests/test_polars.py | 27 ++++++++++++++----- 4 files changed, 31 insertions(+), 19 deletions(-) diff --git a/influxdb_client_3/write_client/client/write/dataframe_serializer.py b/influxdb_client_3/write_client/client/write/dataframe_serializer.py index 394e19f..55d7319 100644 --- a/influxdb_client_3/write_client/client/write/dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/dataframe_serializer.py @@ -10,7 +10,7 @@ from influxdb_client_3.write_client.domain import WritePrecision from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, \ - DEFAULT_WRITE_PRECISION, ordered_tag_keys_for_serialization + DEFAULT_WRITE_PRECISION, ordered_tag_keys logger = logging.getLogger('influxdb_client.client.write.dataframe_serializer') @@ -130,8 +130,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION # keys holds a list of string keys. keys = [] - # tags holds map of tag key -> tag f-string segment. - tags = {} + # tag_segments holds map of tag key -> tag f-string segment. + tag_segments = {} # fields holds a list of field f-string segments ordered alphabetically by field key fields = [] # field_indexes holds the index into each row of all the fields. @@ -188,7 +188,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION }}""" else: key_value = f',{key_format}={{str({val_format}).translate(_ESCAPE_KEY)}}' - tags[key] = key_value + tag_segments[key] = key_value continue elif timestamp_column is not None and key in timestamp_column: timestamp_index = field_index @@ -225,8 +225,8 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION measurement_name = str(data_frame_measurement_name).translate(_ESCAPE_MEASUREMENT) - tag_keys = ordered_tag_keys_for_serialization(list(tags.keys()), kwargs.get('tag_order')) - tags = ''.join(tags[tag_key] for tag_key in tag_keys) + tag_keys = ordered_tag_keys(list(tag_segments.keys()), kwargs.get('tag_order')) + tag_string = ''.join(tag_segments[tag_key] for tag_key in tag_keys) fields = ''.join(fields) timestamp = '{p[%s].value}' % timestamp_index if precision == WritePrecision.US: @@ -236,7 +236,7 @@ def __init__(self, data_frame, point_settings, precision=DEFAULT_WRITE_PRECISION elif precision == WritePrecision.S: timestamp = '{int(p[%s].value / 1e9)}' % timestamp_index - f = eval(f'lambda p: f"""{{measurement_name}}{tags} {fields} {timestamp}"""', { + f = eval(f'lambda p: f"""{{measurement_name}}{tag_string} {fields} {timestamp}"""', { 'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING, diff --git a/influxdb_client_3/write_client/client/write/point.py b/influxdb_client_3/write_client/client/write/point.py index 2dccc65..8346b0c 100644 --- a/influxdb_client_3/write_client/client/write/point.py +++ b/influxdb_client_3/write_client/client/write/point.py @@ -256,8 +256,7 @@ def __str__(self): def _append_tags(tags, tag_order=None): _return = [] - ordered_tag_keys = ordered_tag_keys_for_serialization(sorted(tags.keys()), tag_order) - for tag_key in ordered_tag_keys: + for tag_key in ordered_tag_keys(sorted(tags.keys()), tag_order): tag_value = tags.get(tag_key) if tag_value is None: @@ -295,7 +294,7 @@ def sanitize_tag_order(tag_order): return sanitized -def ordered_tag_keys_for_serialization(existing_keys, tag_order=None): +def ordered_tag_keys(existing_keys, tag_order=None): ordered_keys = list(existing_keys) if not tag_order: return ordered_keys diff --git a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py index 1b88a32..61f272c 100644 --- a/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py +++ b/influxdb_client_3/write_client/client/write/polars_dataframe_serializer.py @@ -8,7 +8,7 @@ import math from influxdb_client_3.write_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, DEFAULT_WRITE_PRECISION, \ - ordered_tag_keys_for_serialization + ordered_tag_keys logger = logging.getLogger('influxdb_client.client.write.polars_dataframe_serializer') @@ -83,7 +83,7 @@ def to_line_protocol(self, row): tag_keys.append(key) tag_values[key] = value - final_tag_keys = ordered_tag_keys_for_serialization(tag_keys, self.tag_order) + final_tag_keys = ordered_tag_keys(tag_keys, self.tag_order) tags = ",".join( f'{self.escape_key(key)}={self.escape_key(tag_values[key])}' for key in final_tag_keys diff --git a/tests/test_polars.py b/tests/test_polars.py index d607db5..f8f1bde 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -26,24 +26,37 @@ def test_to_list_of_points(self): data_frame_measurement_name='iot-devices', data_frame_tag_columns=['building'], data_frame_timestamp_column='time') - actual_with_order = polars_data_frame_to_list_of_points(df, ps, - data_frame_measurement_name='iot-devices', - data_frame_tag_columns=['building', 'region'], - data_frame_timestamp_column='time', - tag_order=['region', 'building']) expected = [ 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.3 1664625660000000000', 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.1 1664712060000000000', 'iot-devices,building=5a name="iot-devices",region="us-east",temperature=72.2 1664798460000000000' ] - expected_with_order = [ + self.assertEqual(expected, actual) + + def test_to_list_of_points_with_tag_order(self): + import polars as pl + ps = PointSettings() + df = pl.DataFrame(data={ + "name": ['iot-devices', 'iot-devices', 'iot-devices'], + "building": ['5a', '5a', '5a'], + "region": ['us-east', 'us-east', 'us-east'], + "temperature": [72.3, 72.1, 72.2], + "time": pl.Series(["2022-10-01T12:01:00Z", "2022-10-02T12:01:00Z", "2022-10-03T12:01:00Z"]) + .str.to_datetime(time_unit='ns') + }) + actual = polars_data_frame_to_list_of_points(df, ps, + data_frame_measurement_name='iot-devices', + data_frame_tag_columns=['building', 'region'], + data_frame_timestamp_column='time', + tag_order=['region', 'building']) + + expected = [ 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.3 1664625660000000000', 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.1 1664712060000000000', 'iot-devices,region=us-east,building=5a name="iot-devices",temperature=72.2 1664798460000000000' ] self.assertEqual(expected, actual) - self.assertEqual(expected_with_order, actual_with_order) @unittest.skipIf(importlib.util.find_spec("polars") is None, 'Polars package not installed') From 7dd378af40bea13482734a42a0621cb6c4aaf3c9 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 17:18:27 +0100 Subject: [PATCH 11/12] test: more coverage --- tests/test_influxdb_client_3.py | 38 ++++++++++++++++++++++++++++++++- tests/test_polars.py | 23 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 1 deletion(-) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 21eede6..40d1b73 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -1,7 +1,7 @@ import re import unittest +from collections import defaultdict from unittest.mock import patch - from pytest_httpserver import HTTPServer from influxdb_client_3 import InfluxDBClient3, WritePrecision, DefaultWriteOptions, Point, WriteOptions, WriteType, \ @@ -156,17 +156,53 @@ def test_write_api_custom_options_no_error(self): write_options = WriteOptions(write_type=WriteType.batching) write_client_option = {'write_options': write_options} client = InfluxDBClient3(write_client_options=write_client_option) + sync_client = None try: client._write_api._write_batching("bucket", "org", Point.measurement("test"), None) client._write_api._write_batching("bucket", "org", { "measurement": "test", "fields": {"value": 1} }, None) + point = Point.measurement("test").tag("host", "h1").field("value", 1).time(1, WritePrecision.S) + payload = defaultdict(list) + client._write_api._serialize(point, WritePrecision.NS, payload, tag_order=["host"]) + self.assertIn(WritePrecision.S, payload) + + payload_forced = defaultdict(list) + client._write_api._serialize(point, WritePrecision.NS, payload_forced, + precision_from_point=False, tag_order=["host"]) + self.assertIn(WritePrecision.NS, payload_forced) + + sync_client = InfluxDBClient3( + host="localhost", + org="my_org", + database="my_db", + token="my_token", + write_client_options=write_client_options( + write_options=WriteOptions(write_type=WriteType.synchronous)) + ) + with patch.object(sync_client._write_api, "_post_write", return_value=None) as mock_post: + sync_point = Point.measurement("measurement") \ + .tag("host", "h1") \ + .tag("region", "us-east") \ + .field("value", 1) + sync_client.write(record=sync_point, tag_order=["region", "", "host", "region"]) + + args, kwargs = mock_post.call_args + body = kwargs.get("body") + if body is None and len(args) >= 4: + body = args[3] + if isinstance(body, bytes): + body = body.decode("utf-8") + self.assertIn("measurement,region=us-east,host=h1 value=1i", body) + self.assertTrue(True) except Exception as e: self.fail(f"Write API with default options raised an exception: {str(e)}") finally: client._write_api._on_complete() # abort batch writes - otherwise test cycles through urllib3 retries + if sync_client is not None: + sync_client.close() def test_default_client(self): expected_precision = DefaultWriteOptions.write_precision.value diff --git a/tests/test_polars.py b/tests/test_polars.py index f8f1bde..ddbaed6 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -58,6 +58,29 @@ def test_to_list_of_points_with_tag_order(self): ] self.assertEqual(expected, actual) + def test_to_list_of_points_with_default_tags(self): + import polars as pl + ps = PointSettings(env="prod", building="ignored") + df = pl.DataFrame(data={ + "name": ['iot-devices'], + "building": ['5a'], + "temperature": [72.3], + "time": pl.Series(["2022-10-01T12:01:00Z"]).str.to_datetime(time_unit='ns') + }) + + actual = polars_data_frame_to_list_of_points( + df, ps, + data_frame_measurement_name='iot-devices', + data_frame_tag_columns=['building'], + data_frame_timestamp_column='time', + tag_order=['env', 'building'], + ) + + expected = [ + 'iot-devices,env=prod,building=5a name="iot-devices",temperature=72.3 1664625660000000000' + ] + self.assertEqual(expected, actual) + @unittest.skipIf(importlib.util.find_spec("polars") is None, 'Polars package not installed') class TestWritePolars(unittest.TestCase): From b68c65795098508a8e2ce67cf9f71988183bd558 Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Mon, 2 Mar 2026 17:53:28 +0100 Subject: [PATCH 12/12] test: more coverage --- tests/test_dataframe_serializer.py | 15 ++++++++++ tests/test_influxdb_client_3.py | 7 +++++ tests/test_point.py | 12 +++++++- tests/test_polars.py | 48 ++++++++++++++++++++++++++++-- 4 files changed, 78 insertions(+), 4 deletions(-) diff --git a/tests/test_dataframe_serializer.py b/tests/test_dataframe_serializer.py index fc761e8..419125f 100644 --- a/tests/test_dataframe_serializer.py +++ b/tests/test_dataframe_serializer.py @@ -285,6 +285,21 @@ def test_tags_order(self): self.assertEqual(1, len(points)) self.assertEqual("h2o,c=c,a=a,b=b level=2i 1586048400000000000", points[0]) + ps = PointSettings(z="from-default", c="override-ignored") + points_with_defaults = data_frame_to_list_of_points( + data_frame=data_frame, + point_settings=ps, + data_frame_measurement_name='h2o', + data_frame_tag_columns={"c", "a", "b"}, + tag_order=["z", "c", "a"], + ) + + self.assertEqual(1, len(points_with_defaults)) + self.assertEqual( + "h2o,z=from-default,c=c,a=a,b=b level=2i 1586048400000000000", + points_with_defaults[0] + ) + def test_escape_text_value(self): now = pd.Timestamp('2020-04-05 00:00+00:00') an_hour_ago = now - timedelta(hours=1) diff --git a/tests/test_influxdb_client_3.py b/tests/test_influxdb_client_3.py index 40d1b73..c928f60 100644 --- a/tests/test_influxdb_client_3.py +++ b/tests/test_influxdb_client_3.py @@ -163,6 +163,13 @@ def test_write_api_custom_options_no_error(self): "measurement": "test", "fields": {"value": 1} }, None) + df = pd.DataFrame({ + "value": [1, 2], + }, index=pd.to_datetime(["2024-01-01T00:00:00Z", "2024-01-01T01:00:00Z"])) + client._write_api._write_batching( + "bucket", "org", df, None, + data_frame_measurement_name="test_measurement", + ) point = Point.measurement("test").tag("host", "h1").field("value", 1).time(1, WritePrecision.S) payload = defaultdict(list) client._write_api._serialize(point, WritePrecision.NS, payload, tag_order=["host"]) diff --git a/tests/test_point.py b/tests/test_point.py index 42320af..98f724c 100644 --- a/tests/test_point.py +++ b/tests/test_point.py @@ -2,7 +2,7 @@ import unittest from influxdb_client_3 import WritePrecision -from influxdb_client_3.write_client.client.write.point import EPOCH, Point +from influxdb_client_3.write_client.client.write.point import EPOCH, Point, _np_is_subtype class TestPoint(unittest.TestCase): @@ -55,3 +55,13 @@ def test_point_field_types_and_time_conversion(self): Point.measurement("m").field("bad", object()).to_line_protocol() with self.assertRaises(ValueError): Point.measurement("m").field("value", 1).time([]).to_line_protocol() + + def test_np_is_subtype(self): + try: + import numpy as np + except ImportError: + self.skipTest("numpy not installed") + + self.assertTrue(_np_is_subtype(np.float64(1.0), 'float')) + self.assertTrue(_np_is_subtype(np.int64(1), 'int')) + self.assertFalse(_np_is_subtype(np.int64(1), 'other')) diff --git a/tests/test_polars.py b/tests/test_polars.py index ddbaed6..61ff206 100644 --- a/tests/test_polars.py +++ b/tests/test_polars.py @@ -60,10 +60,10 @@ def test_to_list_of_points_with_tag_order(self): def test_to_list_of_points_with_default_tags(self): import polars as pl - ps = PointSettings(env="prod", building="ignored") + ps = PointSettings(env="prod", building="ignored", skip_empty="") df = pl.DataFrame(data={ "name": ['iot-devices'], - "building": ['5a'], + "building": [''], "temperature": [72.3], "time": pl.Series(["2022-10-01T12:01:00Z"]).str.to_datetime(time_unit='ns') }) @@ -77,10 +77,52 @@ def test_to_list_of_points_with_default_tags(self): ) expected = [ - 'iot-devices,env=prod,building=5a name="iot-devices",temperature=72.3 1664625660000000000' + 'iot-devices,env=prod,building=ignored name="iot-devices",temperature=72.3 1664625660000000000' ] self.assertEqual(expected, actual) + def test_to_list_of_points_with_precision_variants(self): + import polars as pl + ps = PointSettings() + df = pl.DataFrame(data={ + "name": ['iot-devices'], + "temperature": [72.3], + "time": pl.Series(["2022-10-01T12:01:00Z"]).str.to_datetime(time_unit='ns') + }) + + actual_us = polars_data_frame_to_list_of_points( + df, ps, precision='us', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660000000'], + actual_us + ) + + actual_ms = polars_data_frame_to_list_of_points( + df, ps, precision='ms', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660000'], + actual_ms + ) + + actual_s = polars_data_frame_to_list_of_points( + df, ps, precision='s', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + self.assertEqual( + ['iot-devices name="iot-devices",temperature=72.3 1664625660'], + actual_s + ) + + with self.assertRaisesRegex(ValueError, "Unsupported precision"): + polars_data_frame_to_list_of_points( + df, ps, precision='bad', + data_frame_measurement_name='iot-devices', + data_frame_timestamp_column='time') + @unittest.skipIf(importlib.util.find_spec("polars") is None, 'Polars package not installed') class TestWritePolars(unittest.TestCase):