Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ def initialize(endpoint: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPOR
ssl_verify_mode: MetricsExporter.ssl_verify_mode,
headers: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_HEADERS', 'OTEL_EXPORTER_OTLP_HEADERS', default: {}),
compression: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_COMPRESSION', 'OTEL_EXPORTER_OTLP_COMPRESSION', default: 'gzip'),
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10))
timeout: OpenTelemetry::Common::Utilities.config_opt('OTEL_EXPORTER_OTLP_METRICS_TIMEOUT', 'OTEL_EXPORTER_OTLP_TIMEOUT', default: 10),
aggregation_cardinality_limit: nil)
raise ArgumentError, "invalid url for OTLP::MetricsExporter #{endpoint}" unless OpenTelemetry::Common::Utilities.valid_url?(endpoint)
raise ArgumentError, "unsupported compression key #{compression}" unless compression.nil? || %w[gzip none].include?(compression)

# create the MetricStore object
super()
super(aggregation_cardinality_limit: aggregation_cardinality_limit)

@uri = if endpoint == ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
URI.join(endpoint, 'v1/metrics')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def collect(start_time, end_time, data_points)
data_points.values.map!(&:dup)
end

def update(increment, attributes, data_points, exemplar_offer: false)
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
data_points[attributes] = NumberDataPoint.new(
{},
0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module Aggregation
# Contains the implementation of the ExplicitBucketHistogram aggregation
# https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#explicit-bucket-histogram-aggregation
class ExplicitBucketHistogram
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
attr_reader :exemplar_reservoir

DEFAULT_BOUNDARIES = [0, 5, 10, 25, 50, 75, 100, 250, 500, 1000].freeze
Expand Down Expand Up @@ -59,60 +60,76 @@ def collect(start_time, end_time, data_points)
end
end

def update(amount, attributes, data_points, exemplar_offer: false)
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
0, # :count
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
min, # :min
max # :max
)
end
update_histogram_data_point(hdp, amount, exemplar_offer: exemplar_offer)
nil
end

reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
def aggregation_temporality
@aggregation_temporality.temporality
end

if exemplar_offer
reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
private

def create_new_data_point(attributes, data_points)
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

data_points[attributes] = HistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
nil, # :time_unix_nano
0, # :count
0, # :sum
empty_bucket_counts, # :bucket_counts
@boundaries, # :explicit_bounds
nil, # :exemplars
min, # :min
max # :max
)
end

def update_histogram_data_point(hdp, amount, exemplar_offer: false)
reservior_update(hdp.attributes, amount, exemplar_offer)

if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
end

hdp.sum += amount
hdp.count += 1
if @boundaries
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
hdp.bucket_counts[bucket_index] += 1
end
nil
end
return unless @boundaries

def aggregation_temporality
@aggregation_temporality.temporality
bucket_index = @boundaries.bsearch_index { |i| i >= amount } || @boundaries.size
hdp.bucket_counts[bucket_index] += 1
end

