Tag: Rabbitmq microservices

Asynchronous communication between microservices – Part I

Here we want two microservices to communicate with each other asynchronously. The first service makes some changes in its own database and then publishes a message to a queue in the message broker. The other service is notified, reads the message, and makes changes in its own database based on that message. The two microservices are order-service and product-service.

The advantage of asynchronous communication is that even if the other service is down, the first service can still send the message to the queue; when the other service is up and running again, it reads the message from the queue and processes it.

Prerequisites:

  1. You must have RabbitMQ and a MySQL database installed. To download RabbitMQ, visit: https://www.rabbitmq.com/download.html
  2. Go to the rabbitmq_server-3.8.3\sbin directory and double-click on rabbitmq-server.bat. You will see output similar to that shown below:

3. Start MySQL and create databases ordersdb and productsdb.

mysql>create database ordersdb;
mysql>create database productsdb;

Create Order Microservice

Step 1: Open https://start.spring.io/ and create a project using the following configuration:

Click on Generate to download the project zip file.

Step 2: Open the project in any IDE like Eclipse or Spring Tool Suite. Check the OrderServiceApplication class:

package com.techiepitch.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot entry point of the order service.
 */
@SpringBootApplication
public class OrderServiceApplication {

	/**
	 * Starts the application by handing this class and the raw arguments to
	 * {@link SpringApplication#run(Class, String...)}.
	 *
	 * @param args command-line arguments, forwarded unchanged
	 */
	public static void main(String[] args) {
		final Class<?> primarySource = OrderServiceApplication.class;
		SpringApplication.run(primarySource, args);
	}

}

Step 3: Put the configuration for Rabbit MQ and MySQL database in the application.properties file:

# HTTP port the order service listens on
server.port=9090

# MySQL connection; uses the ordersdb database created in the prerequisites
spring.datasource.url=jdbc:mysql://localhost:3306/ordersdb
spring.datasource.username=root
spring.datasource.password=admin

# JPA / Hibernate settings
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL5Dialect
# Quote every identifier so that the entity named Order (a reserved SQL word) maps to a valid table name
spring.jpa.properties.hibernate.globally_quoted_identifiers=true

# RabbitMQ broker connection
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# Queue and fanout exchange names read by the RabbitMQ configuration classes
order.queue.name=orderQueue
product.queue.name=productQueue
fanout.order.exchange=order-exchange
fanout.product.exchange=product-exchange

Step 4: Create the following packages:

Step 5: Create the class OrderServiceMessage in the package com.techiepitch.order.configuration.bean:

package com.techiepitch.order.configuration.bean;

/**
 * Message payload carrying an order id, a product id and a quantity.
 * A plain mutable bean: no-argument constructor plus one getter/setter pair per field.
 */
public class OrderServiceMessage {

	private Integer orderId;
	private Integer productId;
	private Integer quantity;

	/**
	 * Creates an empty message; all three fields are {@code null}.
	 */
	public OrderServiceMessage() {
		this(null, null, null);
	}

	/**
	 * Creates a fully populated message.
	 *
	 * @param orderId   id of the order
	 * @param productId id of the ordered product
	 * @param quantity  ordered quantity
	 */
	public OrderServiceMessage(Integer orderId, Integer productId, Integer quantity) {
		this.orderId = orderId;
		this.productId = productId;
		this.quantity = quantity;
	}

	/** @return the order id, possibly {@code null} */
	public Integer getOrderId() {
		return this.orderId;
	}

	/** @param orderId the order id to store */
	public void setOrderId(Integer orderId) {
		this.orderId = orderId;
	}

	/** @return the product id, possibly {@code null} */
	public Integer getProductId() {
		return this.productId;
	}

	/** @param productId the product id to store */
	public void setProductId(Integer productId) {
		this.productId = productId;
	}

	/** @return the quantity, possibly {@code null} */
	public Integer getQuantity() {
		return this.quantity;
	}

	/** @param quantity the quantity to store */
	public void setQuantity(Integer quantity) {
		this.quantity = quantity;
	}

}

