Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import datadog.trace.api.cache.DDCaches;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.api.time.TimeSource;
import datadog.trace.common.writer.RemoteResponseListener;
import datadog.trace.core.CoreSpan;
import java.util.Collections;
Expand All @@ -24,8 +26,21 @@ public class RateByServiceTraceSampler implements Sampler, PrioritySampler, Remo
public static final String SAMPLING_AGENT_RATE = "_dd.agent_psr";

private static final double DEFAULT_RATE = 1.0;
private static final double MAX_RATE_INCREASE_FACTOR = 2.0;
static final long RAMP_UP_INTERVAL_NANOS = 1_000_000_000L;

private final TimeSource timeSource;

private volatile RateSamplersByEnvAndService serviceRates = new RateSamplersByEnvAndService();
private long lastCappedNanos;

/** Creates a sampler driven by the real system clock. */
public RateByServiceTraceSampler() {
this(SystemTimeSource.INSTANCE);
}

// Package-private for tests: injecting a TimeSource lets the ramp-up cooldown
// be exercised deterministically (see ControllableTimeSource in the spec).
RateByServiceTraceSampler(TimeSource timeSource) {
this.timeSource = timeSource;
}

@Override
public <T extends CoreSpan<T>> boolean sample(final T span) {
Expand Down Expand Up @@ -62,6 +77,14 @@ private <T extends CoreSpan<T>> String getSpanEnv(final T span) {
return span.getTag("env", "");
}

/**
 * Decides whether an agent-reported rate update must be capped before being applied.
 *
 * <p>Only increases beyond {@code MAX_RATE_INCREASE_FACTOR} times the previously applied
 * rate are capped; decreases pass through, and a previous rate of zero never caps (there
 * is no meaningful multiple to ramp against).
 */
static boolean shouldCap(double oldRate, double newRate) {
  if (oldRate == 0) {
    return false;
  }
  return newRate > MAX_RATE_INCREASE_FACTOR * oldRate;
}

// Largest rate allowed in a single update step: the old rate scaled by
// MAX_RATE_INCREASE_FACTOR (2.0, i.e. at most a doubling per interval).
static double cappedRate(double oldRate) {
return oldRate * MAX_RATE_INCREASE_FACTOR;
}

@Override
public void onResponse(
final String endpoint, final Map<String, Map<String, Number>> responseJson) {
Expand All @@ -72,6 +95,13 @@ public void onResponse(
}

log.debug("Update service sampler rates: {} -> {}", endpoint, responseJson);

final RateSamplersByEnvAndService currentSnapshot = serviceRates;
final long now = timeSource.getNanoTicks();
final boolean canIncrease =
lastCappedNanos == 0 || (now - lastCappedNanos) >= RAMP_UP_INTERVAL_NANOS;
boolean anyCapped = false;

final TreeMap<String, TreeMap<String, RateSampler>> updatedEnvServiceRates =
new TreeMap<>(String::compareToIgnoreCase);

Expand All @@ -84,17 +114,42 @@ public void onResponse(

EnvAndService envAndService = EnvAndService.fromString(entry.getKey());
if (envAndService.isFallback()) {
double oldRate = currentSnapshot.getFallbackSampler().getSampleRate();
if (shouldCap(oldRate, rate)) {
if (canIncrease) {
rate = cappedRate(oldRate);
anyCapped = true;
} else {
rate = oldRate;
}
}
fallbackSampler = RateByServiceTraceSampler.createRateSampler(rate);
} else {
double oldRate =
currentSnapshot
.getSampler(envAndService.lowerEnv, envAndService.lowerService)
.getSampleRate();
if (shouldCap(oldRate, rate)) {
if (canIncrease) {
rate = cappedRate(oldRate);
anyCapped = true;
} else {
rate = oldRate;
}
}
final double effectiveRate = rate;
Map<String, RateSampler> serviceRates =
updatedEnvServiceRates.computeIfAbsent(
envAndService.lowerEnv, env -> new TreeMap<>(String::compareToIgnoreCase));

serviceRates.computeIfAbsent(
envAndService.lowerService,
service -> RateByServiceTraceSampler.createRateSampler(rate));
service -> RateByServiceTraceSampler.createRateSampler(effectiveRate));
}
}
if (canIncrease && anyCapped) {
lastCappedNanos = now;
}
serviceRates = new RateSamplersByEnvAndService(updatedEnvServiceRates, fallbackSampler);
}

Expand Down Expand Up @@ -128,6 +183,10 @@ private static final class RateSamplersByEnvAndService {
this.fallbackSampler = fallbackSampler;
}

// Sampler applied to the fallback ("service:,env:") entry when a response key
// does not name a specific env/service pair.
RateSampler getFallbackSampler() {
return fallbackSampler;
}

