When working with Kafka in a local environment, teams often share a single Kafka instance. This is a problem: other team members may launch consumers (listeners) on the same topic, or another listener may already be running somewhere else. To avoid this interference, each developer can launch a dedicated Kafka instance and work against it. Setting up a local Kafka instance is relatively easy if you have Kafka experience: download the binaries and launch them, factoring in dependencies such as ZooKeeper. Without that experience, however, a local Kafka setup can become tedious and time-consuming. A simpler alternative is to run Kafka in a Docker container using Testcontainers and the Docker Desktop app.
This article covers the detailed steps for setting up a Kafka test container in the local development environment for a Spring Boot microservice.
Prerequisites
- Docker Desktop installed
- A Spring Boot application
Configure the Test Container for Local Development
The first step is to configure the required packages. We add the following Maven dependencies to the pom.xml:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-testcontainers</artifactId>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>junit-jupiter</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.testcontainers</groupId>
    <artifactId>kafka</artifactId>
    <version>1.17.3</version> <!-- Use the latest available version -->
    <scope>compile</scope> <!-- 'compile' instead of the usual 'test' -->
</dependency>
Note that we change the scope of the spring-boot-testcontainers and org.testcontainers dependencies to compile instead of test, because the container will be started by the running application itself rather than only by tests.
Once we have added the required packages, we must ensure that the Kafka test container is launched only in the local development environment and not in higher environments such as UAT, integration testing, or production. For this, we can use Spring profiles: we add an application property file for a 'local' Spring profile.
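For example, a minimal application-local.properties could look like the following. The contents here are purely illustrative; note that spring.kafka.bootstrap-servers is deliberately absent, because it is set programmatically once the container starts:

```properties
# application-local.properties - loaded only when the 'local' Spring profile is active.
# spring.kafka.bootstrap-servers is deliberately omitted: the test-container
# configuration sets it at runtime once the broker container has started.
logging.level.org.apache.kafka=INFO
```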
Now we add the Kafka container configuration class annotated with @Profile("local"), ensuring the configuration is only active for the local profile.
@Configuration
@Profile("local") // This makes sure the configuration is only active for the local profile
public class KafkaTestContainerConfig {

    @Bean
    public KafkaContainer kafkaContainer() {
        KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
        kafka.start();
        // Expose the dynamically mapped broker address to the Spring environment
        System.setProperty("spring.kafka.bootstrap-servers", kafka.getBootstrapServers());
        return kafka;
    }
}
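As a side note: if the project is on Spring Boot 3.1 or later, the spring-boot-testcontainers module can wire the broker address automatically when the container bean is annotated with @ServiceConnection, removing the need for the System.setProperty call. A minimal sketch of this alternative, assuming Spring Boot 3.1+ and spring-boot-testcontainers on the compile classpath as configured above:

```java
import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

// Hypothetical alternative to the configuration above (Spring Boot 3.1+ only).
@Configuration
@Profile("local")
public class KafkaServiceConnectionConfig {

    // @ServiceConnection lets Spring Boot derive spring.kafka.bootstrap-servers
    // from the running container automatically; the container lifecycle is
    // managed by Spring, so no explicit start() or System.setProperty is needed.
    @Bean
    @ServiceConnection
    public KafkaContainer kafkaContainer() {
        return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
    }
}
```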
We will now configure the Kafka Producer and the Consumer.
Producer
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, MessageRequest> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, MessageRequest> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Consumer
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, MessageRequest> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, MessageRequest> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public DefaultKafkaConsumerFactory<String, MessageRequest> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.mytechnetknowhows.kafka.testcontainers");
        // The deserializer instances passed to the constructor below take
        // precedence over the deserializer classes set in the properties above.
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
                new JsonDeserializer<>(MessageRequest.class));
    }
}
Send and Receive Messages
We then add the Spring service that sends messages to Kafka, and we configure the Kafka listener in the same process. In the real world, the listener would typically be a separate Spring application.
@Service
public class KafkaMessageService {

    public static final String TOPIC = "message_request_multi_partition_topic";

    private static final Logger logger = LoggerFactory.getLogger(KafkaMessageService.class);

    private final KafkaTemplate<String, MessageRequest> kafkaTemplate;

    @Autowired
    MessageStore messageStore;

    @Autowired
    public KafkaMessageService(KafkaTemplate<String, MessageRequest> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public String sendMessage(MessageRequest messageRequest) {
        String message = null;
        try {
            kafkaTemplate.send(TOPIC, messageRequest);
            message = "Message sent";
        } catch (Exception ex) {
            message = "Error occurred while sending message";
            logger.error("KafkaMessageService::sendMessage - An error occurred while sending message, detail error: ", ex);
        }
        return message;
    }

    @KafkaListener(topics = TOPIC, groupId = "test-group",
            containerFactory = "kafkaListenerContainerFactory")
    public void processMessageRequest(MessageRequest messageRequest) {
        logger.info("Received Message Request: {}", messageRequest.getOrderNumber());
        messageStore.addMessage(messageRequest);
    }
}
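The MessageRequest payload and the MessageStore used by the service are not shown in the article; the following is a minimal, hypothetical sketch of what they might look like. Only getOrderNumber() is implied by the listener code above; everything else here is an assumption:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical message payload. A no-args constructor and bean-style
// accessors are needed for JSON (de)serialization.
public class MessageRequest {
    private String orderNumber;

    public MessageRequest() { }

    public String getOrderNumber() { return orderNumber; }
    public void setOrderNumber(String orderNumber) { this.orderNumber = orderNumber; }
}

// Hypothetical in-memory store. In the real application this would be a
// Spring-managed bean (e.g. annotated with @Component); a thread-safe list
// is used because the listener may run on multiple consumer threads.
class MessageStore {
    private final List<MessageRequest> messages = new CopyOnWriteArrayList<>();

    public void addMessage(MessageRequest messageRequest) {
        messages.add(messageRequest);
    }

    public List<MessageRequest> getMessages() {
        return List.copyOf(messages);
    }
}
```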
We then add the API endpoint that provides an entry point for sending messages:
@RestController
@RequestMapping("/api/v1/message")
public class KafkaMessageController {

    @Autowired
    KafkaMessageService kafkaMessageService;

    @PostMapping("/send")
    public MessageResponse postMessage(@RequestBody MessageRequest messageRequest) {
        String response = this.kafkaMessageService.sendMessage(messageRequest);
        MessageResponse messageResponse = new MessageResponse();
        messageResponse.setMessage(response);
        return messageResponse;
    }
}
Now that all the required code is in place, we launch the Spring Boot application with the 'local' profile active (for example, with --spring.profiles.active=local).
Once the application is launched, we will see the Testcontainers and Kafka logs, as shown below.
2024-06-08T12:29:59.375-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.testcontainers/ryuk:0.7.0 : Creating container for image: testcontainers/ryuk:0.7.0
2024-06-08T12:29:59.572-04:00 INFO 31740 --- [kafka.testcontainers] [ main] o.t.utility.RegistryAuthLocator : Credential helper/store (docker-credential-desktop) does not have credentials for https://index.docker.io/v1/
2024-06-08T12:29:59.658-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.testcontainers/ryuk:0.7.0 : Container testcontainers/ryuk:0.7.0 is starting: a0a414e62aac002e123702196d20cf64c1aae5373713eb66100b5d8d9f0539aa
2024-06-08T12:29:59.988-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.testcontainers/ryuk:0.7.0 : Container testcontainers/ryuk:0.7.0 started in PT0.6123508S
2024-06-08T12:29:59.992-04:00 INFO 31740 --- [kafka.testcontainers] [ main] o.t.utility.RyukResourceReaper : Ryuk started - will monitor and terminate Testcontainers containers on JVM exit
2024-06-08T12:29:59.992-04:00 INFO 31740 --- [kafka.testcontainers] [ main] org.testcontainers.DockerClientFactory : Checking the system...
2024-06-08T12:29:59.992-04:00 INFO 31740 --- [kafka.testcontainers] [ main] org.testcontainers.DockerClientFactory : ✔︎ Docker server version should be at least 1.6.0
2024-06-08T12:29:59.992-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.confluentinc/cp-kafka:latest : Creating container for image: confluentinc/cp-kafka:latest
2024-06-08T12:30:00.026-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.confluentinc/cp-kafka:latest : Container confluentinc/cp-kafka:latest is starting: 59b261961e20f66e2f44b72a2125570814ae695d993766ea8a18acf315a4e205
2024-06-08T12:30:03.252-04:00 INFO 31740 --- [kafka.testcontainers] [ main] tc.confluentinc/cp-kafka:latest : Container confluentinc/cp-kafka:latest started in PT3.2597275S
2024-06-08T12:30:04.810-04:00 INFO 31740 --- [kafka.testcontainers] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8080 (http) with context path '/'
We can then invoke the endpoint through Postman.
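For example, a request body such as the following could be POSTed to /api/v1/message/send. The body shape is an assumption based on the orderNumber field that the listener logs:

```json
{
  "orderNumber": "IC-91911"
}
```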
We will then see that the message listener has been invoked:
2024-06-08T12:02:36.779-04:00 INFO 30488 --- [kafka.testcontainers] [ntainer#0-0-C-1] c.m.k.t.service.KafkaMessageService : Received Message Request: IC-91911
Now, if we launch with a different profile, such as UAT or PROD, the Kafka container will not be launched, and the consumer will instead attempt to connect to the Kafka instance configured for that profile.
Conclusion
Using Kafka Testcontainers for local development of Spring Boot applications streamlines the development process and improves consistency and isolation across local development environments. This approach eliminates the disruptions caused by shared Kafka instances and offers a flexible solution that adapts to the needs of individual developers and teams.
The source code for this article is available on GitHub.