Step 6: Create the class ProductServiceMessage in the package com.techiepitch.order.configuration.bean:

package com.techiepitch.order.configuration.bean;

/**
 * Message payload carrying an order id, an approval flag, a free-text message
 * and an available quantity.
 * A plain mutable bean: no-argument constructor plus one getter/setter pair per field.
 */
public class ProductServiceMessage {

	private Integer orderId;
	private boolean approved;
	private String message;
	private Integer availableQuantity;

	/**
	 * Creates an empty message: {@code approved} is {@code false}, every other field is {@code null}.
	 */
	public ProductServiceMessage() {
		this(null, false, null, null);
	}

	/**
	 * Creates a fully populated message.
	 *
	 * @param orderId           id of the order this message refers to
	 * @param approved          approval flag
	 * @param message           free-text message
	 * @param availableQuantity available quantity
	 */
	public ProductServiceMessage(Integer orderId, boolean approved, String message, Integer availableQuantity) {
		this.orderId = orderId;
		this.approved = approved;
		this.message = message;
		this.availableQuantity = availableQuantity;
	}

	/** @return the order id, possibly {@code null} */
	public Integer getOrderId() {
		return this.orderId;
	}

	/** @param orderId the order id to store */
	public void setOrderId(Integer orderId) {
		this.orderId = orderId;
	}

	/** @return the approval flag */
	public boolean isApproved() {
		return this.approved;
	}

	/** @param approved the approval flag to store */
	public void setApproved(boolean approved) {
		this.approved = approved;
	}

	/** @return the free-text message, possibly {@code null} */
	public String getMessage() {
		return this.message;
	}

	/** @param message the free-text message to store */
	public void setMessage(String message) {
		this.message = message;
	}

	/** @return the available quantity, possibly {@code null} */
	public Integer getAvailableQuantity() {
		return this.availableQuantity;
	}

	/** @param availableQuantity the available quantity to store */
	public void setAvailableQuantity(Integer availableQuantity) {
		this.availableQuantity = availableQuantity;
	}

}

Step 7: Create class QueueProducer in the package com.techiepitch.order.configuration:

package com.techiepitch.order.configuration;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import com.techiepitch.order.configuration.bean.OrderServiceMessage;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Publishes messages to the exchange named by the {@code fanout.order.exchange} property.
 */
@Component
public class QueueProducer {

	/** Shared JSON serializer, reused across calls instead of being rebuilt for every message. */
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	/** Routing key sent with every message; a fanout exchange does not use it for routing. */
	private static final String NO_ROUTING_KEY = "";

	@Value("${fanout.order.exchange}")
	private String fanoutOrderExchange;

	private final RabbitTemplate rabbitTemplate;

	/**
	 * @param rabbitTemplate template used to talk to the broker
	 */
	@Autowired
	public QueueProducer(RabbitTemplate rabbitTemplate) {
		this.rabbitTemplate = rabbitTemplate;
	}

	/**
	 * Sends a text message to the order exchange.
	 *
	 * @param msg the message body
	 * @throws Exception if the template fails to convert or send the message
	 */
	public void produce(String msg) throws Exception {
		// Name the exchange per call instead of calling setExchange(): the template is a
		// shared bean, so changing its default exchange would affect every other user of it.
		rabbitTemplate.convertAndSend(fanoutOrderExchange, NO_ROUTING_KEY, msg);
	}

	/**
	 * Serializes the given message to JSON and sends it to the order exchange.
	 *
	 * @param message the payload to serialize and publish
	 * @throws Exception if JSON serialization or sending fails
	 */
	public void produce(OrderServiceMessage message) throws Exception {
		produce(OBJECT_MAPPER.writeValueAsString(message));
	}

}

Step 8: Create the class Order in the package com.techiepitch.order.entity:

package com.techiepitch.order.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

/**
 * JPA entity holding an order: generated id, product id, quantity and a status text.
 * Mapping annotations sit on the fields.
 */