private
def reservior_update(attributes, amount, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end

def empty_bucket_counts
@boundaries ? Array.new(@boundaries.size + 1, 0) : nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ module Metrics
module Aggregation
# Contains the implementation of the {https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram ExponentialBucketHistogram} aggregation
class ExponentialBucketHistogram # rubocop:disable Metrics/ClassLength
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze

# relate to min max scale: https://opentelemetry.io/docs/specs/otel/metrics/sdk/#support-a-minimum-and-maximum-scale
DEFAULT_SIZE = 160
DEFAULT_SCALE = 20
Expand Down Expand Up @@ -221,49 +223,53 @@ def collect(start_time, end_time, data_points)
# rubocop:enable Metrics/MethodLength

# this is aggregate in python; there is no merge in aggregate; but rescale happened
# rubocop:disable Metrics/MethodLength, Metrics/CyclomaticComplexity
def update(amount, attributes, data_points, exemplar_offer: false)
# fetch or initialize the ExponentialHistogramDataPoint
hdp = data_points.fetch(attributes) do
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end
def update(amount, attributes, data_points, cardinality_limit, exemplar_offer: false)
hdp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

update_histogram_data_point(hdp, attributes, amount, exemplar_offer: exemplar_offer)
nil
end

# this code block will only be executed if no data_points was found with the attributes
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end
def aggregation_temporality
@aggregation_temporality.temporality
end

reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
private

if exemplar_offer
reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
def create_new_data_point(attributes, data_points)
if @record_min_max
min = Float::INFINITY
max = -Float::INFINITY
end

# Start to populate the data point (esp. the buckets)
data_points[attributes] = ExponentialHistogramDataPoint.new(
attributes,
nil, # :start_time_unix_nano
0, # :time_unix_nano
0, # :count
0, # :sum
@scale, # :scale
@zero_count, # :zero_count
ExponentialHistogram::Buckets.new, # :positive
ExponentialHistogram::Buckets.new, # :negative
0, # :flags
nil, # :exemplars
min, # :min
max, # :max
@zero_threshold # :zero_threshold
)
end

# rubocop:disable Metrics/CyclomaticComplexity,Metrics/MethodLength
def update_histogram_data_point(hdp, attributes, amount, exemplar_offer: false)
reservior_update(attributes, amount, exemplar_offer)

if @record_min_max
hdp.max = amount if amount > hdp.max
hdp.min = amount if amount < hdp.min
Expand Down Expand Up @@ -339,15 +345,8 @@ def update(amount, attributes, data_points, exemplar_offer: false)
bucket_index += buckets.counts.size if bucket_index.negative?

buckets.increment_bucket(bucket_index)
nil
end
# rubocop:enable Metrics/MethodLength, Metrics/CyclomaticComplexity

def aggregation_temporality
@aggregation_temporality.temporality
end

private
# rubocop:enable Metrics/CyclomaticComplexity,Metrics/MethodLength

def grow_buckets(span, buckets)
return if span < buckets.counts.size
Expand All @@ -356,6 +355,22 @@ def grow_buckets(span, buckets)
buckets.grow(span + 1, @size)
end

def reservior_update(attributes, amount, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: amount,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end

def new_mapping(scale)
scale = validate_scale(scale)
scale <= 0 ? ExponentialHistogram::ExponentMapping.new(scale) : ExponentialHistogram::LogarithmMapping.new(scale)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ module Metrics
module Aggregation
# Contains the implementation of the LastValue aggregation
class LastValue
OVERFLOW_ATTRIBUTE_SET = { 'otel.metric.overflow' => true }.freeze
attr_reader :exemplar_reservoir

# if no reservoir pass from instrument, then use this empty reservoir to avoid no method found error
Expand All @@ -33,29 +34,51 @@ def collect(start_time, end_time, data_points)
ndps
end

def update(increment, attributes, data_points, exemplar_offer: false)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end
def update(increment, attributes, data_points, cardinality_limit, exemplar_offer: false)
# Check if we already have this attribute set
ndp = if data_points.key?(attributes)
data_points[attributes]
elsif data_points.size >= cardinality_limit
data_points[OVERFLOW_ATTRIBUTE_SET] || create_new_data_point(OVERFLOW_ATTRIBUTE_SET, data_points)
else
create_new_data_point(attributes, data_points)
end

if exemplar_offer
reservoir.offer(value: increment,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end
update_number_data_point(ndp, increment, exemplar_offer: exemplar_offer)
nil
end

private

def create_new_data_point(attributes, data_points)
data_points[attributes] = NumberDataPoint.new(
attributes,
nil,
nil,
increment,
0,
nil
)
nil
end

def update_number_data_point(ndp, increment, exemplar_offer: false)
ndp.value = increment
reservior_update(ndp.attributes, increment, exemplar_offer)
end

def reservior_update(attributes, increment, exemplar_offer)
reservoir = @exemplar_reservoir_storage[attributes]
unless reservoir
reservoir = @exemplar_reservoir.dup
reservoir.reset
@exemplar_reservoir_storage[attributes] = reservoir
end

return unless exemplar_offer

reservoir.offer(value: increment,
timestamp: OpenTelemetry::Common::Utilities.time_in_nanoseconds,
attributes: attributes,
context: OpenTelemetry::Context.current)
end
end
end
Expand Down
Loading
Loading