Skip to content

Commit d65eb69

Browse files
committed
Automatically configure Spring Kafka's observation convention beans
This automatically registers KafkaListenerObservationConvention on the container factory, and KafkaTemplateObservationConvention on the Kafka template. Closes gh-48914
1 parent 697c8fe commit d65eb69

File tree

6 files changed

+47
-2
lines changed

6 files changed

+47
-2
lines changed

documentation/spring-boot-docs/src/docs/antora/modules/reference/pages/messaging/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ include-code::MyBean[]
3232
NOTE: If the property configprop:spring.kafka.producer.transaction-id-prefix[] is defined, a javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] is automatically configured.
3333
Also, if a javadoc:org.springframework.kafka.support.converter.RecordMessageConverter[] bean is defined, it is automatically associated to the auto-configured javadoc:org.springframework.kafka.core.KafkaTemplate[].
3434

35+
If there's a bean of type `KafkaTemplateObservationConvention` in the context, it is automatically registered on the `KafkaTemplate`.
36+
3537

3638

3739
[[messaging.kafka.receiving]]
@@ -52,6 +54,8 @@ If only a javadoc:org.springframework.kafka.support.converter.RecordMessageConve
5254

5355
TIP: A custom javadoc:org.springframework.kafka.transaction.ChainedKafkaTransactionManager[] must be marked javadoc:org.springframework.context.annotation.Primary[format=annotation] as it usually references the auto-configured javadoc:org.springframework.kafka.transaction.KafkaTransactionManager[] bean.
5456

57+
If there's a bean of type `KafkaListenerObservationConvention` in the context, it is automatically registered on the container factory.
58+
5559

5660

5761
[[messaging.kafka.streams]]

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
3838
import org.springframework.kafka.support.converter.BatchMessageConverter;
3939
import org.springframework.kafka.support.converter.RecordMessageConverter;
40+
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
4041
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
4142
import org.springframework.util.Assert;
4243

@@ -82,6 +83,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
8283

8384
private @Nullable SimpleAsyncTaskExecutor listenerTaskExecutor;
8485