@Entity
public class Order {

	/** Primary key, generated with {@link GenerationType#AUTO}; stored in column {@code order_id}. */
	@Id
	@GeneratedValue(strategy = GenerationType.AUTO)
	@Column(name = "order_id")
	private Integer orderId;

	/** Id of the ordered product; stored in column {@code product_id}. */
	@Column(name = "product_id")
	private Integer productId;

	/** Ordered quantity. */
	private Integer quantity;

	/** Status text of the order. */
	private String status;

	/** @return the order id, possibly {@code null} */
	public Integer getOrderId() {
		return this.orderId;
	}

	/** @param orderId the order id to store */
	public void setOrderId(Integer orderId) {
		this.orderId = orderId;
	}

	/** @return the product id, possibly {@code null} */
	public Integer getProductId() {
		return this.productId;
	}

	/** @param productId the product id to store */
	public void setProductId(Integer productId) {
		this.productId = productId;
	}

	/** @return the quantity, possibly {@code null} */
	public Integer getQuantity() {
		return this.quantity;
	}

	/** @param quantity the quantity to store */
	public void setQuantity(Integer quantity) {
		this.quantity = quantity;
	}

	/** @return the status text, possibly {@code null} */
	public String getStatus() {
		return this.status;
	}

	/** @param status the status text to store */
	public void setStatus(String status) {
		this.status = status;
	}
}

Step 9: Create the interface OrderRepository in the package com.techiepitch.order.repository:

package com.techiepitch.order.repository;

import org.springframework.data.jpa.repository.JpaRepository;
import com.techiepitch.order.entity.Order;

/**
 * Spring Data JPA repository for {@link Order} entities with {@code Integer} ids.
 * Declares no methods of its own; everything comes from {@link JpaRepository}.
 */
public interface OrderRepository
		extends JpaRepository<Order, Integer> {

}

Step 10: Create the class OrderController in the package com.techiepitch.order.controller:

package com.techiepitch.order.controller;

import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.techiepitch.order.configuration.QueueProducer;
import com.techiepitch.order.configuration.bean.OrderServiceMessage;
import com.techiepitch.order.configuration.bean.ProductServiceMessage;
import com.techiepitch.order.entity.Order;
import com.techiepitch.order.repository.OrderRepository;

/**
 * REST endpoint for creating orders. Also exposes {@link #receiveMessage(String)}, which
 * updates an order's status from a JSON-encoded {@link ProductServiceMessage}.
 */
@RestController
@RequestMapping("/orders")
public class OrderController {

	private static final Logger logger = LoggerFactory.getLogger(OrderController.class);

	/** Shared JSON parser, reused across messages instead of being rebuilt for each one. */
	private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

	@Autowired
	private OrderRepository orderRepository;

	@Autowired
	private QueueProducer queueProducer;

	/**
	 * Persists the posted order and publishes an {@link OrderServiceMessage} describing it.
	 *
	 * @param order the order taken from the request body
	 * @return the order as returned by the repository after saving
	 */
	@PostMapping("/createorder")
	public Order createOrder(@RequestBody Order order) {

		Order createdOrder = orderRepository.save(order);

		// Build the message from the instance returned by save(): that is the one
		// guaranteed to carry the persisted state, including the generated id.
		Integer orderId = createdOrder.getOrderId();
		OrderServiceMessage orderMessage = new OrderServiceMessage(orderId, createdOrder.getProductId(),
				createdOrder.getQuantity());
		try {
			queueProducer.produce(orderMessage);
		} catch (Exception exception) {
			// Best effort: the order is already saved, so a publishing failure is logged
			// (with its stack trace) rather than failing the request.
			logger.error("Failed to publish message for order {}", orderId, exception);
		}

		return createdOrder;
	}

	/**
	 * Handles an incoming message by logging it and applying it to the matching order.
	 *
	 * @param message JSON text of a {@link ProductServiceMessage}
	 * @throws Exception if the message cannot be parsed or the order cannot be saved
	 */
	public void receiveMessage(String message) throws Exception {
		logger.debug("Received =>{}", message);
		processMessage(message);
	}

