Messaging Queues in Microservices using RabbitMQ and Spring Boot

In this article, we will discuss the basics of messaging queues and how microservices use asynchronous messaging to communicate with each other. We will use RabbitMQ, which implements AMQP (Advanced Message Queuing Protocol), as the message broker and Spring Boot for the microservices.

In this setup, a queue is bound to a topic exchange with a routing key. The publisher sends each message to the topic exchange, and the exchange uses the message's routing key to deliver it to the matching queue, where consumers pick it up.
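To make the routing step concrete, here is a minimal plain-Java sketch of how a topic exchange compares a routing key with a binding key. It is an illustration only and not RabbitMQ's actual implementation; the class name TopicKeyMatcher is hypothetical. Keys are dot-separated words: in a binding key, `*` matches exactly one word and `#` matches zero or more words. A binding key without wildcards, like the one used later in this article, matches only an identical routing key.

```java
final class TopicKeyMatcher {
    private TopicKeyMatcher() {}

    /** Returns true if routingKey matches bindingKey under topic-exchange rules. */
    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(pattern, 0, words, 0);
    }

    private static boolean match(String[] p, int i, String[] w, int j) {
        if (i == p.length) {
            // Pattern exhausted: it is a match only if all words were consumed too.
            return j == w.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words, so try every possible split.
            for (int k = j; k <= w.length; k++) {
                if (match(p, i + 1, w, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == w.length) {
            return false; // Pattern still expects a word, but none are left.
        }
        // '*' matches any single word; otherwise the words must be equal.
        if (p[i].equals("*") || p[i].equals(w[j])) {
            return match(p, i + 1, w, j + 1);
        }
        return false;
    }
}
```

For example, TopicKeyMatcher.matches("orders.*", "orders.created") is true, while the same binding key does not match "orders.created.eu" because `*` stands for a single word.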

Figure: RabbitMQ diagram (RabbitMQ DIG.png)

Prerequisites

1) Docker (for running RabbitMQ instance on the machine)

Run the RabbitMQ instance on Docker

docker run -d --hostname rabbit-mq-instance --name rabbit-1 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Open localhost:15672 in a browser to see the RabbitMQ dashboard and log in with the default credentials:

Username: guest
Password: guest

Create the producer service

Generate the project

  1. Go to start.spring.io
  2. Select Maven Project and add the following dependencies

     - Spring Web
     - Lombok
     - Spring For RabbitMQ
    
  3. Generate Project and open it in any IDE

Create the class QueueMessage.java. It defines the structure of the message that will be passed through the queue.

producer-service/QueueMessage.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class QueueMessage
{
    private String queueMessageId;
    private String queueMessage;
    private Date queueMessageDate;
}

Create the class MessageQueueConfig.java. It contains the configuration for binding the queue to the topic exchange.

Add the queue name, exchange name and routing key as constants.

producer-service/MessageQueueConfig.java

@Configuration
public class MessageQueueConfig
{
 .
 .
 .
    public static final String QUEUE_NAME = "rbtmq_message_queue";
    public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
    public static final String ROUTING_KEY = "rbtmq_routing_key";
 .
 .
}

Note: Ideally, these values should be read from a local properties file or from a centralized configuration server so that all services share them.
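As a rough illustration of this note, the plain-Java sketch below looks the three names up by key and falls back to the article's constants when a key is missing. In a Spring Boot service this would more typically be done with @Value or @ConfigurationProperties; the class name MessagingProperties and the property keys app.rabbitmq.queue, app.rabbitmq.exchange and app.rabbitmq.routing-key are hypothetical and chosen only for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class MessagingProperties {
    final String queueName;
    final String exchangeName;
    final String routingKey;

    MessagingProperties(Properties props) {
        // Hypothetical keys; the defaults are the constants used in this article.
        this.queueName = props.getProperty("app.rabbitmq.queue", "rbtmq_message_queue");
        this.exchangeName = props.getProperty("app.rabbitmq.exchange", "rbtmq_message_exchange");
        this.routingKey = props.getProperty("app.rabbitmq.routing-key", "rbtmq_routing_key");
    }

    /** Parses text in .properties format, e.g. the contents of a shared config file. */
    static MessagingProperties parse(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new MessagingProperties(props);
    }
}
```

With this approach the producer and the consumer can read the same file, so a renamed queue only has to be changed in one place.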

Add beans for the Queue and the TopicExchange

producer-service/MessageQueueConfig.java

@Configuration
public class MessageQueueConfig
{
 .
 .
 .
    @Bean
    public Queue queue()
    {
         return new Queue(QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange()
    {
         return new TopicExchange(EXCHANGE_NAME);
    }
 .
 .
}

Add a bean that binds the queue to the topic exchange

producer-service/MessageQueueConfig.java

@Configuration
public class MessageQueueConfig
{
 .
 .
 .
    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange)
    {
         return BindingBuilder
                 .bind(queue)
                 .to(topicExchange)
                 .with(ROUTING_KEY);
    }
 .
 .
}
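What a binding does on the broker side can be pictured with a small in-memory model. The sketch below is an analogy in plain Java, not Spring AMQP or RabbitMQ code: the class ExchangeSketch is hypothetical, it uses java.util.Queue rather than Spring's Queue class, and it only handles exact-match keys such as the ROUTING_KEY constant above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class ExchangeSketch {
    // Binding key -> all queues that were bound with that key.
    private final Map<String, List<Queue<String>>> bindings = new HashMap<>();

    /** Registers a binding between this exchange and a queue. */
    void bind(Queue<String> queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    /**
     * Copies the message into every queue bound with the given routing key
     * and returns how many queues received it (0 means it was not routed).
     */
    int publish(String routingKey, String message) {
        List<Queue<String>> targets = bindings.getOrDefault(routingKey, List.of());
        for (Queue<String> queue : targets) {
            queue.add(message);
        }
        return targets.size();
    }
}
```

The point of the model is that the publisher only knows the exchange and the routing key. Which queue receives the message is decided entirely by the bindings.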

Finally, add beans for the AmqpTemplate and the MessageConverter

producer-service/MessageQueueConfig.java

@Configuration
public class MessageQueueConfig
{
 .
 .
 .
    @Bean
    public MessageConverter messageConverter()
    {
         return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }
 .
 .
}

The complete code for MessageQueueConfig.java looks like this:

producer-service/MessageQueueConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageQueueConfig
{
    public static final String QUEUE_NAME = "rbtmq_message_queue";
    public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
    public static final String ROUTING_KEY = "rbtmq_routing_key";


    @Bean
    public Queue queue()
    {
         return new Queue(QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange()
    {
         return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange)
    {
         return BindingBuilder
                 .bind(queue)
                 .to(topicExchange)
                 .with(ROUTING_KEY);
    }

    @Bean
    public MessageConverter messageConverter()
    {
         return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

}

Add the server port and the RabbitMQ address to the application.properties file

producer-service/application.properties

server.port=9000
spring.rabbitmq.addresses = localhost:5672

Create a REST controller with an endpoint that publishes a message to the queue. Start by creating the QueueMessagePublisher.java class.

producer-service/QueueMessagePublisher.java

@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{

    @PostMapping("/publish")
    public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
    {
        .
        .
        .
        .
    }
}

Set the message ID and the message date on the QueueMessage object.

producer-service/QueueMessagePublisher.java

@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{

    @PostMapping("/publish")
    public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
    {
        queueMessage.setQueueMessageId(UUID.randomUUID().toString());
        queueMessage.setQueueMessageDate(new Date());
        .
        .
    }
}

Inject the RabbitTemplate into the publisher class as a dependency. Use it to convert the message and send it to the exchange with the routing key. The finished QueueMessagePublisher.java class looks like this:

producer-service/QueueMessagePublisher.java

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Date;
import java.util.UUID;

@RestController
@RequestMapping("/queue")
public class QueueMessagePublisher
{
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostMapping("/publish")
    public ResponseEntity<String> publishMessageToQueue(@RequestBody QueueMessage queueMessage)
    {
        queueMessage.setQueueMessageId(UUID.randomUUID().toString());
        queueMessage.setQueueMessageDate(new Date());
        rabbitTemplate.convertAndSend(
                MessageQueueConfig.EXCHANGE_NAME,
                MessageQueueConfig.ROUTING_KEY,
                queueMessage
        );

        return new ResponseEntity<>("Message Published", HttpStatus.OK);
    }


}

Create the consumer service

Generate the project

  1. Go to start.spring.io
  2. Select Maven Project and add the following dependencies

     - Spring Web
     - Lombok
     - Spring For RabbitMQ
    
  3. Generate Project and open it in any IDE

Copy the MessageQueueConfig.java and QueueMessage.java classes into the consumer service

consumer-service/QueueMessage.java

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.util.Date;

@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class QueueMessage
{
    private String queueMessageId;
    private String queueMessage;
    private Date queueMessageDate;
}

consumer-service/MessageQueueConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageQueueConfig
{
    public static final String QUEUE_NAME = "rbtmq_message_queue";
    public static final String EXCHANGE_NAME = "rbtmq_message_exchange";
    public static final String ROUTING_KEY = "rbtmq_routing_key";


    @Bean
    public Queue queue()
    {
         return new Queue(QUEUE_NAME);
    }

    @Bean
    public TopicExchange topicExchange()
    {
         return new TopicExchange(EXCHANGE_NAME);
    }

    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange)
    {
         return BindingBuilder
                 .bind(queue)
                 .to(topicExchange)
                 .with(ROUTING_KEY);
    }

    @Bean
    public MessageConverter messageConverter()
    {
         return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate template(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(messageConverter());
        return rabbitTemplate;
    }

}

Add the server port and the RabbitMQ address to the application.properties file

consumer-service/application.properties

server.port=9001
spring.rabbitmq.addresses = localhost:5672

Create a consumer class to consume messages from the queue. Annotate the method that will consume the messages with @RabbitListener.

consumer-service/QueueMessageConsumer.java

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class QueueMessageConsumer
{
    @RabbitListener(queues = MessageQueueConfig.QUEUE_NAME)
    public void listener(QueueMessage queueMessage)
    {
        System.out.println(queueMessage);
    }
}
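The asynchronous hand-off between the two services can be mimicked inside a single JVM. The sketch below is only an analogy, assuming a java.util.concurrent.BlockingQueue in place of the broker and a background thread in place of the listener container; the class InMemoryQueueDemo and the stop marker are hypothetical and have no counterpart in RabbitMQ or Spring AMQP.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class InMemoryQueueDemo {
    // Hypothetical marker that tells the consumer thread to stop.
    private static final String STOP = "__stop__";

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final List<String> consumed = new CopyOnWriteArrayList<>();
    private final Thread consumer = new Thread(this::consumeLoop);

    void start() {
        consumer.start();
    }

    /** Producer side: enqueue and return immediately, without waiting for the consumer. */
    void publish(String message) {
        queue.add(message);
    }

    /** Asks the consumer to finish, waits for it, and returns what it handled. */
    List<String> stopAndGetConsumed() {
        queue.add(STOP);
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return List.copyOf(consumed);
    }

    /** Consumer side: block until a message arrives, then handle it. */
    private void consumeLoop() {
        try {
            while (true) {
                String message = queue.take();
                if (message.equals(STOP)) {
                    return;
                }
                consumed.add(message); // Stands in for the listener method body.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

As in the real setup, the producer never calls the consumer directly. It only places messages on the queue, and the consumer handles them in arrival order whenever it is ready.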

Now let's simulate the system. With the RabbitMQ instance running in Docker, start the producer and consumer services. Open any API testing application, such as Postman.

Hit the endpoint a couple of times:

POST localhost:9000/queue/publish

{
    "queueMessageId" : "",
    "queueMessage" : "Test Message",
    "queueMessageDate": ""
}
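If you prefer code over an API testing tool, the same request can be sent with the JDK's built-in HTTP client (Java 11 or later). This is a minimal sketch under the assumptions of this article: the producer runs on localhost:9000 and exposes /queue/publish. The class name PublishClient is hypothetical. Only queueMessage is sent in the example body, since the controller overwrites the ID and the date anyway.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class PublishClient {
    private PublishClient() {}

    /** Builds the POST request for the producer's publish endpoint. */
    static HttpRequest buildRequest(String baseUrl, String jsonBody) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/queue/publish"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    /** Sends the request and returns the status code and body as one string. */
    static String send(HttpRequest request) throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode() + " " + response.body();
    }

    public static void main(String[] args) throws Exception {
        // Requires the producer service to be running locally.
        HttpRequest request = buildRequest(
                "http://localhost:9000",
                "{\"queueMessage\": \"Test Message\"}");
        System.out.println(send(request));
    }
}
```

Running main a few times has the same effect as hitting the endpoint repeatedly from Postman.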

Check the RabbitMQ dashboard and the console of consumer service to see the messages being published to the queue and consumed by the consumer service.

This article is only a basic simulation of asynchronous messaging between microservices. It focuses on the implementation and, to keep things simple for beginners, may not follow all standard practices.

I hope you found the article useful.

Happy Coding :) .