diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 594d3e6c4c0..402030e3702 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -13,6 +13,7 @@ import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import datadog.common.queue.Queues; @@ -112,6 +113,30 @@ public ConflatingMetricsAggregator( config.isTraceResourceRenamingEnabled()); } + /** + * OTLP span-metrics export variant. Reuses the same span selection + DDSketch aggregation as the + * native path, but emits via the injected {@link OtlpStatsMetricWriter} instead of msgpack to. No + * agent {@link Sink} is needed, so a {@link NoOpSink} satisfies the register()/backpressure + * contract, and the reporting interval comes from {@code trace.stats.interval} (milliseconds). + */ + public ConflatingMetricsAggregator( + Config config, + SharedCommunicationObjects sharedCommunicationObjects, + HealthMetrics healthMetrics, + OtlpStatsMetricWriter metricWriter) { + this( + config.getMetricsIgnoredResources(), + sharedCommunicationObjects.featuresDiscovery(config), + healthMetrics, + NoOpSink.INSTANCE, + metricWriter, + config.getTracerMetricsMaxAggregates(), + config.getTracerMetricsMaxPending(), + config.getTraceStatsInterval(), + MILLISECONDS, + config.isTraceResourceRenamingEnabled()); + } + ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java index 09464310113..9e4e703ddcb 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricsAggregatorFactory.java @@ -13,6 +13,14 @@ public static MetricsAggregator createMetricsAggregator( Config config, SharedCommunicationObjects sharedCommunicationObjects, HealthMetrics healthMetrics) { + // OTLP span-metrics export and native msgpack stats are mutually exclusive (XOR): both hang off + // the same ConflatingMetricsAggregator span selection + DDSketch aggregation, differing only in + // the injected MetricWriter. + if (config.isTracesSpanMetricsEnabled()) { + log.debug("OTLP trace span metrics enabled"); + return new ConflatingMetricsAggregator( + config, sharedCommunicationObjects, healthMetrics, new OtlpStatsMetricWriter(config)); + } if (config.isTracerMetricsEnabled()) { log.debug("tracer metrics enabled"); return new ConflatingMetricsAggregator(config, sharedCommunicationObjects, healthMetrics); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpSink.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpSink.java new file mode 100644 index 00000000000..aaa6a46e9eb --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/NoOpSink.java @@ -0,0 +1,23 @@ +package datadog.trace.common.metrics; + +import java.nio.ByteBuffer; + +/** + * A {@link Sink} that discards everything. Used by the OTLP trace-metrics export path, where {@link + * OtlpStatsMetricWriter} sends payloads directly via its own OTLP sender in {@code finishBucket()} + * rather than through the aggregator's {@code Sink}. {@link ConflatingMetricsAggregator} still + * requires a {@code Sink} for {@code register()}/backpressure wiring, so this satisfies that + * contract without performing any I/O. + */ +public final class NoOpSink implements Sink { + + public static final NoOpSink INSTANCE = new NoOpSink(); + + private NoOpSink() {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) {} + + @Override + public void register(EventListener listener) {} +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java index 3f38d45881f..fc8a1ef00cc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/WriterFactory.java @@ -187,7 +187,7 @@ public static Writer createWriter( commObjects.agentUrl, featuresDiscovery, commObjects.monitoring, - config.isTracerMetricsEnabled()); + config.isTracerMetricsEnabled() || config.isTracesSpanMetricsEnabled()); if (sampler instanceof RemoteResponseListener) { ddAgentApi.addResponseListener((RemoteResponseListener) sampler); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java index ffd2810a996..ef15009c75e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/common/OtlpResourceProto.java @@ -36,8 +36,14 @@ private OtlpResourceProto() {} /** Prefix applied to {@code datadog.runtime_id} and process-tag resource attributes. */ private static final String DATADOG_PREFIX = "datadog."; - /** Vendor-neutral resource (no {@code datadog.*}). Used by the OTLP trace/metric export. */ - public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get()); + /** + * Resource attribute added to OTLP trace metrics to ensure calculations are not re-computed in + * the Agent + */ + private static final String STATS_COMPUTED_KEY = "_dd.stats_computed"; + + /** Vendor-neutral resource (no {@code datadog.*}). Used by the OTLP metric export. */ + public static final byte[] RESOURCE_MESSAGE = buildResourceMessage(Config.get(), false, false); /** * Resource that additionally carries {@code datadog.runtime_id} and process tags (each prefixed @@ -45,13 +51,18 @@ private OtlpResourceProto() {} * mode. */ public static final byte[] RESOURCE_MESSAGE_WITH_DATADOG_ATTRS = - buildResourceMessage(Config.get(), true); + buildResourceMessage(Config.get(), true, false); - static byte[] buildResourceMessage(Config config) { - return buildResourceMessage(config, false); - } + /** + * Resource used by the OTLP trace export. Identical to {@link #RESOURCE_MESSAGE} but adds the + * {@code _dd.stats_computed} marker when the SDK is computing OTLP span metrics, so a downstream + * Agent does not recompute them from the exported spans. + */ + public static final byte[] TRACE_RESOURCE_MESSAGE = + buildResourceMessage(Config.get(), false, Config.get().isTracesSpanMetricsEnabled()); - static byte[] buildResourceMessage(Config config, boolean includeDatadogResourceAttributes) { + static byte[] buildResourceMessage( + Config config, boolean includeDatadogResourceAttributes, boolean includeStatsComputed) { GrowableBuffer buf = new GrowableBuffer(512); String serviceName = config.getServiceName(); @@ -89,6 +100,10 @@ static byte[] buildResourceMessage(Config config, boolean includeDatadogResource writeDatadogResourceAttributes(buf, config); } + if (includeStatsComputed) { + writeResourceAttribute(buf, STATS_COMPUTED_KEY, "true"); + } + OtlpProtoBuffer protobuf = new OtlpProtoBuffer(buf.capacity()); int numBytes = protobuf.recordMessage(buf, 1); byte[] resourceMessage = new byte[numBytes]; diff --git a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java index f37b7a5aea2..fa1efa87fba 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/otlp/trace/OtlpTraceProtoCollector.java @@ -1,6 +1,6 @@ package datadog.trace.core.otlp.trace; -import static datadog.trace.core.otlp.common.OtlpResourceProto.RESOURCE_MESSAGE; +import static datadog.trace.core.otlp.common.OtlpResourceProto.TRACE_RESOURCE_MESSAGE; import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordScopedSpansMessage; import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanLinkMessage; import static datadog.trace.core.otlp.trace.OtlpTraceProto.recordSpanMessage; @@ -138,7 +138,7 @@ private OtlpPayload completePayload() { } // prepend the canned resource chunk - payloadBytes += protobuf.recordMessage(RESOURCE_MESSAGE); + payloadBytes += protobuf.recordMessage(TRACE_RESOURCE_MESSAGE); // finally prepend the total length of all collected chunks protobuf.recordMessage(buf, 1, payloadBytes); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy deleted file mode 100644 index 07f246bf9a9..00000000000 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.groovy +++ /dev/null @@ -1,33 +0,0 @@ -package datadog.trace.common.metrics - -import datadog.communication.ddagent.SharedCommunicationObjects -import datadog.trace.api.Config -import datadog.trace.core.monitor.HealthMetrics -import datadog.trace.test.util.DDSpecification -import okhttp3.HttpUrl - -class MetricsAggregatorFactoryTest extends DDSpecification { - - def "when metrics disabled no-op aggregator created"() { - setup: - Config config = Mock(Config) - config.isTracerMetricsEnabled() >> false - def sco = Mock(SharedCommunicationObjects) - sco.agentUrl = HttpUrl.parse("http://localhost:8126") - expect: - def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP,) - assert aggregator instanceof NoOpMetricsAggregator - } - - def "when metrics enabled conflating aggregator created"() { - setup: - Config config = Spy(Config.get()) - config.isTracerMetricsEnabled() >> true - def sco = Mock(SharedCommunicationObjects) - sco.agentUrl = HttpUrl.parse("http://localhost:8126") - expect: - def aggregator = MetricsAggregatorFactory.createMetricsAggregator(config, sco, HealthMetrics.NO_OP, - ) - assert aggregator instanceof ConflatingMetricsAggregator - } -} diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy index d20bd475cac..59cc60c3820 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/WriterFactoryTest.groovy @@ -207,6 +207,51 @@ class WriterFactoryTest extends DDSpecification { OtlpConfig.Protocol.GRPC | OtlpConfig.Compression.NONE | "http://otel-collector:4317" | OtlpGrpcSender | "http://otel-collector:4317/opentelemetry.proto.collector.trace.v1.TraceService/Export" | false } + def "DDAgentApi marks client-computed stats when either stats pipeline is enabled (spanMetrics=#spanMetrics, nativeStats=#nativeStats)"() { + setup: + // metricsEnabled gates the Datadog-Client-Computed-Stats header on the native trace transport. + // It must be true whenever the SDK computes stats by EITHER pipeline: native msgpack stats + // (isTracerMetricsEnabled) or OTLP span metrics (isTracesSpanMetricsEnabled). The OTLP-stats + // case arises when OTEL_TRACES_SPAN_METRICS_ENABLED is set while traces still export natively; + // without the header the Agent would recompute stats from spans already exported via OTLP. + def config = Mock(Config) + config.apiKey >> "my-api-key" + config.agentUrl >> "http://my-agent.url" + config.getEnumValue(PRIORITIZATION_TYPE, _, _) >> Prioritization.FAST_LANE + config.tracerMetricsEnabled >> nativeStats + config.tracesSpanMetricsEnabled >> spanMetrics + + def mockCall = Mock(Call) + def mockHttpClient = Mock(OkHttpClient) + // advertise only v0.4 (no EVP proxy) so "DDAgentWriter" resolves to a real DDAgentWriter + mockCall.execute() >> { buildHttpResponse(false, false, HttpUrl.parse(config.agentUrl + "/info")) } + mockHttpClient.newCall(_ as Request) >> mockCall + + def sharedComm = new SharedCommunicationObjects() + sharedComm.agentHttpClient = mockHttpClient + sharedComm.agentUrl = HttpUrl.parse(config.agentUrl) + sharedComm.createRemaining(config) + + def sampler = Mock(Sampler) + + when: + def writer = WriterFactory.createWriter(config, sharedComm, sampler, null, HealthMetrics.NO_OP, "DDAgentWriter") + def api = ((RemoteWriter) writer).apis.find { it instanceof DDAgentApi } + def metricsEnabledField = DDAgentApi.getDeclaredField("metricsEnabled") + metricsEnabledField.setAccessible(true) + + then: + writer instanceof DDAgentWriter + metricsEnabledField.getBoolean(api) == expectedComputesStats + + where: + spanMetrics | nativeStats | expectedComputesStats + true | false | true // gap case: OTLP span metrics on, native stats off + false | false | false // neither pipeline computes stats + false | true | true // native stats on (regression guard) + true | true | true // both on + } + Response buildHttpResponse(boolean hasEvpProxy, boolean evpProxySupportsCompression, HttpUrl agentUrl) { def endpoints = [] if (hasEvpProxy && evpProxySupportsCompression) { diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.java new file mode 100644 index 00000000000..023f6591bef --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricsAggregatorFactoryTest.java @@ -0,0 +1,133 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_COMPUTATION_ENABLED; +import static datadog.trace.api.config.OtlpConfig.OTLP_METRICS_PROTOCOL; +import static datadog.trace.api.config.OtlpConfig.TRACES_SPAN_METRICS_ENABLED; +import static datadog.trace.api.config.OtlpConfig.TRACE_OTEL_EXPORTER; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.communication.ddagent.SharedCommunicationObjects; +import datadog.trace.api.Config; +import datadog.trace.core.monitor.HealthMetrics; +import java.lang.reflect.Field; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import okhttp3.HttpUrl; +import org.junit.jupiter.api.Test; + +/** + * Tests the native-vs-OTLP XOR writer selection in {@link MetricsAggregatorFactory}. The selected + * {@link MetricWriter} is not exposed publicly, so it is read via reflection through {@code + * ConflatingMetricsAggregator.aggregator -> Aggregator.writer}. + */ +class MetricsAggregatorFactoryTest { + + private static SharedCommunicationObjects sharedCommunicationObjects() { + SharedCommunicationObjects sco = mock(SharedCommunicationObjects.class); + sco.agentUrl = HttpUrl.parse("http://localhost:8126"); + when(sco.featuresDiscovery(any())).thenReturn(mock(DDAgentFeaturesDiscovery.class)); + return sco; + } + + private static Properties props(String... keyValues) { + Properties props = new Properties(); + for (int i = 0; i < keyValues.length; i += 2) { + props.setProperty(keyValues[i], keyValues[i + 1]); + } + return props; + } + + @Test + void whenAllMetricsDisabledNoOpAggregatorCreated() { + Config config = Config.get(props(TRACE_STATS_COMPUTATION_ENABLED, "false")); + + MetricsAggregator aggregator = + MetricsAggregatorFactory.createMetricsAggregator( + config, sharedCommunicationObjects(), HealthMetrics.NO_OP); + + assertInstanceOf(NoOpMetricsAggregator.class, aggregator); + } + + @Test + void whenNativeTracerMetricsEnabledSerializingWriterSelected() throws Exception { + // tracer metrics default to enabled; OTLP span metrics default off (no OTLP trace export). + Config config = Config.get(props()); + + MetricsAggregator aggregator = + MetricsAggregatorFactory.createMetricsAggregator( + config, sharedCommunicationObjects(), HealthMetrics.NO_OP); + + assertInstanceOf(ConflatingMetricsAggregator.class, aggregator); + assertInstanceOf(SerializingMetricWriter.class, writerOf(aggregator)); + } + + @Test + void whenOtlpSpanMetricsEnabledOtlpWriterSelectedWithConfiguredInterval() throws Exception { + // OTEL_TRACES_EXPORTER=otlp satisfies FR16 (traces not routed through an Agent). http/json has + // no protobuf-free encoder, so the writer's sender is null -- keeps construction lightweight + // (no + // real OTLP sender / network) while still exercising writer selection. + Config config = + Config.get( + props( + TRACES_SPAN_METRICS_ENABLED, "true", + TRACE_OTEL_EXPORTER, "otlp", + OTLP_METRICS_PROTOCOL, "http/json")); + + MetricsAggregator aggregator = + MetricsAggregatorFactory.createMetricsAggregator( + config, sharedCommunicationObjects(), HealthMetrics.NO_OP); + + assertInstanceOf(ConflatingMetricsAggregator.class, aggregator); + assertInstanceOf(OtlpStatsMetricWriter.class, writerOf(aggregator)); + // reporting interval comes from getTraceStatsInterval() (ms), default 10s. + assertEquals(config.getTraceStatsInterval(), reportingIntervalOf(aggregator)); + assertEquals(MILLISECONDS, reportingIntervalUnitOf(aggregator)); + } + + @Test + void explicitSpanMetricsEnabledSelectsOtlpWriterEvenWithoutOtlpTraceExport() throws Exception { + // An explicit OTEL_TRACES_SPAN_METRICS_ENABLED=true is honored verbatim regardless of trace + // exporter (matching the dd-trace-py / dd-trace-go reference impls). Double-counting if these + // OTLP spans reach an Agent is handled by the FR15 _dd.stats_computed marker, not by disabling + // here. http/json keeps the writer's sender null (no network at construction). + Config config = + Config.get(props(TRACES_SPAN_METRICS_ENABLED, "true", OTLP_METRICS_PROTOCOL, "http/json")); + + MetricsAggregator aggregator = + MetricsAggregatorFactory.createMetricsAggregator( + config, sharedCommunicationObjects(), HealthMetrics.NO_OP); + + assertInstanceOf(ConflatingMetricsAggregator.class, aggregator); + assertInstanceOf(OtlpStatsMetricWriter.class, writerOf(aggregator)); + } + + // ── reflection helpers ───────────────────────────────────────────────────── + + private static MetricWriter writerOf(MetricsAggregator aggregator) throws Exception { + Field aggregatorField = ConflatingMetricsAggregator.class.getDeclaredField("aggregator"); + aggregatorField.setAccessible(true); + Object inner = aggregatorField.get(aggregator); + Field writerField = Aggregator.class.getDeclaredField("writer"); + writerField.setAccessible(true); + return (MetricWriter) writerField.get(inner); + } + + private static long reportingIntervalOf(MetricsAggregator aggregator) throws Exception { + Field f = ConflatingMetricsAggregator.class.getDeclaredField("reportingInterval"); + f.setAccessible(true); + return f.getLong(aggregator); + } + + private static TimeUnit reportingIntervalUnitOf(MetricsAggregator aggregator) throws Exception { + Field f = ConflatingMetricsAggregator.class.getDeclaredField("reportingIntervalTimeUnit"); + f.setAccessible(true); + return (TimeUnit) f.get(aggregator); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java index da3071f450d..69a54477073 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/otlp/common/OtlpResourceProtoTest.java @@ -154,14 +154,14 @@ void testBuildResourceMessage( String caseName, Properties properties, Map expectedAttributes) throws IOException { Config config = Config.get(properties); - byte[] bytes = OtlpResourceProto.buildResourceMessage(config); + byte[] bytes = OtlpResourceProto.buildResourceMessage(config, false, false); Map actualAttributes = parseResourceAttributes(bytes); assertEquals(expectedAttributes, actualAttributes, "For case: " + caseName); } /** - * The datadog-attrs variant ({@code buildResourceMessage(config, true)}) carries {@code + * The datadog-attrs variant ({@code buildResourceMessage(config, true, false)}) carries {@code * datadog.runtime_id}; the plain variant omits it. (Process tags are emitted only when the * experimental process-tag propagation is enabled, so they aren't asserted here.) */ @@ -170,9 +170,9 @@ void datadogResourceAttributesVariantCarriesRuntimeId() throws IOException { Config config = Config.get(props(SERVICE_NAME, "my-service")); Map withDatadog = - parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, true)); + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, true, false)); Map plain = - parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, false)); + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, false, false)); assertTrue( withDatadog.containsKey("datadog.runtime_id"), @@ -184,6 +184,24 @@ void datadogResourceAttributesVariantCarriesRuntimeId() throws IOException { assertFalse(plain.containsKey("datadog.runtime_id"), "plain variant omits datadog.runtime_id"); } + /** + * FR15: the {@code includeStatsComputed} variant adds the {@code _dd.stats_computed=true} marker + * (used on the OTLP trace payload so a downstream Agent skips recompute); the default omits it. + */ + @Test + void statsComputedVariantCarriesMarker() throws IOException { + Config config = Config.get(props(SERVICE_NAME, "my-service")); + + Map withMarker = + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, false, true)); + Map without = + parseResourceAttributes(OtlpResourceProto.buildResourceMessage(config, false, false)); + + assertEquals( + "true", withMarker.get("_dd.stats_computed"), "marker present when stats computed"); + assertFalse(without.containsKey("_dd.stats_computed"), "marker absent when stats not computed"); + } + // ── parsing helpers ─────────────────────────────────────────────────────── /**