	/**
	 * Parses the JSON message and sets the referenced order's status to
	 * "Order Approved" or "Order Rejected" depending on the approval flag.
	 * Messages without an order id, or for an order that does not exist, change nothing.
	 */
	private void processMessage(String msg) throws Exception {

		ProductServiceMessage message = OBJECT_MAPPER.readValue(msg, ProductServiceMessage.class);
		Integer orderId = message.getOrderId();
		logger.debug("OrderId::{}", orderId);

		if (orderId == null) {
			return;
		}

		Optional<Order> order = orderRepository.findById(orderId);
		if (!order.isPresent()) {
			logger.warn("Ignoring message for unknown order {}", orderId);
			return;
		}

		Order selectedOrder = order.get();
		selectedOrder.setStatus(message.isApproved() ? "Order Approved" : "Order Rejected");
		orderRepository.save(selectedOrder);
	}
}

Step 11: Create the class RabbitMQConfiguration in the package com.techiepitch.order.configuration:

package com.techiepitch.order.configuration;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.techiepitch.order.controller.OrderController;

/**
 * Declares the RabbitMQ beans of the order service: two durable queues, two fanout
 * exchanges, one binding per queue/exchange pair, and a listener container that
 * delivers messages from the product queue to {@link OrderController}.
 */
@Configuration
public class RabbitMQConfiguration {

	/** Name of the {@link OrderController} method the listener adapter invokes. */
	private static final String LISTENER_METHOD = "receiveMessage";

	@Value("${fanout.order.exchange}")
	private String fanoutOrderExchange;

	@Value("${order.queue.name}")
	private String orderQueueName;

	@Value("${fanout.product.exchange}")
	private String fanoutProductExchange;

	@Value("${product.queue.name}")
	private String productQueueName;

	/** @return a durable queue named by {@code order.queue.name} */
	@Bean
	Queue orderQueue() {
		final boolean durable = true;
		return new Queue(orderQueueName, durable);
	}

	/** @return a durable queue named by {@code product.queue.name} */
	@Bean
	Queue productQueue() {
		final boolean durable = true;
		return new Queue(productQueueName, durable);
	}

	/** @return a fanout exchange named by {@code fanout.order.exchange} */
	@Bean
	FanoutExchange orderExchange() {
		return new FanoutExchange(fanoutOrderExchange);
	}

	/** @return a fanout exchange named by {@code fanout.product.exchange} */
	@Bean
	FanoutExchange productExchange() {
		return new FanoutExchange(fanoutProductExchange);
	}

	/** @return the binding of the order queue to the order exchange */
	@Bean
	Binding bindingOrder(Queue orderQueue, FanoutExchange orderExchange) {
		final Binding orderBinding = BindingBuilder.bind(orderQueue).to(orderExchange);
		return orderBinding;
	}

	/** @return the binding of the product queue to the product exchange */
	@Bean
	Binding bindingProduct(Queue productQueue, FanoutExchange productExchange) {
		final Binding productBinding = BindingBuilder.bind(productQueue).to(productExchange);
		return productBinding;
	}

	/**
	 * Builds the container that consumes from the product queue.
	 *
	 * @param connectionFactory connection factory used by the container
	 * @param listenerAdapter   listener that receives each message
	 * @return the configured listener container
	 */
	@Bean
	SimpleMessageListenerContainer listenerContainer(ConnectionFactory connectionFactory,
			MessageListenerAdapter listenerAdapter) {
		final SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.setQueueNames(productQueueName);
		container.setMessageListener(listenerAdapter);
		return container;
	}

	/**
	 * @param consumer the controller whose {@code receiveMessage} method handles messages
	 * @return an adapter that forwards messages to that method
	 */
	@Bean
	MessageListenerAdapter listenerAdapter(OrderController consumer) {
		return new MessageListenerAdapter(consumer, LISTENER_METHOD);
	}

}