Show List

Direct Exchange Demo

A RabbitMQ direct exchange routes a message to the queues whose binding key exactly matches the routing key of the message. For this demo, we are going to create an exchange and then bind four different message queues to it using routing keys.

Create Direct Exchange

From the RabbitMQ management console, create a direct exchange named "direct-exchange-demo".

Create Queues

Now create four queues: "Red", "Green", "Yellow", and "Blue".

Binding Queues to Exchange

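Now go to the Exchanges tab, click on the "direct-exchange-demo" exchange, and bind the four queues to it, providing a routing key for each binding. The publisher below uses the routing keys "r-1" (bound to the Red queue) and "g-1" (bound to the Green queue).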

Creating Publisher

We are going to use Maven projects for the publisher and the consumers. The RabbitMQ Java client (amqp-client) is the dependency that has been added to the POMs.

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the QPublisher module. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- This module inherits from the org.example:RabbitMQDemo parent project. -->
<parent>
<artifactId>RabbitMQDemo</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>QPublisher</artifactId>

<!-- Compile for Java 11 (both source and target level). -->
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<!-- RabbitMQ Java client; provides the com.rabbitmq.client classes used by the publisher. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
</dependencies>
</project>
Publisher.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Publishes one demo message to the "direct-exchange-demo" exchange for each of the
 * routing keys "r-1" and "g-1".
 */
public class Publisher {
    /**
     * Connects with a {@link ConnectionFactory} that has no explicit settings, publishes the
     * same message once per routing key, and then releases the channel and the connection.
     *
     * @param args unused
     * @throws IOException if connecting, publishing, or closing fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();

        // try-with-resources closes the channel first and then the connection, even when a
        // publish throws. Previously only the channel was closed and the connection was
        // left open.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            String message = "Hello. Message to queue";
            // Encode explicitly so the bytes sent do not depend on the platform default charset.
            byte[] body = message.getBytes(java.nio.charset.StandardCharsets.UTF_8);

            channel.basicPublish("direct-exchange-demo", "r-1", null, body);
            System.out.println("Message sent to the r-1");

            channel.basicPublish("direct-exchange-demo", "g-1", null, body);
            System.out.println("Message sent to the g-1");
        }
    }
}
When the application is run, messages can be seen on the Red and Green queues.

Creating Consumers

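All the consumers use the same RabbitMQ dependency (amqp-client). The pom.xml shown below is the parent POM of the project: it lists the modules and declares the dependency.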

pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Parent/aggregator POM for the RabbitMQDemo project (packaging "pom"). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>RabbitMQDemo</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<!-- Modules built as part of this project. -->
<!-- NOTE(review): no module name here mentions the Blue queue consumer; confirm whether it is built elsewhere. -->
<modules>
<module>QPublisher</module>
<module>QConsumer</module>
<module>DirectExchangePublisher</module>
<module>DirectExchangeGreenQConsumer</module>
<module>DirectExchangeYellowQConsumer</module>
<module>DirectExchangeRedQConsumer</module>
</modules>

<!-- Compile for Java 11 (both source and target level). -->
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<!-- RabbitMQ Java client, declared at the parent level. -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
</dependencies>
</project>
BlueQConsumer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**
 * Console consumer for the "Blue" queue: prints the body of every message delivered from it.
 */
public class BlueQConsumer {

    /**
     * Opens a connection and a channel, then registers a consumer on the "Blue" queue whose
     * delivery callback prints each message body.
     *
     * @param args unused
     * @throws IOException if connecting or registering the consumer fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // The connection and channel are not closed here; presumably so the consumer keeps
        // receiving deliveries after main returns. Confirm before adding cleanup.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // Assumes publishers encode message bodies as UTF-8; verify against the publisher.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Message received by Blue consumer = " + message);
        };

        // Second argument (true) is the auto-acknowledge flag; the cancel callback does nothing.
        channel.basicConsume("Blue", true, deliverCallback, consumerTag -> {});
    }
}
GreenQConsumer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**
 * Console consumer for the "Green" queue: prints the body of every message delivered from it.
 */
public class GreenQConsumer {

    /**
     * Opens a connection and a channel, then registers a consumer on the "Green" queue whose
     * delivery callback prints each message body.
     *
     * @param args unused
     * @throws IOException if connecting or registering the consumer fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // The connection and channel are not closed here; presumably so the consumer keeps
        // receiving deliveries after main returns. Confirm before adding cleanup.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // Assumes publishers encode message bodies as UTF-8; verify against the publisher.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Message received by Green consumer = " + message);
        };

        // Second argument (true) is the auto-acknowledge flag; the cancel callback does nothing.
        channel.basicConsume("Green", true, deliverCallback, consumerTag -> {});
    }
}
YellowQConsumer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**
 * Console consumer for the "Yellow" queue: prints the body of every message delivered from it.
 */
public class YellowQConsumer {

    /**
     * Opens a connection and a channel, then registers a consumer on the "Yellow" queue whose
     * delivery callback prints each message body.
     *
     * @param args unused
     * @throws IOException if connecting or registering the consumer fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // The connection and channel are not closed here; presumably so the consumer keeps
        // receiving deliveries after main returns. Confirm before adding cleanup.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // Assumes publishers encode message bodies as UTF-8; verify against the publisher.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Message received by Yellow consumer = " + message);
        };

        // Second argument (true) is the auto-acknowledge flag; the cancel callback does nothing.
        channel.basicConsume("Yellow", true, deliverCallback, consumerTag -> {});
    }
}
RedQConsumer.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

/**
 * Console consumer for the "Red" queue: prints the body of every message delivered from it.
 */
public class RedQConsumer {

    /**
     * Opens a connection and a channel, then registers a consumer on the "Red" queue whose
     * delivery callback prints each message body.
     *
     * @param args unused
     * @throws IOException if connecting or registering the consumer fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        // The connection and channel are not closed here; presumably so the consumer keeps
        // receiving deliveries after main returns. Confirm before adding cleanup.
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            // Decode explicitly as UTF-8 instead of relying on the platform default charset.
            // Assumes publishers encode message bodies as UTF-8; verify against the publisher.
            String message = new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8);
            System.out.println("Message received by Red consumer = " + message);
        };

        // Second argument (true) is the auto-acknowledge flag; the cancel callback does nothing.
        channel.basicConsume("Red", true, deliverCallback, consumerTag -> {});
    }
}
When the consumer applications are run, we can see that messages are consumed only from the Red and Green queues, the queues to which the messages were routed by their routing keys.

Source code:
https://github.com/it-code-lab/RabbitMQDemo

    Leave a Comment


  • captcha text