86+
private @Nullable KafkaListenerObservationConvention observationConvention;
87+
8588
/**
8689
* Set the {@link KafkaProperties} to use.
8790
* @param properties the properties
@@ -186,6 +189,14 @@ void setListenerTaskExecutor(@Nullable SimpleAsyncTaskExecutor listenerTaskExecu
186189
this.listenerTaskExecutor = listenerTaskExecutor;
187190
}
188191

192+
/**
193+
* Sets the observation convention.
194+
* @param observationConvention the observation convention
195+
*/
196+
void setObservationConvention(@Nullable KafkaListenerObservationConvention observationConvention) {
197+
this.observationConvention = observationConvention;
198+
}
199+
189200
/**
190201
* Configure the specified Kafka listener container factory. The factory can be
191202
* further tuned and default settings can be overridden.
@@ -249,6 +260,7 @@ private void configureContainer(ContainerProperties container) {
249260
map.from(this.transactionManager).to(container::setKafkaAwareTransactionManager);
250261
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
251262
map.from(this.listenerTaskExecutor).to(container::setListenerTaskExecutor);
263+
map.from(this.observationConvention).to(container::setObservationConvention);
252264
}
253265

254266
}

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAnnotationDrivenConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.springframework.kafka.support.converter.BatchMessageConverter;
4747
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
4848
import org.springframework.kafka.support.converter.RecordMessageConverter;
49+
import org.springframework.kafka.support.micrometer.KafkaListenerObservationConvention;
4950
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
5051

5152
/**
@@ -86,6 +87,8 @@ class KafkaAnnotationDrivenConfiguration {
8687

8788
private final @Nullable Function<MessageListenerContainer, String> threadNameSupplier;
8889

90+
private final @Nullable KafkaListenerObservationConvention observationConvention;
91+
8992
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
9093
ObjectProvider<RecordMessageConverter> recordMessageConverter,
9194
ObjectProvider<RecordFilterStrategy<Object, Object>> recordFilterStrategy,
@@ -97,7 +100,8 @@ class KafkaAnnotationDrivenConfiguration {
97100
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
98101
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor,
99102
ObjectProvider<BatchInterceptor<Object, Object>> batchInterceptor,
100-
ObjectProvider<Function<MessageListenerContainer, String>> threadNameSupplier) {
103+
ObjectProvider<Function<MessageListenerContainer, String>> threadNameSupplier,
104+
ObjectProvider<KafkaListenerObservationConvention> observationConvention) {
101105
this.properties = properties;
102106
this.recordMessageConverter = recordMessageConverter.getIfUnique();
103107
this.recordFilterStrategy = recordFilterStrategy.getIfUnique();
@@ -111,6 +115,7 @@ class KafkaAnnotationDrivenConfiguration {
111115
this.recordInterceptor = recordInterceptor.getIfUnique();
112116
this.batchInterceptor = batchInterceptor.getIfUnique();
113117
this.threadNameSupplier = threadNameSupplier.getIfUnique();
118+
this.observationConvention = observationConvention.getIfUnique();
114119
}
115120

116121
@Bean
@@ -145,6 +150,7 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() {
145150
configurer.setRecordInterceptor(this.recordInterceptor);
146151
configurer.setBatchInterceptor(this.batchInterceptor);
147152
configurer.setThreadNameSupplier(this.threadNameSupplier);
153+
configurer.setObservationConvention(this.observationConvention);
148154
return configurer;
149155
}
150156

module/spring-boot-kafka/src/main/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfiguration.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.springframework.kafka.support.LoggingProducerListener;
6262
import org.springframework.kafka.support.ProducerListener;
6363
import org.springframework.kafka.support.converter.RecordMessageConverter;
64+
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
6465
import org.springframework.kafka.transaction.KafkaTransactionManager;
6566
import org.springframework.util.StringUtils;
6667
import org.springframework.util.backoff.BackOff;
@@ -103,10 +104,12 @@ PropertiesKafkaConnectionDetails kafkaConnectionDetails(ObjectProvider<SslBundle
103104
@ConditionalOnMissingBean(KafkaTemplate.class)
104105
KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
105106
ProducerListener<Object, Object> kafkaProducerListener,
106-
ObjectProvider<RecordMessageConverter> messageConverter) {
107+
ObjectProvider<RecordMessageConverter> messageConverter,
108+
ObjectProvider<KafkaTemplateObservationConvention> observationConvention) {
107109
PropertyMapper map = PropertyMapper.get();
108110
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
109111
messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
112+
observationConvention.ifUnique(kafkaTemplate::setObservationConvention);
110113
map.from(kafkaProducerListener).to(kafkaTemplate::setProducerListener);
111114
map.from(this.properties.getTemplate().getDefaultTopic()).to(kafkaTemplate::setDefaultTopic);
112115
map.from(this.properties.getTemplate().getTransactionIdPrefix()).to(kafkaTemplate::setTransactionIdPrefix);

module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
2727
import org.springframework.kafka.core.ConsumerFactory;
2828
import org.springframework.kafka.listener.MessageListenerContainer;
29+
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
2930

3031
import static org.assertj.core.api.Assertions.assertThat;
3132
import static org.mockito.BDDMockito.then;
@@ -89,4 +90,12 @@ void shouldApplyAuthExceptionRetryInterval() {
8990
.isEqualTo(Duration.ofSeconds(10));
9091
}
9192

93+
@Test
94+
void shouldApplyObservationConvention() {
95+
DefaultKafkaListenerObservationConvention convention = new DefaultKafkaListenerObservationConvention();
96+
this.configurer.setObservationConvention(convention);
97+
this.configurer.configure(this.factory, this.consumerFactory);
98+
assertThat(this.factory.getContainerProperties().getObservationConvention()).isSameAs(convention);
99+
}
100+
92101
}

module/spring-boot-kafka/src/test/java/org/springframework/boot/kafka/autoconfigure/KafkaAutoConfigurationTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@
9494
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
9595
import org.springframework.kafka.support.converter.MessagingMessageConverter;
9696
import org.springframework.kafka.support.converter.RecordMessageConverter;
97+
import org.springframework.kafka.support.micrometer.KafkaTemplateObservation.DefaultKafkaTemplateObservationConvention;
98+
import org.springframework.kafka.support.micrometer.KafkaTemplateObservationConvention;
9799
import org.springframework.kafka.transaction.KafkaAwareTransactionManager;
98100
import org.springframework.kafka.transaction.KafkaTransactionManager;
99101
import org.springframework.test.util.ReflectionTestUtils;
@@ -1015,6 +1017,15 @@ void shouldRegisterRuntimeHints() {
10151017
.withMemberCategories(MemberCategory.INVOKE_PUBLIC_CONSTRUCTORS)).accepts(runtimeHints);
10161018
}
10171019

1020+
@Test
1021+
void shouldConfigureObservationConvention() {
1022+
KafkaTemplateObservationConvention convention = new DefaultKafkaTemplateObservationConvention();
1023+
this.contextRunner.withBean(KafkaTemplateObservationConvention.class, () -> convention).run((context) -> {
1024+
KafkaTemplate<?, ?> template = context.getBean(KafkaTemplate.class);
1025+
assertThat(template).hasFieldOrPropertyWithValue("observationConvention", convention);
1026+
});
1027+
}
1028+
10181029
private KafkaConnectionDetails kafkaConnectionDetails() {
10191030
return new KafkaConnectionDetails() {
10201031

0 commit comments

Comments
 (0)