Skip to content
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#4935](https://github.com/open-telemetry/opentelemetry-python/pull/4935))
- `opentelemetry-sdk`: implement metric reader metrics
([#4970](https://github.com/open-telemetry/opentelemetry-python/pull/4970))
- `opentelemetry-sdk`: implement processor metrics
([#5012](https://github.com/open-telemetry/opentelemetry-python/pull/5012))
- `opentelemetry-sdk`: upgrade vendored OTel configuration schema from v1.0.0-rc.3 to v1.0.0
([#4965](https://github.com/open-telemetry/opentelemetry-python/pull/4965))
- improve check-links ci job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,27 @@
get_value,
set_value,
)
from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.sdk._logs import (
LogRecordProcessor,
ReadableLogRecord,
ReadWriteLogRecord,
)
from opentelemetry.sdk._shared_internal import BatchProcessor, DuplicateFilter
from opentelemetry.sdk._shared_internal import (
BatchProcessor,
DuplicateFilter,
ProcessorMetrics,
)
from opentelemetry.sdk.environment_variables import (
OTEL_BLRP_EXPORT_TIMEOUT,
OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
OTEL_BLRP_MAX_QUEUE_SIZE,
OTEL_BLRP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
Expand Down Expand Up @@ -170,9 +178,19 @@ class SimpleLogRecordProcessor(LogRecordProcessor):
propagating to the application.
"""

def __init__(self, exporter: LogRecordExporter):
def __init__(
self,
exporter: LogRecordExporter,
*,
meter_provider: MeterProvider | None = None,
):
self._exporter = exporter
self._shutdown = False
self._metrics = ProcessorMetrics(
"logs",
OtelComponentTypeValues.SIMPLE_LOG_PROCESSOR,
meter_provider or get_meter_provider(),
)

def on_emit(self, log_record: ReadWriteLogRecord):
# Prevent entering a recursive loop.
Expand All @@ -193,6 +211,7 @@ def on_emit(self, log_record: ReadWriteLogRecord):
set_value(_ON_EMIT_RECURSION_COUNT_KEY, cnt + 1), # pyright: ignore[reportOperatorIssue]
)
)
error: Exception | None = None
try:
if self._shutdown:
_logger.warning("Processor is already shutdown, ignoring call")
Expand All @@ -211,9 +230,11 @@ def on_emit(self, log_record: ReadWriteLogRecord):
limits=log_record.limits,
)
self._exporter.export((readable_log_record,))
except Exception: # pylint: disable=broad-exception-caught
except Exception as err: # pylint: disable=broad-exception-caught
error = err
_logger.exception("Exception while exporting logs.")
finally:
self._metrics.finish_items(1, error)
detach(token)

def shutdown(self):
Expand Down Expand Up @@ -246,6 +267,8 @@ def __init__(
max_export_batch_size: int | None = None,
export_timeout_millis: float | None = None,
max_queue_size: int | None = None,
*,
meter_provider: MeterProvider | None = None,
):
if max_queue_size is None:
max_queue_size = BatchLogRecordProcessor._default_max_queue_size()
Expand Down Expand Up @@ -276,6 +299,12 @@ def __init__(
export_timeout_millis,
max_queue_size,
"Log",
ProcessorMetrics(
"logs",
OtelComponentTypeValues.BATCHING_LOG_PROCESSOR,
meter_provider or get_meter_provider(),
capacity=max_queue_size,
),
)

def on_emit(self, log_record: ReadWriteLogRecord) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
detach,
set_value,
)
from opentelemetry.sdk._shared_internal._processor_metrics import (
ProcessorMetrics,
)
from opentelemetry.util._once import Once


Expand Down Expand Up @@ -98,6 +101,7 @@ def __init__(
export_timeout_millis: float,
max_queue_size: int,
exporting: str,
metrics: ProcessorMetrics,
):
self._bsp_reset_once = Once()
self._exporter = exporter
Expand Down Expand Up @@ -127,6 +131,9 @@ def __init__(
os.register_at_fork(after_in_child=lambda: weak_reinit()()) # pyright: ignore[reportOptionalCall] pylint: disable=unnecessary-lambda
self._pid = os.getpid()

metrics.register_queue_size(lambda: len(self._queue))
self._metrics = metrics

def _should_export_batch(
self, batch_strategy: BatchExportStrategy, num_iterations: int
) -> bool:
Expand Down Expand Up @@ -177,23 +184,27 @@ def _export(self, batch_strategy: BatchExportStrategy) -> None:
while self._should_export_batch(batch_strategy, iteration):
iteration += 1
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
error: Exception | None = None
count = 0
try:
count = min(
self._max_export_batch_size,
len(self._queue),
)
self._exporter.export(
[
# Oldest records are at the back, so pop from there.
self._queue.pop()
for _ in range(
min(
self._max_export_batch_size,
len(self._queue),
)
)
for _ in range(count)
]
)
except Exception: # pylint: disable=broad-exception-caught
except Exception as err: # pylint: disable=broad-exception-caught
error = err
_logger.exception(
"Exception while exporting %s.", self._exporting
)
finally:
self._metrics.finish_items(count, error)
detach(token)

def emit(self, data: Telemetry) -> None:
Expand All @@ -204,6 +215,7 @@ def emit(self, data: Telemetry) -> None:
self._bsp_reset_once.do_once(self._at_fork_reinit)
if len(self._queue) == self._max_queue_size:
_logger.warning("Queue full, dropping %s.", self._exporting)
self._metrics.drop_items(1)
# This will drop a log from the right side if the queue is at _max_queue_size.
self._queue.appendleft(data)
if len(self._queue) >= self._max_export_batch_size:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from collections import Counter
from collections.abc import Callable
from typing import Literal

from opentelemetry.metrics import CallbackOptions, MeterProvider, Observation
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OTEL_COMPONENT_NAME,
OTEL_COMPONENT_TYPE,
OtelComponentTypeValues,
)
from opentelemetry.semconv._incubating.metrics.otel_metrics import (
OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE,
OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE,
create_otel_sdk_processor_log_processed,
create_otel_sdk_processor_log_queue_capacity,
create_otel_sdk_processor_span_processed,
create_otel_sdk_processor_span_queue_capacity,
)
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE

# Per component-type count of instances created so far; used to derive a
# unique ``otel.component.name`` of the form "<type>/<index>".
# NOTE(review): module-level mutable state shared by all providers — presumably
# only touched from processor construction; confirm no concurrent __init__.
_component_counter: Counter[str] = Counter()


class ProcessorMetrics:
    """Self-observability metrics for an SDK span or log processor.

    Creates the semantic-convention ``otel.sdk.processor.*`` instruments for
    the given signal: a "processed items" counter, an optional queue-capacity
    up/down counter (batch processors only), and — via
    :meth:`register_queue_size` — an observable queue-size up/down counter.
    Every data point carries ``otel.component.type`` and a per-instance
    ``otel.component.name`` attribute.
    """

    def __init__(
        self,
        signal: Literal["traces", "logs"],
        component_type: OtelComponentTypeValues,
        meter_provider: MeterProvider,
        *,
        capacity: int | None = None,
    ) -> None:
        """Create the instruments for one processor instance.

        Args:
            signal: Which signal's semconv instruments to create.
            component_type: Semconv component type of the owning processor.
            meter_provider: Provider used to obtain the SDK meter.
            capacity: Fixed queue capacity; when given, a queue-capacity
                up/down counter is created and recorded once.
        """
        self._signal = signal
        self._meter = meter_provider.get_meter("opentelemetry-sdk")

        # Number each instance of a component type: "<type>/0", "<type>/1", ...
        instance_index = _component_counter[component_type.value]
        _component_counter[component_type.value] += 1

        self._standard_attrs = {
            OTEL_COMPONENT_TYPE: component_type.value,
            OTEL_COMPONENT_NAME: f"{component_type.value}/{instance_index}",
        }
        # Attributes for items dropped on queue overflow.
        self._dropped_attrs = dict(self._standard_attrs)
        self._dropped_attrs[ERROR_TYPE] = "queue_full"

        is_traces = signal == "traces"
        make_processed = (
            create_otel_sdk_processor_span_processed
            if is_traces
            else create_otel_sdk_processor_log_processed
        )
        make_queue_capacity = (
            create_otel_sdk_processor_span_queue_capacity
            if is_traces
            else create_otel_sdk_processor_log_queue_capacity
        )

        self._processed = make_processed(self._meter)

        if capacity is not None:
            # Capacity is fixed for the processor's lifetime; record it once.
            self._queue_capacity = make_queue_capacity(self._meter)
            self._queue_capacity.add(capacity, self._standard_attrs)

    def register_queue_size(self, get_queue_size: Callable[[], int]) -> None:
        """Register an observable up/down counter for the current queue size.

        Args:
            get_queue_size: Callback returning the queue length at collection
                time.
        """

        def _observe_queue_size(
            _options: CallbackOptions,
        ) -> tuple[Observation]:
            return (Observation(get_queue_size(), self._standard_attrs),)

        if self._signal == "traces":
            name = OTEL_SDK_PROCESSOR_SPAN_QUEUE_SIZE
            description = "The number of spans in the queue of a given instance of an SDK span processor."
            unit = "{span}"
        else:
            name = OTEL_SDK_PROCESSOR_LOG_QUEUE_SIZE
            description = "The number of logs in the queue of a given instance of an SDK log processor."
            unit = "{log}"

        self._meter.create_observable_up_down_counter(
            name,
            callbacks=(_observe_queue_size,),
            description=description,
            unit=unit,
        )

    def drop_items(self, count: int) -> None:
        """Count *count* items dropped because the processor queue was full."""
        self._processed.add(count, self._dropped_attrs)

    def finish_items(self, count: int, error: Exception | None) -> None:
        """Count *count* items whose export attempt finished.

        Args:
            count: Number of items in the finished export batch.
            error: Exception raised by the export, if any; its type name is
                recorded as the ``error.type`` attribute.
        """
        if error:
            failed_attrs = {
                **self._standard_attrs,
                ERROR_TYPE: type(error).__name__,
            }
            self._processed.add(count, failed_attrs)
        else:
            self._processed.add(count, self._standard_attrs)
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@
detach,
set_value,
)
from opentelemetry.sdk._shared_internal import BatchProcessor
from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.sdk._shared_internal import BatchProcessor, ProcessorMetrics
from opentelemetry.sdk.environment_variables import (
OTEL_BSP_EXPORT_TIMEOUT,
OTEL_BSP_MAX_EXPORT_BATCH_SIZE,
OTEL_BSP_MAX_QUEUE_SIZE,
OTEL_BSP_SCHEDULE_DELAY,
)
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)

_DEFAULT_SCHEDULE_DELAY_MILLIS = 5000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
Expand Down Expand Up @@ -91,8 +95,18 @@ class SimpleSpanProcessor(SpanProcessor):
passes ended spans directly to the configured `SpanExporter`.
"""

def __init__(self, span_exporter: SpanExporter):
def __init__(
self,
span_exporter: SpanExporter,
*,
meter_provider: MeterProvider | None = None,
):
self.span_exporter = span_exporter
self._metrics = ProcessorMetrics(
"traces",
OtelComponentTypeValues.SIMPLE_SPAN_PROCESSOR,
meter_provider or get_meter_provider(),
)

def on_start(
self, span: Span, parent_context: typing.Optional[Context] = None
Expand All @@ -106,11 +120,15 @@ def on_end(self, span: ReadableSpan) -> None:
if not (span.context and span.context.trace_flags.sampled):
return
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
error: Exception | None = None
try:
self.span_exporter.export((span,))
# pylint: disable=broad-exception-caught
except Exception:
except Exception as err:
error = err
logger.exception("Exception while exporting Span.")
finally:
self._metrics.finish_items(1, error)
detach(token)

def shutdown(self) -> None:
Expand Down Expand Up @@ -145,6 +163,8 @@ def __init__(
schedule_delay_millis: float | None = None,
max_export_batch_size: int | None = None,
export_timeout_millis: float | None = None,
*,
meter_provider: MeterProvider | None = None,
):
if max_queue_size is None:
max_queue_size = BatchSpanProcessor._default_max_queue_size()
Expand Down Expand Up @@ -176,6 +196,12 @@ def __init__(
export_timeout_millis,
max_queue_size,
"Span",
ProcessorMetrics(
"traces",
OtelComponentTypeValues.BATCHING_SPAN_PROCESSOR,
meter_provider or get_meter_provider(),
capacity=max_queue_size,
),
)

# Added for backward compatibility. Not recommended to directly access/use underlying exporter.
Expand Down
Loading
Loading