Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.common.InternalTelemetryVersion;
import io.opentelemetry.sdk.common.internal.ComponentId;
import io.opentelemetry.sdk.common.internal.DaemonThreadFactory;
import io.opentelemetry.sdk.common.internal.ThrowableUtil;
import io.opentelemetry.sdk.trace.ReadWriteSpan;
import io.opentelemetry.sdk.trace.ReadableSpan;
Expand All @@ -21,12 +20,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -85,8 +83,8 @@ public static BatchSpanProcessorBuilder builder(SpanExporter spanExporter) {
exporterTimeoutNanos,
JcTools.newFixedSizeQueue(maxQueueSize),
maxQueueSize);
Thread workerThread = new DaemonThreadFactory(WORKER_THREAD_NAME).newThread(worker);
workerThread.start();

worker.start();
}

@Override
Expand Down Expand Up @@ -159,7 +157,7 @@ public String toString() {

// Worker is a thread that batches multiple spans and calls the registered SpanExporter to export
// the data.
private static final class Worker implements Runnable {
private static final class Worker extends Thread {

private final SpanProcessorInstrumentation spanProcessorInstrumentation;

Expand All @@ -178,8 +176,7 @@ private static final class Worker implements Runnable {
// Integer.MAX_VALUE is used to imply that exporter thread is not expecting any signal. Since
// exporter thread doesn't expect any signal initially, this value is initialized to
// Integer.MAX_VALUE.
private final AtomicInteger spansNeeded = new AtomicInteger(Integer.MAX_VALUE);
private final BlockingQueue<Boolean> signal;
private volatile int spansNeeded = Integer.MAX_VALUE;
private final AtomicReference<CompletableResultCode> flushRequested = new AtomicReference<>();
private volatile boolean continueWork = true;
private final ArrayList<SpanData> batch;
Expand All @@ -195,12 +192,14 @@ private Worker(
long exporterTimeoutNanos,
Queue<ReadableSpan> queue,
long maxQueueSize) {
super(WORKER_THREAD_NAME);
super.setDaemon(true);

this.spanExporter = spanExporter;
this.scheduleDelayNanos = scheduleDelayNanos;
this.maxExportBatchSize = maxExportBatchSize;
this.exporterTimeoutNanos = exporterTimeoutNanos;
this.queue = queue;
this.signal = new ArrayBlockingQueue<>(1);

spanProcessorInstrumentation =
SpanProcessorInstrumentation.get(telemetryVersion, COMPONENT_ID, meterProvider);
Expand All @@ -215,8 +214,8 @@ private void addSpan(ReadableSpan span) {
spanProcessorInstrumentation.dropSpans(1);
droppedSpanCount.incrementAndGet();
} else {
if (queueSize.incrementAndGet() >= spansNeeded.get()) {
signal.offer(true);
if (queueSize.incrementAndGet() >= spansNeeded) {
LockSupport.unpark(this);
}
}
}
Expand All @@ -236,16 +235,11 @@ public void run() {
updateNextExportTime();
}
if (queue.isEmpty()) {
try {
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
spansNeeded.set(maxExportBatchSize - batch.size());
signal.poll(pollWaitTime, TimeUnit.NANOSECONDS);
spansNeeded.set(Integer.MAX_VALUE);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
long pollWaitTime = nextExportTime - System.nanoTime();
if (pollWaitTime > 0) {
spansNeeded = maxExportBatchSize - batch.size();
LockSupport.parkNanos(pollWaitTime);
spansNeeded = Integer.MAX_VALUE;
}
}
}
Expand Down Expand Up @@ -303,7 +297,7 @@ private CompletableResultCode forceFlush() {
CompletableResultCode flushResult = new CompletableResultCode();
// we set the atomic here to trigger the worker loop to do a flush of the entire queue.
if (flushRequested.compareAndSet(null, flushResult)) {
signal.offer(true);
LockSupport.unpark(this);
}
CompletableResultCode possibleResult = flushRequested.get();
// there's a race here where the flush happening in the worker loop could complete before we
Expand Down
Loading