Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
722bf3e
Add basic tracing middleware and global control
davidigandan Jan 13, 2026
52cb04d
Instrument on subscribe and add dcid to span attributes
davidigandan Jan 26, 2026
cc9ee12
Add spanid and traceid metadata to greylog
davidigandan Jan 26, 2026
f7cc658
Add recipe_id to spans
davidigandan Jan 26, 2026
8b2a2f1
Add dev and prod dependencies
davidigandan Jan 26, 2026
0686e28
Remove dcid extract from message and inject to span logic. Will be ad…
davidigandan Jan 26, 2026
2d9e21c
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
3a5283a
Use plugin configurations to configure connection to OTELCollector
davidigandan Jan 26, 2026
4b999f1
Remove vestigial dcid handling and unnecessary debug statements
davidigandan Jan 26, 2026
4b86715
remove unhelpful docstring
davidigandan Jan 26, 2026
9c13d07
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 26, 2026
3e0b902
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
d446e80
imported OTEL config class to common_service
davidigandan Jan 26, 2026
16b0e10
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jan 26, 2026
7ad857f
add marshmallow dependency
davidigandan Jan 27, 2026
468f940
Merge branch 'dev' of https://github.com/DiamondLightSource/python-wo…
davidigandan Jan 27, 2026
7aae664
add zocalo dependency
davidigandan Jan 27, 2026
902a7df
Fix possibly unbound error
davidigandan Jan 27, 2026
9e5adb7
Moved plugin functionality to python-workflows
davidigandan Feb 2, 2026
1d7457e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 2, 2026
ff3679c
Fixed typos, vestigial code and improper use of log_extender
davidigandan Feb 6, 2026
e6be925
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
aebe1ca
Remove vestigial try block and fix runtime issue where None[] or None…
davidigandan Feb 6, 2026
77eb9e9
Implement ExitStack() to manage multiple context managers and clean t…
davidigandan Feb 6, 2026
5f94077
Fix broken tracing functionality
davidigandan Feb 17, 2026
786bd00
Fix
davidigandan Feb 17, 2026
33fbce2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
ed6bc93
Fix rw.environment.get('ID') bug
davidigandan Feb 17, 2026
2369ba1
Ensure environment and environment.id exists
davidigandan Feb 17, 2026
554baa2
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
7221ec4
Remove the need for enironment variable in mock
davidigandan Feb 17, 2026
b7e7999
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Feb 17, 2026
2fd00fa
Fix ruff error
davidigandan Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ]

[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down Expand Up @@ -53,6 +53,7 @@ OfflineTransport = "workflows.transport.offline_transport:OfflineTransport"
pika = "workflows.util.zocalo.configuration:Pika"
stomp = "workflows.util.zocalo.configuration:Stomp"
transport = "workflows.util.zocalo.configuration:DefaultTransport"
opentelemetry = "workflows.util.zocalo.configuration:OTEL"

[project.scripts]
"workflows.validate_recipe" = "workflows.recipe.validate:main"
Expand Down
4 changes: 4 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
marshmallow
48 changes: 46 additions & 2 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@
import functools
import logging
from collections.abc import Callable
from contextlib import ExitStack
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -68,11 +71,52 @@ def unwrap_recipe(header, message):
if mangle_for_receiving:
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
otel_logs = None
rw = RecipeWrapper(message=message, transport=transport_layer)
if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):

if hasattr(rw, "environment") and rw.environment.get("ID"):
# Extract recipe ID from environment and add to current span
span = trace.get_current_span()
recipe_id = rw.environment.get("ID")

if recipe_id:
span.set_attribute("recipe_id", recipe_id)

# Extract span_id and trace_id for logging
span_context = span.get_span_context()
if span_context and span_context.is_valid:
span_id = span_context.span_id
trace_id = span_context.trace_id

otel_logs = {
"span_id": span_id,
"trace_id": trace_id,
}

if recipe_id:
otel_logs["recipe_id"] = recipe_id

with ExitStack() as stack:
# Configure the context depending on if service is emitting spans
if (
otel_logs
and log_extender
and rw.environment
and rw.environment.get("ID")
):
stack.enter_context(
log_extender("recipe_ID", rw.environment.get("ID"))
)
stack.enter_context(log_extender("otel_logs", otel_logs))
elif log_extender and rw.environment and rw.environment.get("ID"):
stack.enter_context(
log_extender("recipe_ID", rw.environment.get("ID"))
)

return callback(rw, header, message.get("payload"))

return callback(rw, header, message.get("payload"))

if allow_non_recipe_messages:
return callback(None, header, message)
# self.log.warning('Discarding non-recipe message:\n' + \
Expand Down
41 changes: 41 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +192,40 @@ def start_transport(self):
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)

# Configure OTELTracing if configuration is available
otel_config = (
self.config._opentelemetry
if self.config and hasattr(self.config, "opentelemetry")
else None
)
if otel_config:
# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)

metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
5 changes: 5 additions & 0 deletions src/workflows/transport/middleware/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,10 @@ def wrapped_callback(header, message):


def wrap(f: Callable):
# debugging
if f.__name__ == "send":
print("we are wrapping send now")

@functools.wraps(f)
def wrapper(self, *args, **kwargs):
return functools.reduce(
Expand All @@ -243,4 +247,5 @@ def wrapper(self, *args, **kwargs):
lambda *args, **kwargs: f(self, *args, **kwargs),
)(*args, **kwargs)

print(wrapper.__wrapped__)
return wrapper
Loading
Loading