diff --git a/CHANGELOG.md b/CHANGELOG.md index be35e99df6d..dace9390975 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features +- Add Kafka queue tracing for Spring Boot 4 ([#5348](https://github.com/getsentry/sentry-java/pull/5348)) - Add `sentry-kafka` module for Kafka queue instrumentation without Spring ([#5288](https://github.com/getsentry/sentry-java/pull/5288)) - Add Kafka queue tracing for Spring Boot 3 ([#5254](https://github.com/getsentry/sentry-java/pull/5254)), ([#5255](https://github.com/getsentry/sentry-java/pull/5255)), ([#5256](https://github.com/getsentry/sentry-java/pull/5256)) - Add `enableQueueTracing` option and messaging span data conventions ([#5250](https://github.com/getsentry/sentry-java/pull/5250)) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2238800c53f..bdbd5a0c9f0 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -184,6 +184,7 @@ springboot3-starter-jdbc = { module = "org.springframework.boot:spring-boot-star springboot3-starter-actuator = { module = "org.springframework.boot:spring-boot-starter-actuator", version.ref = "springboot3" } springboot3-starter-cache = { module = "org.springframework.boot:spring-boot-starter-cache", version.ref = "springboot3" } spring-kafka3 = { module = "org.springframework.kafka:spring-kafka", version = "3.3.5" } +spring-kafka4 = { module = "org.springframework.kafka:spring-kafka" } kafka-clients = { module = "org.apache.kafka:kafka-clients", version = "3.8.1" } springboot4-otel = { module = "io.opentelemetry.instrumentation:opentelemetry-spring-boot-starter", version.ref = "otelInstrumentation" } springboot4-resttestclient = { module = "org.springframework.boot:spring-boot-resttestclient", version.ref = "springboot4" } diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/build.gradle.kts index 71ff985d67c..3e911740311 100644 
--- a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/build.gradle.kts @@ -58,6 +58,10 @@ dependencies { implementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentlessSpring) implementation(projects.sentryAsyncProfiler) + // kafka + implementation(libs.spring.kafka4) + implementation(projects.sentryKafka) + // database query tracing implementation(projects.sentryJdbc) runtimeOnly(libs.hsqldb) diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..0c3bea3b757 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java new file mode 100644 index 
00000000000..8c7b166fd33 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..e0abadf5f9c --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/main/resources/application-kafka.properties @@ -0,0 +1,12 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt new file mode 100644 index 00000000000..0f85e81a0a6 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -0,0 +1,41 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +class KafkaOtelCoexistenceSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `Sentry Kafka integration is suppressed when OTel is active`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("otel-coexistence-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "GET /kafka/produce" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true && + transaction.spans.any { span -> + span.op == "queue.publish" && + span.origin == "auto.opentelemetry" && + span.data?.get("messaging.system") == "kafka" + } + } + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.contexts.trace?.operation == "queue.process" && + transaction.contexts.trace?.origin == "auto.opentelemetry" && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true + } 
+ } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/build.gradle.kts index c3e8ba06fae..8443cbd4aa5 100644 --- a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/build.gradle.kts @@ -59,6 +59,10 @@ dependencies { implementation(projects.sentryAsyncProfiler) implementation(libs.otel) + // kafka + implementation(libs.spring.kafka4) + implementation(projects.sentryKafka) + // cache tracing implementation(libs.springboot4.starter.cache) implementation(libs.caffeine) diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..0c3bea3b757 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java 
b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java new file mode 100644 index 00000000000..8c7b166fd33 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..e0abadf5f9c --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/main/resources/application-kafka.properties @@ -0,0 +1,12 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer + +logging.level.org.apache.kafka=warn diff --git a/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt new file mode 100644 index 00000000000..0f85e81a0a6 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -0,0 +1,41 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +class KafkaOtelCoexistenceSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `Sentry Kafka integration is suppressed when OTel is active`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("otel-coexistence-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.transaction == "GET /kafka/produce" && + transaction.sdk?.integrationSet?.contains("SpringKafka") != true && + transaction.spans.any { span -> + span.op == "queue.publish" && + span.origin == "auto.opentelemetry" && + span.data?.get("messaging.system") == "kafka" + } + } + + testHelper.ensureTransactionReceived { transaction, _ -> + transaction.contexts.trace?.operation == "queue.process" && + transaction.contexts.trace?.origin == "auto.opentelemetry" && + transaction.contexts.trace?.data?.get("messaging.system") == "kafka" 
&& + transaction.sdk?.integrationSet?.contains("SpringKafka") != true + } + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4/build.gradle.kts b/sentry-samples/sentry-samples-spring-boot-4/build.gradle.kts index f43cc47cc6d..9a8ddc65a86 100644 --- a/sentry-samples/sentry-samples-spring-boot-4/build.gradle.kts +++ b/sentry-samples/sentry-samples-spring-boot-4/build.gradle.kts @@ -61,6 +61,10 @@ dependencies { implementation(libs.springboot4.starter.cache) implementation(libs.caffeine) + // kafka + implementation(libs.spring.kafka4) + implementation(projects.sentryKafka) + // database query tracing implementation(projects.sentryJdbc) runtimeOnly(libs.hsqldb) diff --git a/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java b/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java new file mode 100644 index 00000000000..0c3bea3b757 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaConsumer.java @@ -0,0 +1,19 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +@Profile("kafka") +public class KafkaConsumer { + + private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(topics = "sentry-topic", groupId = "sentry-sample-group") + public void listen(String message) { + logger.info("Received message: {}", message); + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java 
b/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java new file mode 100644 index 00000000000..8c7b166fd33 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4/src/main/java/io/sentry/samples/spring/boot4/queues/kafka/KafkaController.java @@ -0,0 +1,26 @@ +package io.sentry.samples.spring.boot4.queues.kafka; + +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@Profile("kafka") +@RequestMapping("/kafka") +public class KafkaController { + + private final KafkaTemplate kafkaTemplate; + + public KafkaController(KafkaTemplate kafkaTemplate) { + this.kafkaTemplate = kafkaTemplate; + } + + @GetMapping("/produce") + String produce(@RequestParam(defaultValue = "hello from sentry!") String message) { + kafkaTemplate.send("sentry-topic", message); + return "Message sent: " + message; + } +} diff --git a/sentry-samples/sentry-samples-spring-boot-4/src/main/resources/application-kafka.properties b/sentry-samples/sentry-samples-spring-boot-4/src/main/resources/application-kafka.properties new file mode 100644 index 00000000000..eaaa62af13b --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4/src/main/resources/application-kafka.properties @@ -0,0 +1,10 @@ +# Kafka — activate with: --spring.profiles.active=kafka +sentry.enable-queue-tracing=true + +spring.kafka.bootstrap-servers=localhost:9092 +spring.kafka.consumer.group-id=sentry-sample-group +spring.kafka.consumer.auto-offset-reset=earliest +spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer +spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer +spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer diff --git a/sentry-samples/sentry-samples-spring-boot-4/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-4/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt new file mode 100644 index 00000000000..43781cf2c56 --- /dev/null +++ b/sentry-samples/sentry-samples-spring-boot-4/src/test/kotlin/io/sentry/systemtest/KafkaQueueSystemTest.kt @@ -0,0 +1,117 @@ +package io.sentry.systemtest + +import io.sentry.systemtest.util.TestHelper +import kotlin.test.Test +import kotlin.test.assertEquals +import org.junit.Before + +/** + * System tests for Kafka queue instrumentation. + * + * Requires: + * - The sample app running with `--spring.profiles.active=kafka` + * - A Kafka broker at localhost:9092 + * - The mock Sentry server at localhost:8000 + */ +class KafkaQueueSystemTest { + lateinit var testHelper: TestHelper + + @Before + fun setup() { + testHelper = TestHelper("http://localhost:8080") + testHelper.reset() + } + + @Test + fun `producer endpoint creates queue publish span`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish") + } + } + + @Test + fun `consumer creates queue process transaction`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("test-consumer-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // The consumer runs asynchronously, so wait for the queue.process transaction + testHelper.ensureTransactionReceived { transaction, _ -> + 
testHelper.doesTransactionHaveOp(transaction, "queue.process") + } + } + + @Test + fun `producer and consumer share same trace`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("trace-test-message") + assertEquals(200, restClient.lastKnownStatusCode) + + // Capture the trace ID from the producer transaction (has queue.publish span) + var producerTraceId: String? = null + testHelper.ensureTransactionReceived { transaction, _ -> + if (testHelper.doesTransactionContainSpanWithOp(transaction, "queue.publish")) { + producerTraceId = transaction.contexts.trace?.traceId?.toString() + true + } else { + false + } + } + + // Verify the consumer transaction has the same trace ID + // Use retryCount=3 since the consumer may take a moment to process + testHelper.ensureEnvelopeReceived(retryCount = 3) { envelopeString -> + val envelope = + testHelper.jsonSerializer.deserializeEnvelope(envelopeString.byteInputStream()) + ?: return@ensureEnvelopeReceived false + val txItem = + envelope.items.firstOrNull { it.header.type == io.sentry.SentryItemType.Transaction } + ?: return@ensureEnvelopeReceived false + val tx = + txItem.getTransaction(testHelper.jsonSerializer) ?: return@ensureEnvelopeReceived false + + tx.contexts.trace?.operation == "queue.process" && + tx.contexts.trace?.traceId?.toString() == producerTraceId + } + } + + @Test + fun `queue publish span has messaging attributes`() { + val restClient = testHelper.restClient + + restClient.produceKafkaMessage("attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + val span = transaction.spans.firstOrNull { it.op == "queue.publish" } + if (span == null) return@ensureTransactionReceived false + + val data = span.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } + + @Test + fun `queue process transaction has messaging attributes`() { 
+ val restClient = testHelper.restClient + + restClient.produceKafkaMessage("process-attrs-test") + assertEquals(200, restClient.lastKnownStatusCode) + + testHelper.ensureTransactionReceived { transaction, _ -> + if (!testHelper.doesTransactionHaveOp(transaction, "queue.process")) { + return@ensureTransactionReceived false + } + + val data = transaction.contexts.trace?.data ?: return@ensureTransactionReceived false + data["messaging.system"] == "kafka" && data["messaging.destination.name"] == "sentry-topic" + } + } +} diff --git a/sentry-spring-7/api/sentry-spring-7.api b/sentry-spring-7/api/sentry-spring-7.api index 71a8a022bf6..c9250b550fd 100644 --- a/sentry-spring-7/api/sentry-spring-7.api +++ b/sentry-spring-7/api/sentry-spring-7.api @@ -244,6 +244,29 @@ public final class io/sentry/spring7/graphql/SentrySpringSubscriptionHandler : i public fun onSubscriptionResult (Ljava/lang/Object;Lio/sentry/IScopes;Lio/sentry/graphql/ExceptionReporter;Lgraphql/execution/instrumentation/parameters/InstrumentationFieldFetchParameters;)Ljava/lang/Object; } +public final class io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessor : org/springframework/beans/factory/config/BeanPostProcessor, org/springframework/core/PriorityOrdered { + public fun ()V + public fun getOrder ()I + public fun postProcessAfterInitialization (Ljava/lang/Object;Ljava/lang/String;)Ljava/lang/Object; +} + +public final class io/sentry/spring7/kafka/SentryKafkaRecordInterceptor : org/springframework/kafka/listener/RecordInterceptor { + public fun (Lio/sentry/IScopes;)V + public fun (Lio/sentry/IScopes;Lorg/springframework/kafka/listener/RecordInterceptor;)V + public 
fun afterRecord (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord; + public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V + public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V +} + public class io/sentry/spring7/opentelemetry/SentryOpenTelemetryAgentWithoutAutoInitConfiguration { public fun ()V public fun sentryOpenTelemetryOptionsConfiguration ()Lio/sentry/Sentry$OptionsConfiguration; diff --git a/sentry-spring-7/build.gradle.kts b/sentry-spring-7/build.gradle.kts index 8102909afb0..ae8269e7825 100644 --- a/sentry-spring-7/build.gradle.kts +++ b/sentry-spring-7/build.gradle.kts @@ -43,10 +43,12 @@ dependencies { compileOnly(libs.slf4j.api) compileOnly(libs.springboot4.starter.graphql) compileOnly(libs.springboot4.starter.quartz) + compileOnly(libs.spring.kafka4) compileOnly(Config.Libs.springWebflux) compileOnly(projects.sentryGraphql) compileOnly(projects.sentryGraphql22) + compileOnly(projects.sentryKafka) compileOnly(projects.sentryQuartz) compileOnly(projects.sentryOpentelemetry.sentryOpentelemetryAgentcustomization) compileOnly(projects.sentryOpentelemetry.sentryOpentelemetryBootstrap) @@ -60,6 +62,7 @@ dependencies { // tests testImplementation(projects.sentryTestSupport) testImplementation(projects.sentryGraphql) + testImplementation(projects.sentryKafka) testImplementation(kotlin(Config.kotlinStdLib)) testImplementation(libs.awaitility.kotlin.spring7) testImplementation(libs.context.propagation) @@ -69,6 +72,7 @@ dependencies { 
testImplementation(libs.mockito.inline) testImplementation(libs.springboot4.starter.aspectj) testImplementation(libs.springboot4.starter.graphql) + testImplementation(libs.spring.kafka4) testImplementation(libs.springboot4.starter.security) testImplementation(libs.springboot4.starter.test) testImplementation(libs.springboot4.starter.web) diff --git a/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessor.java b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessor.java new file mode 100644 index 00000000000..069330a4247 --- /dev/null +++ b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessor.java @@ -0,0 +1,98 @@ +package io.sentry.spring7.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import java.lang.reflect.Field; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.config.AbstractKafkaListenerContainerFactory; +import org.springframework.kafka.listener.RecordInterceptor; + +/** + * Registers {@link SentryKafkaRecordInterceptor} on {@link AbstractKafkaListenerContainerFactory} + * beans. If an existing {@link RecordInterceptor} is already set, it is composed as a delegate. 
+ */ +@ApiStatus.Internal +public final class SentryKafkaConsumerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + private static final @NotNull String RECORD_INTERCEPTOR_FIELD_NAME = "recordInterceptor"; + + private final @NotNull String recordInterceptorFieldName; + + public SentryKafkaConsumerBeanPostProcessor() { + this(RECORD_INTERCEPTOR_FIELD_NAME); + } + + SentryKafkaConsumerBeanPostProcessor(final @NotNull String recordInterceptorFieldName) { + this.recordInterceptorFieldName = recordInterceptorFieldName; + } + + private static final class InterceptorReadFailedException extends Exception { + private static final long serialVersionUID = 1L; + + InterceptorReadFailedException(final @NotNull Throwable cause) { + super(cause); + } + } + + @Override + @SuppressWarnings("unchecked") + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof AbstractKafkaListenerContainerFactory) { + final @NotNull AbstractKafkaListenerContainerFactory factory = + (AbstractKafkaListenerContainerFactory) bean; + + final @Nullable RecordInterceptor existing; + try { + existing = getExistingInterceptor(factory); + } catch (InterceptorReadFailedException e) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.ERROR, + e, + "Sentry Kafka consumer tracing disabled for factory '%s' \u2014 could not read " + + "existing recordInterceptor via reflection. 
Refusing to install Sentry's " + + "interceptor to avoid overwriting a customer-configured RecordInterceptor.", + beanName); + return bean; + } + + if (existing instanceof SentryKafkaRecordInterceptor) { + return bean; + } + + @SuppressWarnings("rawtypes") + final RecordInterceptor sentryInterceptor = + new SentryKafkaRecordInterceptor<>(ScopesAdapter.getInstance(), existing); + factory.setRecordInterceptor(sentryInterceptor); + } + return bean; + } + + private @Nullable RecordInterceptor getExistingInterceptor( + final @NotNull AbstractKafkaListenerContainerFactory factory) + throws InterceptorReadFailedException { + try { + final @NotNull Field field = + AbstractKafkaListenerContainerFactory.class.getDeclaredField(recordInterceptorFieldName); + field.setAccessible(true); + return (RecordInterceptor) field.get(factory); + } catch (NoSuchFieldException | IllegalAccessException | RuntimeException e) { + throw new InterceptorReadFailedException(e); + } + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } +} diff --git a/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessor.java new file mode 100644 index 00000000000..eff0b4154bb --- /dev/null +++ b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -0,0 +1,76 @@ +package io.sentry.spring7.kafka; + +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import io.sentry.kafka.SentryKafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.Ordered; +import org.springframework.core.PriorityOrdered; +import org.springframework.kafka.core.ProducerFactory; 
+import org.springframework.kafka.core.ProducerPostProcessor; + +/** + * Installs a {@link ProducerPostProcessor} on every {@link ProducerFactory} bean so that each + * {@link Producer} created by Spring Kafka is wrapped via {@link SentryKafkaProducer#wrap + * SentryKafkaProducer.wrap(Producer)}. + * + *
<p>
The wrapper records a {@code queue.publish} span around each {@code send(...)} that finishes + * when the broker ack callback fires, giving a real producer-send lifecycle span. {@code + * KafkaTemplate} beans are left untouched, so all customer-configured listeners, interceptors and + * observation settings are preserved. + * + *
<p>
Note: {@link ProducerFactory#addPostProcessor(ProducerPostProcessor)} is a default method on + * the interface that is a no-op unless overridden. Custom factories that do not extend {@code + * DefaultKafkaProducerFactory} will not receive Sentry producer instrumentation; a warning is + * logged at startup in that case. + */ +@ApiStatus.Internal +public final class SentryKafkaProducerBeanPostProcessor + implements BeanPostProcessor, PriorityOrdered { + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public @NotNull Object postProcessAfterInitialization( + final @NotNull Object bean, final @NotNull String beanName) throws BeansException { + if (bean instanceof ProducerFactory) { + final @NotNull ProducerFactory factory = (ProducerFactory) bean; + final @NotNull SentryProducerPostProcessor pp = new SentryProducerPostProcessor<>(); + factory.addPostProcessor(pp); + if (!factory.getPostProcessors().contains(pp)) { + ScopesAdapter.getInstance() + .getOptions() + .getLogger() + .log( + SentryLevel.WARNING, + "Sentry Kafka producer tracing not active for ProducerFactory '%s' (%s). " + + "addPostProcessor() was not honored — the factory may not extend " + + "DefaultKafkaProducerFactory. Wrap producers manually with " + + "SentryKafkaProducer.wrap(producer).", + beanName, + factory.getClass().getName()); + } + } + return bean; + } + + @Override + public int getOrder() { + return Ordered.LOWEST_PRECEDENCE; + } + + /** + * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} via + * {@link SentryKafkaProducer#wrap}. 
+ */ + static final class SentryProducerPostProcessor implements ProducerPostProcessor { + @Override + public @NotNull Producer apply(final @NotNull Producer producer) { + return SentryKafkaProducer.wrap( + producer, ScopesAdapter.getInstance(), "auto.queue.spring7.kafka.producer"); + } + } +} diff --git a/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java new file mode 100644 index 00000000000..a49e8473c4f --- /dev/null +++ b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java @@ -0,0 +1,292 @@ +package io.sentry.spring7.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.SentryLevel; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import io.sentry.kafka.SentryKafkaProducer; +import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.springframework.kafka.listener.RecordInterceptor; +import org.springframework.kafka.support.KafkaHeaders; + +/** + * A {@link RecordInterceptor} that creates {@code queue.process} transactions for incoming Kafka + * records with distributed tracing support. 
+ */ +@ApiStatus.Internal +public final class SentryKafkaRecordInterceptor implements RecordInterceptor { + + static final String TRACE_ORIGIN = "auto.queue.spring7.kafka.consumer"; + + private final @NotNull IScopes scopes; + private final @Nullable RecordInterceptor delegate; + + private static final @NotNull ThreadLocal currentContext = + new ThreadLocal<>(); + + public SentryKafkaRecordInterceptor(final @NotNull IScopes scopes) { + this(scopes, null); + } + + public SentryKafkaRecordInterceptor( + final @NotNull IScopes scopes, final @Nullable RecordInterceptor delegate) { + this.scopes = scopes; + this.delegate = delegate; + } + + @Override + public @Nullable ConsumerRecord intercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegateIntercept(record, consumer); + } + + try { + finishStaleContext(); + + final @NotNull IScopes forkedScopes = scopes.forkedRootScopes("SentryKafkaRecordInterceptor"); + final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); + currentContext.set(new SentryRecordContext(lifecycleToken, null)); + + final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); + + final @Nullable ITransaction transaction = + startTransaction(forkedScopes, record, transactionContext); + currentContext.set(new SentryRecordContext(lifecycleToken, transaction)); + } catch (Throwable t) { + scopes.getOptions().getLogger().log(SentryLevel.ERROR, "Unable to wrap Kafka consumer.", t); + } + return delegateIntercept(record, consumer); + } + + @Override + public void success( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.success(record, consumer); + } + } finally { + finishSpan(SpanStatus.OK, null); + } + } + + @Override + public void failure( + final @NotNull ConsumerRecord record, + final @NotNull Exception exception, + 
final @NotNull Consumer consumer) { + try { + if (delegate != null) { + delegate.failure(record, exception, consumer); + } + } finally { + finishSpan(SpanStatus.INTERNAL_ERROR, exception); + } + } + + @Override + public void afterRecord( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.afterRecord(record, consumer); + } + } + + @Override + public void setupThreadState(final @NotNull Consumer consumer) { + if (delegate != null) { + delegate.setupThreadState(consumer); + } + } + + @Override + public void clearThreadState(final @NotNull Consumer consumer) { + try { + finishStaleContext(); + } finally { + if (delegate != null) { + delegate.clearThreadState(consumer); + } + } + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + + private @Nullable ConsumerRecord delegateIntercept( + final @NotNull ConsumerRecord record, final @NotNull Consumer consumer) { + if (delegate != null) { + return delegate.intercept(record, consumer); + } + return record; + } + + private @Nullable TransactionContext continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable List baggageHeaders = + headerValues(record, BaggageHeader.BAGGAGE_HEADER); + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, + final @NotNull ConsumerRecord record, + final @Nullable TransactionContext transactionContext) { + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionContext txContext = + transactionContext != null + ? 
transactionContext + : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); + + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, "messaging.message.id"); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr != null) { + try { + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); + if (latencyMs >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs); + } + } catch (NumberFormatException ignored) { + // ignore malformed header + } + } + + return transaction; + } + + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(KafkaHeaders.DELIVERY_ATTEMPT); + if (header == null) { + return null; + } + + final byte[] 
value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + + private void finishStaleContext() { + if (currentContext.get() != null) { + finishSpan(SpanStatus.UNKNOWN, null); + } + } + + private void finishSpan(final @NotNull SpanStatus status, final @Nullable Throwable throwable) { + final @Nullable SentryRecordContext ctx = currentContext.get(); + if (ctx == null) { + return; + } + currentContext.remove(); + + try { + final @Nullable ITransaction transaction = ctx.transaction; + if (transaction != null) { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } + } finally { + ctx.lifecycleToken.close(); + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } + + private @Nullable List headerValues( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + @Nullable List values = null; + for (final @NotNull Header header : record.headers().headers(headerName)) { + if (header.value() != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(header.value(), StandardCharsets.UTF_8)); + } + } + return values; + } + + private static final class SentryRecordContext { + final @NotNull ISentryLifecycleToken lifecycleToken; + final @Nullable ITransaction transaction; + + SentryRecordContext( + final @NotNull ISentryLifecycleToken lifecycleToken, + final @Nullable ITransaction transaction) { + this.lifecycleToken = lifecycleToken; + this.transaction = transaction; + } + } +} diff --git 
a/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt new file mode 100644 index 00000000000..1efabac1422 --- /dev/null +++ b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaConsumerBeanPostProcessorTest.kt @@ -0,0 +1,124 @@ +package io.sentry.spring7.kafka + +import kotlin.test.Test +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.mockito.kotlin.mock +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.listener.RecordInterceptor + +class SentryKafkaConsumerBeanPostProcessorTest { + + @Test + fun `wraps ConcurrentKafkaListenerContainerFactory with SentryKafkaRecordInterceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.setConsumerFactory(consumerFactory) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + // Verify via reflection that the interceptor was set + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val interceptor = field.get(factory) + assertTrue(interceptor is SentryKafkaRecordInterceptor<*, *>) + } + + @Test + fun `does not double-wrap when SentryKafkaRecordInterceptor already set`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.setConsumerFactory(consumerFactory) + + val processor = SentryKafkaConsumerBeanPostProcessor() + // First wrap + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = 
factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val firstInterceptor = field.get(factory) + + // Second wrap — should be idempotent + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + val secondInterceptor = field.get(factory) + + assertSame(firstInterceptor, secondInterceptor) + } + + @Test + fun `does not wrap non-factory beans`() { + val someBean = "not a factory" + val processor = SentryKafkaConsumerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `chains existing customer RecordInterceptor as delegate`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.setConsumerFactory(consumerFactory) + + val customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val processor = SentryKafkaConsumerBeanPostProcessor() + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + val installed = field.get(factory) + assertTrue( + installed is SentryKafkaRecordInterceptor<*, *>, + "expected SentryKafkaRecordInterceptor, got ${installed?.javaClass}", + ) + + val delegateField = SentryKafkaRecordInterceptor::class.java.getDeclaredField("delegate") + delegateField.isAccessible = true + assertSame( + customerInterceptor, + delegateField.get(installed), + "customer interceptor must be preserved as delegate", + ) + } + + @Test + fun `skips installation when reflection fails and preserves customer interceptor`() { + val consumerFactory = mock>() + val factory = ConcurrentKafkaListenerContainerFactory() + factory.setConsumerFactory(consumerFactory) + val 
customerInterceptor = + object : RecordInterceptor { + override fun intercept( + record: ConsumerRecord, + consumer: Consumer, + ): ConsumerRecord? = record + } + factory.setRecordInterceptor(customerInterceptor) + + val field = factory.javaClass.superclass.getDeclaredField("recordInterceptor") + field.isAccessible = true + assertSame(customerInterceptor, field.get(factory)) + + val processor = SentryKafkaConsumerBeanPostProcessor("missingRecordInterceptor") + processor.postProcessAfterInitialization(factory, "kafkaListenerContainerFactory") + + assertSame( + customerInterceptor, + field.get(factory), + "customer interceptor must remain installed when Sentry cannot read it", + ) + } +} diff --git a/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessorTest.kt new file mode 100644 index 00000000000..e0317e74447 --- /dev/null +++ b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -0,0 +1,95 @@ +package io.sentry.spring7.kafka + +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.Producer +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.core.DefaultKafkaProducerFactory +import org.springframework.kafka.core.ProducerFactory +import org.springframework.kafka.core.ProducerPostProcessor + +class SentryKafkaProducerBeanPostProcessorTest { + + @Test + fun `registers Sentry post-processor on ProducerFactory`() { + val factory = mock>() + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) + val processor = 
SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + val captor = argumentCaptor>() + verify(factory).addPostProcessor(captor.capture()) + assertTrue( + captor.firstValue is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> + ) + } + + @Test + fun `does not throw when addPostProcessor is a no-op (default interface method)`() { + // Factory using the default no-op addPostProcessor / getPostProcessors + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) + val processor = SentryKafkaProducerBeanPostProcessor() + + // Should complete without throwing, and log a warning via ScopesAdapter + processor.postProcessAfterInitialization(factory, "myFactory") + + verify(factory).addPostProcessor(any()) + } + + @Test + fun `does not modify non-ProducerFactory beans`() { + val someBean = "not a producer factory" + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(someBean, "someBean") + + assertSame(someBean, result) + } + + @Test + fun `returns the same bean instance`() { + val factory = mock>() + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(pp)) + val processor = SentryKafkaProducerBeanPostProcessor() + + val result = processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + assertSame(factory, result, "BPP must return the same bean, not a replacement") + } + + @Test + fun `registered post-processor wraps producers via SentryKafkaProducer wrap`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + + val wrapped = pp.apply(raw) + + assertTrue(java.lang.reflect.Proxy.isProxyClass(wrapped.javaClass)) + } + + @Test + fun `integrates with DefaultKafkaProducerFactory addPostProcessor contract`() { + // Sanity check against the real Spring Kafka API 
surface — DefaultKafkaProducerFactory + // honors addPostProcessor and exposes it via getPostProcessors(). + val factory = DefaultKafkaProducerFactory(emptyMap()) + val processor = SentryKafkaProducerBeanPostProcessor() + + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + assertEquals(1, factory.postProcessors.size) + assertTrue( + factory.postProcessors.first() + is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> + ) + } +} diff --git a/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt new file mode 100644 index 00000000000..9d1162e60f2 --- /dev/null +++ b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt @@ -0,0 +1,465 @@ +package io.sentry.spring7.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.SpanDataConvention +import io.sentry.TransactionContext +import io.sentry.kafka.SentryKafkaProducer +import io.sentry.test.initForTest +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.Optional +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertNull +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType +import org.mockito.kotlin.any +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.times +import org.mockito.kotlin.verify +import 
org.mockito.kotlin.whenever +import org.springframework.kafka.listener.RecordInterceptor +import org.springframework.kafka.support.KafkaHeaders + +class SentryKafkaRecordInterceptorTest { + + private lateinit var scopes: IScopes + private lateinit var forkedScopes: IScopes + private lateinit var options: SentryOptions + private lateinit var consumer: Consumer + private lateinit var lifecycleToken: ISentryLifecycleToken + private lateinit var transaction: SentryTracer + + @BeforeTest + fun setup() { + initForTest { it.dsn = "https://key@sentry.io/proj" } + scopes = mock() + consumer = mock() + lifecycleToken = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + whenever(scopes.options).thenReturn(options) + whenever(scopes.isEnabled).thenReturn(true) + + forkedScopes = mock() + whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes) + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + + transaction = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createRecord( + topic: String = "my-topic", + headers: RecordHeaders = RecordHeaders(), + serializedValueSize: Int = -1, + ): ConsumerRecord { + return ConsumerRecord( + topic, + 0, + 0L, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + 3, + serializedValueSize, + "key", + "value", + headers, + Optional.empty(), + ) + } + + private fun createRecordWithHeaders( + sentryTrace: String? = null, + baggage: String? = null, + baggageHeaders: List? = null, + enqueuedTime: String? = null, + deliveryAttempt: Int? 
= null, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggageHeaders?.forEach { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER, + it.toByteArray(StandardCharsets.UTF_8), + ) + } + deliveryAttempt?.let { + headers.add( + KafkaHeaders.DELIVERY_ATTEMPT, + ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array(), + ) + } + val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") + headers.forEach { record.headers().add(it) } + return record + } + + @Test + fun `intercept forks root scopes`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") + verify(forkedScopes).makeCurrent() + } + + @Test + fun `intercept continues trace from headers`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) + + interceptor.intercept(record, consumer) + + verify(forkedScopes) + .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) + } + + @Test + fun `intercept calls continueTrace with null when no headers`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) + } + + @Test + fun `intercept passes all baggage headers to continueTrace`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val sentryTraceValue = 
"2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val record = + createRecordWithHeaders( + sentryTrace = sentryTraceValue, + baggageHeaders = listOf("third=party", "sentry-sample_rate=1"), + ) + + interceptor.intercept(record, consumer) + + verify(forkedScopes) + .continueTrace( + org.mockito.kotlin.eq(sentryTraceValue), + org.mockito.kotlin.eq(listOf("third=party", "sentry-sample_rate=1")), + ) + } + + @Test + fun `sets body size from serializedValueSize`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = 42) + + interceptor.intercept(record, consumer) + + assertEquals(42, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + + @Test + fun `does not set body size when serializedValueSize is negative`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord(serializedValueSize = -1) + + interceptor.intercept(record, consumer) + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE)) + } + + @Test + fun `sets retry count from delivery attempt header`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecordWithHeaders(deliveryAttempt = 3) + + interceptor.intercept(record, consumer) + + assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `does not set retry count when delivery attempt header is missing`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) + } + + @Test + fun `sets receive latency from enqueued time in epoch seconds`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString() + val record = createRecordWithHeaders(enqueuedTime = enqueuedTime) + + interceptor.intercept(record, 
consumer) + + val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY) + assertTrue(latency is Long && latency >= 0) + } + + @Test + fun `does not create span when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(scopes, never()).forkedRootScopes(any()) + verify(forkedScopes, never()).makeCurrent() + assertEquals(record, result) + } + + @Test + fun `does not create span when origin is ignored`() { + options.setIgnoredSpanOrigins(listOf(SentryKafkaRecordInterceptor.TRACE_ORIGIN)) + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + val result = interceptor.intercept(record, consumer) + + verify(scopes, never()).forkedRootScopes(any()) + verify(forkedScopes, never()).makeCurrent() + assertEquals(record, result) + } + + @Test + fun `delegates to existing interceptor`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + interceptor.intercept(record, consumer) + + verify(delegate).intercept(record, consumer) + } + + @Test + fun `success finishes transaction and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.intercept(record, consumer) + interceptor.success(record, consumer) + + verify(delegate).success(record, consumer) + } + + @Test + fun `failure finishes transaction with error and delegates`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + val exception = RuntimeException("processing failed") + + interceptor.intercept(record, consumer) + interceptor.failure(record, exception, consumer) + + 
verify(delegate).failure(record, exception, consumer) + } + + @Test + fun `afterRecord delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + interceptor.afterRecord(record, consumer) + + verify(delegate).afterRecord(record, consumer) + } + + @Test + fun `trace origin is set correctly`() { + assertEquals("auto.queue.spring7.kafka.consumer", SentryKafkaRecordInterceptor.TRACE_ORIGIN) + } + + @Test + fun `clearThreadState cleans up stale context`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + interceptor.intercept(record, consumer) + + interceptor.clearThreadState(consumer) + + verify(lifecycleToken).close() + } + + @Test + fun `clearThreadState is no-op when no context exists`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.clearThreadState(consumer) + } + + @Test + fun `setupThreadState delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + + verify(delegate).setupThreadState(consumer) + } + + @Test + fun `setupThreadState is no-op without delegate`() { + val interceptor = SentryKafkaRecordInterceptor(scopes) + + // should not throw + interceptor.setupThreadState(consumer) + } + + @Test + fun `clearThreadState delegates to existing interceptor`() { + val delegate = mock>() + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.clearThreadState(consumer) + + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() { + val delegate = mock>() + whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom")) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + val record = createRecord() + + 
interceptor.intercept(record, consumer) + + try { + interceptor.clearThreadState(consumer) + } catch (ignored: RuntimeException) { + // expected + } + + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `full lifecycle intercept success clearThreadState closes token exactly once`() { + val delegate = mock>() + val record = createRecord() + whenever(delegate.intercept(record, consumer)).thenReturn(record) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + interceptor.intercept(record, consumer) + interceptor.success(record, consumer) + interceptor.clearThreadState(consumer) + + // token closed once by success(); clearThreadState must not re-close it + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + // delegate hooks still delegated across the full lifecycle + verify(delegate).setupThreadState(consumer) + verify(delegate).success(record, consumer) + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `when delegate intercept returns null clearThreadState still finishes transaction and closes token`() { + val delegate = mock>() + val record = createRecord() + // delegate filters the record — per Spring Kafka contract, success/failure will not be invoked + whenever(delegate.intercept(record, consumer)).thenReturn(null) + val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + val result = interceptor.intercept(record, consumer) + interceptor.clearThreadState(consumer) + + assertNull(result) + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `when delegate intercept throws clearThreadState still finishes transaction and closes token`() { + val delegate = mock>() + val record = createRecord() + val boom = RuntimeException("delegate boom") + whenever(delegate.intercept(record, consumer)).thenThrow(boom) + val 
interceptor = SentryKafkaRecordInterceptor(scopes, delegate) + + interceptor.setupThreadState(consumer) + val thrown = assertFailsWith { interceptor.intercept(record, consumer) } + assertEquals(boom, thrown) + + interceptor.clearThreadState(consumer) + + verify(lifecycleToken, times(1)).close() + assertTrue(transaction.isFinished) + verify(delegate).clearThreadState(consumer) + } + + @Test + fun `intercept cleans up stale context from previous record`() { + val lifecycleToken2 = mock() + val forkedScopes2 = mock() + whenever(forkedScopes2.options).thenReturn(options) + whenever(forkedScopes2.makeCurrent()).thenReturn(lifecycleToken2) + val tx2 = SentryTracer(TransactionContext("queue.process", "queue.process"), forkedScopes2) + whenever(forkedScopes2.startTransaction(any(), any())).thenReturn(tx2) + + var callCount = 0 + + val interceptor = SentryKafkaRecordInterceptor(scopes) + val record = createRecord() + + whenever(scopes.forkedRootScopes(any())).thenAnswer { + callCount++ + if (callCount == 1) forkedScopes else forkedScopes2 + } + + // First intercept sets up context + interceptor.intercept(record, consumer) + + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) + + // First lifecycle token should have been closed by the defensive cleanup + verify(lifecycleToken).close() + } +} diff --git a/sentry-spring-boot-4/build.gradle.kts b/sentry-spring-boot-4/build.gradle.kts index 69a40f7b64f..3b0b3be8630 100644 --- a/sentry-spring-boot-4/build.gradle.kts +++ b/sentry-spring-boot-4/build.gradle.kts @@ -36,6 +36,7 @@ dependencies { compileOnly(projects.sentryGraphql) compileOnly(projects.sentryGraphql22) compileOnly(projects.sentryQuartz) + compileOnly(libs.spring.kafka4) compileOnly(Config.Libs.springWeb) compileOnly(Config.Libs.springWebflux) compileOnly(libs.context.propagation) @@ -68,6 +69,7 @@ dependencies { testImplementation(projects.sentryApacheHttpClient5) 
testImplementation(projects.sentryGraphql) testImplementation(projects.sentryGraphql22) + testImplementation(projects.sentryKafka) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryCore) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgent) testImplementation(projects.sentryOpentelemetry.sentryOpentelemetryAgentcustomization) @@ -96,6 +98,7 @@ dependencies { testImplementation(libs.springboot4.starter) testImplementation(libs.springboot4.starter.aspectj) testImplementation(libs.springboot4.starter.graphql) + testImplementation(libs.spring.kafka4) testImplementation(libs.springboot4.starter.quartz) testImplementation(libs.springboot4.starter.security) testImplementation(libs.springboot4.starter.test) diff --git a/sentry-spring-boot-4/src/main/java/io/sentry/spring/boot4/SentryAutoConfiguration.java b/sentry-spring-boot-4/src/main/java/io/sentry/spring/boot4/SentryAutoConfiguration.java index ae9e3ac50fe..2429c1e7446 100644 --- a/sentry-spring-boot-4/src/main/java/io/sentry/spring/boot4/SentryAutoConfiguration.java +++ b/sentry-spring-boot-4/src/main/java/io/sentry/spring/boot4/SentryAutoConfiguration.java @@ -31,6 +31,8 @@ import io.sentry.spring7.checkin.SentryQuartzConfiguration; import io.sentry.spring7.exception.SentryCaptureExceptionParameterPointcutConfiguration; import io.sentry.spring7.exception.SentryExceptionParameterAdviceConfiguration; +import io.sentry.spring7.kafka.SentryKafkaConsumerBeanPostProcessor; +import io.sentry.spring7.kafka.SentryKafkaProducerBeanPostProcessor; import io.sentry.spring7.opentelemetry.SentryOpenTelemetryAgentWithoutAutoInitConfiguration; import io.sentry.spring7.opentelemetry.SentryOpenTelemetryNoAgentConfiguration; import io.sentry.spring7.tracing.CombinedTransactionNameProvider; @@ -244,6 +246,34 @@ static class SentryCacheConfiguration { } } + @Configuration(proxyBeanMethods = false) + @ConditionalOnClass( + name = { + "org.springframework.kafka.core.KafkaTemplate", + 
"io.sentry.kafka.SentryKafkaProducer" + }) + @ConditionalOnProperty(name = "sentry.enable-queue-tracing", havingValue = "true") + @ConditionalOnMissingClass({ + "io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider", + "io.sentry.opentelemetry.agent.AgentMarker" + }) + @Open + static class SentryKafkaQueueConfiguration { + + @Bean + public static @NotNull SentryKafkaProducerBeanPostProcessor + sentryKafkaProducerBeanPostProcessor() { + SentryIntegrationPackageStorage.getInstance().addIntegration("SpringKafka"); + return new SentryKafkaProducerBeanPostProcessor(); + } + + @Bean + public static @NotNull SentryKafkaConsumerBeanPostProcessor + sentryKafkaConsumerBeanPostProcessor() { + return new SentryKafkaConsumerBeanPostProcessor(); + } + } + @Configuration(proxyBeanMethods = false) @ConditionalOnClass(ProceedingJoinPoint.class) @ConditionalOnProperty( diff --git a/sentry-spring-boot-4/src/test/kotlin/io/sentry/spring/boot4/SentryKafkaAutoConfigurationTest.kt b/sentry-spring-boot-4/src/test/kotlin/io/sentry/spring/boot4/SentryKafkaAutoConfigurationTest.kt new file mode 100644 index 00000000000..d4d2b439427 --- /dev/null +++ b/sentry-spring-boot-4/src/test/kotlin/io/sentry/spring/boot4/SentryKafkaAutoConfigurationTest.kt @@ -0,0 +1,125 @@ +package io.sentry.spring.boot4 + +import io.sentry.kafka.SentryKafkaProducer +import io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider +import io.sentry.opentelemetry.agent.AgentMarker +import io.sentry.spring7.kafka.SentryKafkaConsumerBeanPostProcessor +import io.sentry.spring7.kafka.SentryKafkaProducerBeanPostProcessor +import kotlin.test.Test +import org.assertj.core.api.Assertions.assertThat +import org.springframework.boot.autoconfigure.AutoConfigurations +import org.springframework.boot.test.context.FilteredClassLoader +import org.springframework.boot.test.context.runner.ApplicationContextRunner +import org.springframework.kafka.core.KafkaTemplate + +class SentryKafkaAutoConfigurationTest { + + 
private val contextRunner = + ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(SentryAutoConfiguration::class.java)) + .withPropertyValues( + "sentry.dsn=http://key@localhost/proj", + "sentry.traces-sample-rate=1.0", + "sentry.shutdownTimeoutMillis=0", + "sentry.sessionFlushTimeoutMillis=0", + "sentry.flushTimeoutMillis=0", + "sentry.readTimeoutMillis=50", + "sentry.connectionTimeoutMillis=50", + "sentry.send-modules=false", + "sentry.debug=false", + ) + + private val noOtelClassLoader = + FilteredClassLoader( + SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + private val noOtelCustomizerClassLoader = + FilteredClassLoader(SentryAutoConfigurationCustomizerProvider::class.java) + + private val noSentryKafkaClassLoader = + FilteredClassLoader( + SentryKafkaProducer::class.java, + SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + private val noSpringKafkaClassLoader = + FilteredClassLoader( + KafkaTemplate::class.java, + SentryAutoConfigurationCustomizerProvider::class.java, + AgentMarker::class.java, + ) + + @Test + fun `registers Kafka BPPs when queue tracing is enabled`() { + contextRunner + .withClassLoader(noOtelClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).hasSingleBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).hasSingleBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is disabled`() { + contextRunner.withClassLoader(noOtelClassLoader).run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when sentry-kafka is not present`() { + contextRunner + .withClassLoader(noSentryKafkaClassLoader) + 
.withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when spring-kafka is not present`() { + contextRunner + .withClassLoader(noSpringKafkaClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when queue tracing is explicitly false`() { + contextRunner + .withClassLoader(noOtelClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=false") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when OpenTelemetry agent is present`() { + contextRunner + .withClassLoader(noOtelCustomizerClassLoader) + .withPropertyValues("sentry.enable-queue-tracing=true") + .run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } + + @Test + fun `does not register Kafka BPPs when OpenTelemetry integration is present`() { + contextRunner.withPropertyValues("sentry.enable-queue-tracing=true").run { context -> + assertThat(context).doesNotHaveBean(SentryKafkaProducerBeanPostProcessor::class.java) + assertThat(context).doesNotHaveBean(SentryKafkaConsumerBeanPostProcessor::class.java) + } + } +} diff --git a/test/system-test-runner.py b/test/system-test-runner.py index f20a9bd8d62..6c6a8604f94 100644 --- a/test/system-test-runner.py +++ 
b/test/system-test-runner.py @@ -73,11 +73,17 @@ "sentry-samples-spring-boot-jakarta", "sentry-samples-spring-boot-jakarta-opentelemetry", "sentry-samples-spring-boot-jakarta-opentelemetry-noagent", + "sentry-samples-spring-boot-4", + "sentry-samples-spring-boot-4-opentelemetry", + "sentry-samples-spring-boot-4-opentelemetry-noagent", } KAFKA_PROFILE_REQUIRED_MODULES = { "sentry-samples-spring-boot-jakarta", "sentry-samples-spring-boot-jakarta-opentelemetry", "sentry-samples-spring-boot-jakarta-opentelemetry-noagent", + "sentry-samples-spring-boot-4", + "sentry-samples-spring-boot-4-opentelemetry", + "sentry-samples-spring-boot-4-opentelemetry-noagent", } class ServerType(Enum):