Microservices with Spring Boot: Event-Driven Architecture with Apache Kafka
Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.
Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:
- To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.
- To store streams of events durably and reliably for as long as you want.
- To process streams of events as they occur or retrospectively.
And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner. Kafka can be deployed on bare-metal hardware, virtual machines, and containers, and on-premises as well as in the cloud. You can choose between self-managing your Kafka environments and using fully managed services offered by a variety of vendors.
In this article, we will work with the Wikimedia event stream. We will set up a Kafka cluster on our local machine and build two microservices: one that publishes messages to Kafka, and another that consumes those messages and stores them in a MySQL database.
Set up Kafka on your machine.
Download Kafka from kafka.apache.org/downloads
For instructions to set up the Kafka environment, refer to kafka.apache.org/quickstart
For Mac and Linux users:
Open terminal and navigate to the downloaded kafka directory.
Start the Zookeeper service
bin/zookeeper-server-start.sh config/zookeeper.properties
In another terminal instance, start the Kafka broker service.
bin/kafka-server-start.sh config/server.properties
For Windows users:
Open command prompt and navigate to the downloaded kafka directory.
Start the Zookeeper service
bin\windows\zookeeper-server-start.bat config\zookeeper.properties
In another command prompt instance, start the Kafka broker service.
bin\windows\kafka-server-start.bat config\server.properties
The Kafka broker will be up at port 9092.
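To check that the broker is reachable, you can list the topics on the cluster using the kafka-topics tool that ships with the download (Windows users can run the matching .bat script under bin\windows):
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list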
Set up the Producer Microservice
Go to start.spring.io
For this article, we will create a Maven project. Add the following dependencies:
Spring for Apache Kafka
Lombok
Generate the project and open it in any IDE (IntelliJ, Eclipse, VS Code, etc.).
The producer microservice now has the following dependencies:
producer/pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
To process the Wikimedia stream, we need to create the event source and event handler. For this purpose, we will add the following dependencies to the pom.xml of the producer microservice.
<!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
<dependency>
    <groupId>com.launchdarkly</groupId>
    <artifactId>okhttp-eventsource</artifactId>
    <version>2.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.13.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.13.4.1</version>
</dependency>
Finally, the dependencies in the producer microservice's pom.xml will look like this:
producer/pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
    <dependency>
        <groupId>com.launchdarkly</groupId>
        <artifactId>okhttp-eventsource</artifactId>
        <version>2.5.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>2.13.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.13.4.1</version>
    </dependency>
</dependencies>
Next, we will add the Kafka-related properties in the application.properties file.
spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
For this article, we will serialize and deserialize messages as plain Strings, hence we specify StringSerializer and StringDeserializer respectively.
Next, we will create a class that holds, as a static field, the name of the topic the producer will publish to.
producer/KafkaTopic.java
public class KafkaTopic {
    public static final String WIKIMEDIA_STREAM_TOPIC = "wikimedia_stream_topic";
}
Next, we will create a configuration class to create a Topic Bean.
producer/KafkaTopicConfig.java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
@Configuration
public class KafkaTopicConfig {

    // Registers the topic with the broker when the application starts
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name(KafkaTopic.WIKIMEDIA_STREAM_TOPIC)
                .build();
    }
}
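TopicBuilder here falls back to the broker defaults, which on a stock local setup means a single partition and a replication factor of one. As a sketch, if you later want more consumer parallelism, the builder also accepts explicit values (the numbers below are illustrative):
@Bean
public NewTopic topic() {
    return TopicBuilder.name(KafkaTopic.WIKIMEDIA_STREAM_TOPIC)
            .partitions(3)   // illustrative; lets up to 3 consumers in a group share the load
            .replicas(1)     // a single-broker local cluster cannot replicate further
            .build();
}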
Next, we will create an implementation of the EventHandler interface to process the event stream and publish messages to Kafka. This class takes the topic name and a KafkaTemplate as constructor parameters. For this article, we will only provide an implementation for the overridden onMessage(..) method.
producer/WikimediaChangesHandler.java
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
public class WikimediaChangesHandler implements EventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesHandler.class);

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public WikimediaChangesHandler(KafkaTemplate<String, String> kafkaTemplate, String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }

    // Publishes the payload of each incoming stream event to the Kafka topic
    @Override
    public void onMessage(String event, MessageEvent messageEvent) throws Exception {
        kafkaTemplate.send(topic, messageEvent.getData());
        LOGGER.info(String.format("Event data published -> %s", messageEvent.getData()));
    }

    @Override
    public void onOpen() throws Exception { }

    @Override
    public void onClosed() throws Exception { }

    @Override
    public void onComment(String comment) throws Exception { }

    @Override
    public void onError(Throwable throwable) { }
}
Next, let's create a producer class that connects to the event stream and starts publishing.
producer/WikimediaProducer.java
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.net.URI;
import java.util.concurrent.TimeUnit;

@Service
public class WikimediaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public WikimediaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishMessageToKafka() throws InterruptedException {
        String topic = KafkaTopic.WIKIMEDIA_STREAM_TOPIC;
        EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate, topic);

        // Wikimedia's public server-sent-events (SSE) stream of recent changes
        String url = "https://stream.wikimedia.org/v2/stream/recentchange";
        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));
        EventSource eventSource = builder.build();
        eventSource.start();

        // Adding a delay of 5 minutes for simulation purposes
        TimeUnit.MINUTES.sleep(5);
    }
}
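One caveat in this sketch: the EventSource is never closed, so its HTTP connection stays open even after the five-minute delay elapses. A minimal refinement, assuming the same eventSource instance as above, is to close it in a finally block:
eventSource.start();
try {
    TimeUnit.MINUTES.sleep(5);
} finally {
    // Releases the underlying HTTP connection and background threads
    eventSource.close();
}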
To test our service, we will create an implementation of CommandLineRunner, so that the producer starts publishing as soon as the application starts.
producer/StreamSimulationRunner.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Component
public class StreamSimulationRunner implements CommandLineRunner {

    @Autowired
    private WikimediaProducer wikimediaProducer;

    @Override
    public void run(String... args) throws Exception {
        wikimediaProducer.publishMessageToKafka();
    }
}
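Before building the consumer microservice, you can sanity-check the producer with the console consumer bundled with Kafka:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikimedia_stream_topic --from-beginning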
Set up the Consumer Microservice
Go to start.spring.io
For this article, we will create a Maven project. Add the following dependencies:
Spring for Apache Kafka
Lombok
Spring Data JPA
MySQL Driver
Generate the project and open it in any IDE (IntelliJ, Eclipse, VS Code, etc.).
The consumer microservice will have the following dependencies:
consumer/pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.mysql</groupId>
        <artifactId>mysql-connector-j</artifactId>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
Next, we will add the Kafka consumer, MySQL, and JPA configurations in the application.properties file.
consumer/application.properties
# Kafka consumer configs
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: dbConsumerGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# MySQL configs
spring.datasource.url=jdbc:mysql://localhost:3306/wikimediadb
spring.datasource.username=<your username>
spring.datasource.password=<your password>
# JPA configs
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
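Note that spring.jpa.hibernate.ddl-auto=update creates the table for us, but not the database itself, so the wikimediadb schema must already exist. You can create it once from the MySQL shell:
CREATE DATABASE wikimediadb;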
Next, we will create a class that holds, as a static field, the name of the topic from which the consumer will read.
consumer/KafkaTopic.java
public class KafkaTopic {
    public static final String WIKIMEDIA_STREAM_TOPIC = "wikimedia_stream_topic";
}
Next, we will create an entity for our Wikimedia stream message.
consumer/entity/WikimediaModel.java
import lombok.Getter;
import lombok.Setter;
import javax.persistence.*;
@Entity
@Table(name = "wikimedia_recent_changes")
@Getter
@Setter
public class WikimediaModel {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    // @Lob stores the (potentially large) raw event JSON in a large-object column
    @Lob
    private String wikimediaEventData;
}
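For reference, with ddl-auto=update Hibernate should generate roughly the following table on MySQL. The DDL below is illustrative, assuming the default naming strategy (camelCase to snake_case) and @Lob on a String mapping to LONGTEXT:
CREATE TABLE wikimedia_recent_changes (
    id BIGINT NOT NULL AUTO_INCREMENT,
    wikimedia_event_data LONGTEXT,
    PRIMARY KEY (id)
);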
Next, we will create a JPA repository for persisting the data in the database.
consumer/WikimediaRepository.java
import com.umang345.wikimediaconsumermicroservice.entity.WikimediaModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface WikimediaRepository extends JpaRepository<WikimediaModel, Long> {
}
Next, we will create a consumer class that consumes messages from Kafka and, using the JPA repository, saves them in the database.
consumer/KafkaWikiDBConsumer.java
import com.umang345.wikimediaconsumermicroservice.config.KafkaTopic;
import com.umang345.wikimediaconsumermicroservice.entity.WikimediaModel;
import com.umang345.wikimediaconsumermicroservice.repository.WikimediaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaWikiDBConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaWikiDBConsumer.class);

    private final WikimediaRepository dataRepository;

    public KafkaWikiDBConsumer(WikimediaRepository dataRepository) {
        this.dataRepository = dataRepository;
    }

    // Invoked for every record on the topic; persists the raw event payload
    @KafkaListener(topics = KafkaTopic.WIKIMEDIA_STREAM_TOPIC, groupId = "dbConsumerGroup")
    public void consume(String eventMessage) {
        LOGGER.info(String.format("Event message consumed -> %s", eventMessage));
        WikimediaModel wikimediaModel = new WikimediaModel();
        wikimediaModel.setWikimediaEventData(eventMessage);
        dataRepository.save(wikimediaModel);
    }
}
Test
Start the Kafka broker, then start both services. In the console logs you can verify messages being published to Kafka and then consumed by the consumer.
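To confirm that the events are also landing in MySQL, query the table directly:
SELECT COUNT(*) FROM wikimedia_recent_changes;
SELECT * FROM wikimedia_recent_changes ORDER BY id DESC LIMIT 5;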
I hope you found the article useful.
Happy Coding :)