Messaging Queues in Microservices using RabbitMQ and Spring Boot
In this article we will discuss about basics of Messaging queues and how microservices using asynchronous messaging to communicate with each other. We will use Rabbit MQ for AMPQ (Advanced Messaging Queuing Protocol) and Spring Boot for Microservices.
In RabbitMQ , each queue is bound to a topic exchange via a unique routing key. Publisher publishes messages to the topic exchange and based on the routing key, the message is sent to the correct queue for the consumers to consume.
Prerequisites
1) Docker (for running RabbitMQ instance on the machine)
Run the RabbitMQ instance on docker
run -d --hostname rabbit-mq-instance --name rabbit-1 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Open localhost:15672
in browser to see the RabbitMQ dashboard
Username : guest
Password : guest
Create the producer service
Generate the project
- Go to start.spring.io
Select Maven project and add following dependencies
- Spring Web - Lombok - Spring For RabbitMQ
- Generate Project and open it in any IDE
Create class QueueMessage.java . This will contain the structure of our message to be passed in the Queue
producer-service/QueueMessage.java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class QueueMessage
{
private String queueMessageId;
private String queueMessage;
private Date queueMessageDate;
}
Create class MessageQueueConfig.java . This will contain the configuration for binding the queues to topic exchange.
Add the topic name, queue name and routing keys as constants.
producer-service/MessageQueueConfig.java
@Configuration
public class MessageQueueConfig
{
.
.
.
public static final String QUEUE_NAME = "rbtmq_message_queue";
public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
public static final String ROUTING_KEY = "rbtmq_routing_key";
.
.
}
Note : Ideally these values should be read from a local properties file or from a centralized configuration server to share them among all services.
Add Beans for Queue and TopicExchange
producer-service/MessageQueueConfig.java
@Configuration
public class MessageQueueConfig
{
.
.
.
@Bean
public Queue queue()
{
return new Queue(QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange()
{
return new TopicExchange(EXCHANGE_NAME);
}
.
.
}
Add a bean for Binding Queue with the topic exchange
producer-service/MessageQueueConfig.java
@Configuration
public class MessageQueueConfig
{
.
.
.
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange)
{
return BindingBuilder
.bind(queue)
.to(topicExchange)
.with(ROUTING_KEY);
}
.
.
}
Finally, add Beans for AmpqTemplate and MessageConverter
producer-service/MessageQueueConfig.java
@Configuration
public class MessageQueueConfig
{
.
.
.
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
.
.
}
The complete code for MessageQueueConfig.java will look like
producer-service/MessageQueueConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageQueueConfig
{
public static final String QUEUE_NAME = "rbtmq_message_queue";
public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
public static final String ROUTING_KEY = "rbtmq_routing_key";
@Bean
public Queue queue()
{
return new Queue(QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange()
{
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange)
{
return BindingBuilder
.bind(queue)
.to(topicExchange)
.with(ROUTING_KEY);
}
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
Add the port and RabbitMQ address configs in application.properties file
producer-service/application.properties
server.port=9000
spring.rabbitmq.addresses = localhost:5672
Create a Rest Controller and add an endpoint to publish message to the queue. Create the QueueMessagePublisher.java class.
producer-service/QueueMessagePublisher.java
@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{
@PostMapping("/publish")
public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
{
.
.
.
.
}
}
Set the message Id and Message Date in the QueueMessage Object.
producer-service/QueueMessagePublisher.java
@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{
@PostMapping("/publish")
public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
{
queueMessage.setQueueMessageId(UUID.randomUUID().toString());
queueMessage.setQueueMessageDate(new Date());
.
.
}
}
Add RabbitTemplate Object as Dependency in the Publisher class. Using this object, convert and send the message to the exchange with the routing key. Finally, the QueueMessagePublisher.java class will look like this.
producer-service/QueueMessagePublisher.java
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.UUID;
@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/publish")
public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
{
queueMessage.setQueueMessageId(UUID.randomUUID().toString());
queueMessage.setQueueMessageDate(new Date());
rabbitTemplate.convertAndSend(
MessageQueueConfig.EXCHANGE_NAME,
MessageQueueConfig.ROUTING_KEY,
queueMessage
);
return new ResponseEntity("Message Published", HttpStatus.OK);
}
}
Next,
Create the Consumer Service
Generate the project
- Go to start.spring.io
Select Maven project and add following dependencies
- Spring Web - Lombok - Spring For RabbitMQ
- Generate Project and open it in any IDE
Copy the MessageQueueConfig.java and QueueMessage.java classes in the consumer service
consumer-service/QueueMessage.java
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class QueueMessage
{
private String queueMessageId;
private String queueMessage;
private Date queueMessageDate;
}
consumer-service/MessageQueueConfig.java
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageQueueConfig
{
public static final String QUEUE_NAME = "rbtmq_message_queue";
public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
public static final String ROUTING_KEY = "rbtmq_routing_key";
@Bean
public Queue queue()
{
return new Queue(QUEUE_NAME);
}
@Bean
public TopicExchange topicExchange()
{
return new TopicExchange(EXCHANGE_NAME);
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange)
{
return BindingBuilder
.bind(queue)
.to(topicExchange)
.with(ROUTING_KEY);
}
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate template(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMessageConverter(messageConverter());
return rabbitTemplate;
}
}
Add the port and address configs in application.properties file
consumer-service/application.properties
server.port=9001
spring.rabbitmq.addresses = localhost:5672
Create a Consumer class to consume messages from the Queue. We use the annotation RabbitListner
to annotate the method that will consume the messages.
consumer-service/QueueMessageConsumer.java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class QueueMessageConsumer
{
@RabbitListener(queues = MessageQueueConfig.QUEUE_NAME)
public void listener(QueueMessage queueMessage)
{
System.out.println(queueMessage);
}
}
Now lets simulate the system. After setting up the RabbitMQ instance in docker, now run the producer and consumer service. Open any API testing application, like Postman.
Hit the Endpoint a couple of times
POST localhost:9000/queue/publish
{
"queueMessageId" : "",
"queueMessage" : "Test Message",
"queueMessageDate": ""
}
Check the RabbitMQ dashboard and the console of consumer service to see the messages being published to the queue and consumed by the consumer service.
This article is just a basic simulation of asynchronous messaging between microservices, focussing on the implementation and may not contain the standard practices for the sake of simplicity for beginners.
I hope you found the article useful.
Lets connect :
Happy Coding :) .