// used in tests only
RateSampler getSampler(EnvAndService envAndService) {
return getSampler(envAndService.lowerEnv, envAndService.lowerService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datadog.trace.common.sampling

import datadog.trace.api.DDTags
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.api.time.ControllableTimeSource
import datadog.trace.common.writer.ListWriter
import datadog.trace.common.writer.LoggingWriter
import datadog.trace.common.writer.ddagent.DDAgentApi
Expand Down Expand Up @@ -222,6 +223,171 @@ class RateByServiceTraceSamplerTest extends DDCoreSpecification {
'manual.keep' | true | PrioritySampling.USER_KEEP
}

def "shouldCap returns false when rate decreases or stays same"() {
  expect: "decreases and within-factor increases are never capped"
  !RateByServiceTraceSampler.shouldCap(oldRate, newRate)

  where:
  oldRate | newRate
  0.8     | 0.4 // decrease
  0.5     | 0.5 // unchanged
  0.5     | 1.0 // exactly 2x old rate, still within the allowed factor
}

def "shouldCap returns false when old rate is zero"() {
  expect: "a zero previous rate never triggers the cap"
  !RateByServiceTraceSampler.shouldCap(0.0, newRate)

  where:
  newRate << [0.5, 1.0]
}

def "shouldCap returns true when new rate exceeds 2x old rate"() {
  expect: "any jump beyond double the previous rate is capped"
  RateByServiceTraceSampler.shouldCap(oldRate, newRate)

  where:
  oldRate | newRate
  0.1     | 1.0
  0.2     | 0.8
  0.1     | 0.3
}

def "cappedRate returns 2x old rate"() {
  expect: "the capped rate is exactly the previous rate doubled"
  RateByServiceTraceSampler.cappedRate(previous) == doubled

  where:
  previous | doubled
  0.1      | 0.2
  0.2      | 0.4
  0.4      | 0.8
}

// End-to-end check of the ramp-up mechanism: after the agent reports a large rate
// jump (0.1 -> 1.0), the applied rate may at most double once per
// RAMP_UP_INTERVAL_NANOS, reaching the target over several intervals.
// Both the specific env/service sampler and the fallback sampler are verified.
def "ramp-up caps rate increases at 2x per interval"() {
setup:
def time = new ControllableTimeSource()
time.set(1_000_000_000L)
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
def tolerance = 0.01

// Set initial rate to 0.1
String response = '{"rate_by_service": {"service:foo,env:bar":0.1, "service:,env:":0.1}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

expect:
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.1) < tolerance

when: "agent restart sends rate 1.0, first interval"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
response = '{"rate_by_service": {"service:foo,env:bar":1.0, "service:,env:":1.0}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate is capped at 2x = 0.2"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < tolerance

when: "second interval"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate doubles to 0.4"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.4) < tolerance

when: "third interval"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate doubles to 0.8"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.8) < tolerance
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 0.8) < tolerance

when: "fourth interval"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
serviceSampler.onResponse("traces", serializer.fromJson(response))

// 0.8 * 2 = 1.6 would exceed the target, so the target 1.0 itself is applied.
then: "rate reaches target 1.0 (2x=1.6 > 1.0)"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 1.0) < tolerance
Math.abs(serviceSampler.serviceRates.getFallbackSampler().sampleRate - 1.0) < tolerance
}

def "ramp-down applies immediately"() {
  setup:
  def clock = new ControllableTimeSource()
  clock.set(1_000_000_000L)
  def sampler = new RateByServiceTraceSampler(clock)
  double eps = 0.01

  // Start out at a high rate (0.8) for both the env/service entry and the fallback.
  String response = '{"rate_by_service": {"service:foo,env:bar":0.8, "service:,env:":0.8}}'
  sampler.onResponse("traces", serializer.fromJson(response))

  when: "rate decreases to 0.2"
  response = '{"rate_by_service": {"service:foo,env:bar":0.2, "service:,env:":0.2}}'
  sampler.onResponse("traces", serializer.fromJson(response))

  then: "decrease is applied immediately"
  Math.abs(sampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < eps
  Math.abs(sampler.serviceRates.getFallbackSampler().sampleRate - 0.2) < eps
}

// Verifies the cooldown: once an increase has been capped, further increases are
// held at the current rate until RAMP_UP_INTERVAL_NANOS has elapsed since the cap.
def "rate increase blocked during cooldown"() {
setup:
def time = new ControllableTimeSource()
time.set(1_000_000_000L)
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
def tolerance = 0.01

// Set initial rate to 0.1
String response = '{"rate_by_service": {"service:foo,env:bar":0.1}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

when: "rate jumps, first capped increase"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "capped to 0.2"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance

// Same target rate again, but no time has passed since the cap was recorded.
when: "try again immediately (within cooldown)"
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate stays at 0.2 because cooldown hasn't elapsed"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.2) < tolerance

when: "after cooldown elapsed"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate doubles to 0.4"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.4) < tolerance
}

// Verifies that a blocked (held) increase does not restart the cooldown timer:
// the next ramp-up step is measured from the ORIGINAL cap time, so two half-interval
// advances together are enough to permit the next doubling.
def "cooldown not reset by blocked increase"() {
setup:
def time = new ControllableTimeSource()
time.set(1_000_000_000L)
RateByServiceTraceSampler serviceSampler = new RateByServiceTraceSampler(time)
def tolerance = 0.01

// Set initial low rate
String response = '{"rate_by_service": {"service:foo,env:bar":0.01}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

expect:
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.01) < tolerance

when: "wait for cooldown, apply increase: 0.01 -> 0.02"
time.advance(RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS)
response = '{"rate_by_service": {"service:foo,env:bar":1.0}}'
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate is capped at 2x = 0.02"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance

when: "before cooldown elapses, send another increase - rate should be held and lastCapped NOT reset"
time.advance((long) (RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2))
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate stays at 0.02 (cooldown)"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.02) < tolerance

when: "wait remaining half of cooldown from the original cap - should allow next ramp-up"
time.advance((long) (RateByServiceTraceSampler.RAMP_UP_INTERVAL_NANOS / 2))
serviceSampler.onResponse("traces", serializer.fromJson(response))

then: "rate doubles to 0.04 because lastCapped was NOT reset by the blocked increase"
Math.abs(serviceSampler.serviceRates.getSampler("bar", "foo").sampleRate - 0.04) < tolerance
}

def "not setting forced tracing via tag or setting it wrong value not causing exception"() {
setup:
def sampler = new RateByServiceTraceSampler()
Expand Down