diff --git a/langfuse/_client/client.py b/langfuse/_client/client.py index f7ee28e36..c8782c638 100644 --- a/langfuse/_client/client.py +++ b/langfuse/_client/client.py @@ -2483,7 +2483,7 @@ def get_dataset( items = [DatasetItemClient(i, langfuse=self) for i in dataset_items] - return DatasetClient(dataset, items=items) + return DatasetClient(dataset, items=items, version=version) except Error as e: handle_fern_exception(e) @@ -2574,6 +2574,7 @@ def run_experiment( run_evaluators: List[RunEvaluatorFunction] = [], max_concurrency: int = 50, metadata: Optional[Dict[str, str]] = None, + _dataset_version: Optional[datetime] = None, ) -> ExperimentResult: """Run an experiment on a dataset with automatic tracing and evaluation. @@ -2751,6 +2752,7 @@ def average_accuracy(*, item_results, **kwargs): run_evaluators=run_evaluators or [], max_concurrency=max_concurrency, metadata=metadata, + dataset_version=_dataset_version, ), ), ) @@ -2768,6 +2770,7 @@ async def _run_experiment_async( run_evaluators: List[RunEvaluatorFunction], max_concurrency: int, metadata: Optional[Dict[str, Any]] = None, + dataset_version: Optional[datetime] = None, ) -> ExperimentResult: langfuse_logger.debug( f"Starting experiment '{name}' run '{run_name}' with {len(data)} items" @@ -2788,6 +2791,7 @@ async def process_item(item: ExperimentItem) -> ExperimentItemResult: run_name, description, metadata, + dataset_version, ) # Run all items concurrently @@ -2874,6 +2878,7 @@ async def _process_experiment_item( experiment_run_name: str, experiment_description: Optional[str], experiment_metadata: Optional[Dict[str, Any]] = None, + dataset_version: Optional[datetime] = None, ) -> ExperimentItemResult: span_name = "experiment-item-run" @@ -2925,6 +2930,7 @@ async def _process_experiment_item( datasetItemId=item.id, # type: ignore traceId=trace_id, observationId=span.id, + datasetVersion=dataset_version, ), ) diff --git a/langfuse/_client/datasets.py b/langfuse/_client/datasets.py index 0a9a0312c..4a698e0ab 100644 --- a/langfuse/_client/datasets.py +++ b/langfuse/_client/datasets.py @@ -155,7 +155,7 @@ class DatasetClient: created_at (datetime): Timestamp of dataset creation. updated_at (datetime): Timestamp of the last update to the dataset. items (List[DatasetItemClient]): List of dataset items associated with the dataset. - + version (Optional[datetime]): Timestamp of the dataset version. Example: Print the input of each dataset item in a dataset. ```python @@ -178,8 +178,14 @@ class DatasetClient: created_at: dt.datetime updated_at: dt.datetime items: List[DatasetItemClient] + version: Optional[dt.datetime] - def __init__(self, dataset: Dataset, items: List[DatasetItemClient]): + def __init__( + self, + dataset: Dataset, + items: List[DatasetItemClient], + version: Optional[dt.datetime] = None, + ): """Initialize the DatasetClient.""" self.id = dataset.id self.name = dataset.name @@ -189,6 +195,7 @@ def __init__(self, dataset: Dataset, items: List[DatasetItemClient]): self.created_at = dataset.created_at self.updated_at = dataset.updated_at self.items = items + self.version = version self._langfuse: Optional["Langfuse"] = None def _get_langfuse_client(self) -> Optional["Langfuse"]: @@ -421,4 +428,5 @@ def content_diversity(*, item_results, **kwargs): run_evaluators=run_evaluators, max_concurrency=max_concurrency, metadata=metadata, + _dataset_version=self.version, ) diff --git a/tests/test_datasets.py b/tests/test_datasets.py index f86812138..1a0cda63a 100644 --- a/tests/test_datasets.py +++ b/tests/test_datasets.py @@ -569,3 +569,74 @@ def test_get_dataset_with_version(): # Verify fetching without version returns both items (latest) dataset_latest = langfuse.get_dataset(name) assert len(dataset_latest.items) == 2 + + +def test_run_experiment_with_versioned_dataset(): + """Test that running an experiment on a versioned dataset works correctly.""" + from datetime import timedelta + import time + + langfuse = Langfuse(debug=False) + + # Create dataset + name = create_uuid() + langfuse.create_dataset(name=name) + + # Create first item + langfuse.create_dataset_item( + dataset_name=name, input={"question": "What is 2+2?"}, expected_output=4 + ) + langfuse.flush() + time.sleep(3) + + # Fetch dataset to get the actual server-assigned timestamp of item1 + dataset_after_item1 = langfuse.get_dataset(name) + assert len(dataset_after_item1.items) == 1 + item1_id = dataset_after_item1.items[0].id + item1_created_at = dataset_after_item1.items[0].created_at + + # Use a timestamp 1 second after item1's creation + version_timestamp = item1_created_at + timedelta(seconds=1) + time.sleep(3) + + # Update item1 after the version timestamp (this should not affect versioned query) + langfuse.create_dataset_item( + id=item1_id, + dataset_name=name, + input={"question": "What is 4+4?"}, + expected_output=8, + ) + langfuse.flush() + time.sleep(3) + + # Create second item (after version timestamp) + langfuse.create_dataset_item( + dataset_name=name, input={"question": "What is 3+3?"}, expected_output=6 + ) + langfuse.flush() + time.sleep(3) + + # Get versioned dataset (should only have first item with ORIGINAL state) + versioned_dataset = langfuse.get_dataset(name, version=version_timestamp) + assert len(versioned_dataset.items) == 1 + assert versioned_dataset.version == version_timestamp + # Verify it returns the ORIGINAL version of item1 (before the update) + assert versioned_dataset.items[0].input == {"question": "What is 2+2?"} + assert versioned_dataset.items[0].expected_output == 4 + assert versioned_dataset.items[0].id == item1_id + + # Run a simple experiment on the versioned dataset + def simple_task(*, item, **kwargs): + # Just return a static answer + return item.expected_output + + result = versioned_dataset.run_experiment( + name="Versioned Dataset Test", + description="Testing experiment with versioned dataset", + task=simple_task, + ) + + # Verify experiment ran successfully + assert result.name == "Versioned Dataset Test" + assert len(result.item_results) == 1 # Only one item in versioned dataset + assert result.item_results[0].output == 4