Roadmap to RabbitMQ 3.1.1 Federation Plugin with Java Examples

Post date: Jun 15, 2013 9:38:32 AM

When RabbitMQ runs across distinct data centers separated by a WAN, clustering is not recommended because it is sensitive to network problems. In this case, either the Federation Plugin or the Shovel Plugin is a reliable solution. The Shovel Plugin offers fine-grained control that most situations do not need, so the Federation Plugin is more widely used in this scenario and is the focus of this article.

This entry presents the steps needed to configure, run and use the RabbitMQ federation plugin. It also explains how to check that federation actually works and what needs to be present on each node before federation takes place. The assumption is that the federation plugin runs on the local machine with two RabbitMQ brokers that federate to each other.

1) You can start from the following GitHub project: https://github.com/jamescarr/rabbitmq-federation-example. Download the files to a directory of your choice on your local machine. I made the following changes to the files provided for both nodes:

rabbitmq.config - configured port for RabbitMQ Management Plugin:

{rabbitmq_management,
  [{listener, [{port, 15672},
               {ip, "127.0.0.1"}
              ]}
  ]},

This is the setting for node 1; for node 2 I explicitly configured port 15673 for this plugin.

For both nodes, the enabled_plugins file needs to be updated to include the "rabbitmq_federation_management" plugin too, since it is useful for analysing federation status, federation upstreams and defined policies. Note that the AMQP ports on which the aforementioned nodes run are:

node 1 with node name "us" on AMQP port 25672

node 2 with node name "europe1" on AMQP port 35672

Start these nodes as described in the GitHub project for this sample:

sudo ./start-node1.sh

sudo ./start-node2.sh

These commands start two RabbitMQ brokers on the local machine. Check that both start successfully, i.e. that the startup output ends with a message such as "Starting broker... completed with 9 plugins."

2) We will consider that the "us" node (node 1) acts as downstream for "europe1" (node 2). This means that "europe1" contains the upstream exchanges for the "us" node.
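The direction of the upstream/downstream relationship is easy to get backwards, so here is a toy model of it. It is only a sketch of the message flow, not of how the plugin is implemented: the class names `FederationDirectionDemo` and `Broker` and the method `federateFrom` are invented for this illustration, and messages are copied a single hop only.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: shows the direction of message flow, not the plugin internals.
final class FederationDirectionDemo {

    static final class Broker {
        final String name;
        final List<String> received = new ArrayList<>();
        final List<Broker> downstreams = new ArrayList<>();

        Broker(String name) {
            this.name = name;
        }

        // Declares this broker as downstream of the given upstream broker.
        void federateFrom(Broker upstream) {
            upstream.downstreams.add(this);
        }

        // A message published here is kept locally and copied one hop to each downstream.
        void publish(String message) {
            received.add(message);
            for (Broker downstream : downstreams) {
                downstream.received.add(message);
            }
        }
    }

    public static void main(String[] args) {
        Broker us = new Broker("us");
        Broker europe1 = new Broker("europe1");
        us.federateFrom(europe1); // "us" is downstream, "europe1" is upstream

        europe1.publish("from europe1"); // reaches both brokers
        us.publish("from us");           // stays on "us" only

        System.out.println("us: " + us.received);
        System.out.println("europe1: " + europe1.received);
    }
}
```

In this model, messages published on "us" never reach "europe1"; that needs the second, reverse link configured in step 6.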

In order to configure this, go to http://localhost:15672/, the RabbitMQ Management Plugin UI of node 1. Log in with guest/guest. Go to Admin -> Policies and define a new federation policy with the following data:

Name: "federate-user-defined"

Pattern: "^user\."

Definition: federation-upstream-set: all

This means that every exchange on this particular broker (AMQP port 25672) whose name starts with "user." is federated from all defined upstreams.
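To get a feel for which exchange names the pattern selects, the following sketch applies the same regular expression in Java. It is only an approximation, since the broker evaluates policy patterns with its own regex engine rather than Java's. The class name `PolicyPatternCheck` and all sample names other than "user.direct" are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical helper: approximates how the policy pattern selects exchange names.
final class PolicyPatternCheck {

    // Same pattern as in the policy definition above.
    private static final Pattern POLICY_PATTERN = Pattern.compile("^user\\.");

    // find() searches anywhere in the name; the leading ^ pins the match to the start.
    static boolean isFederated(String exchangeName) {
        return POLICY_PATTERN.matcher(exchangeName).find();
    }

    public static void main(String[] args) {
        String[] names = {"user.direct", "user.events", "users", "amq.direct"};
        for (String name : names) {
            System.out.println(name + " -> " + isFederated(name));
        }
    }
}
```

Note that "users" is not selected: the escaped dot in the pattern requires a literal "." right after "user".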

Go to Admin -> Federation Upstreams and add a new upstream with the following data:

Name: europe1

URI: amqp://localhost:35672

Expire: 3600000 (ms)

Alternatively, you can define the policy and the federation upstream from the command line (the policy is named "federate-me" in this variant):

sudo rabbitmqctl -n us set_parameter federation local-nodename '"us"'

sudo rabbitmqctl -n us set_parameter federation-upstream europe1 '{"uri":"amqp://localhost:35672"}'

sudo rabbitmqctl -n us set_policy federate-me "^user\." '{"federation-upstream-set":"all"}'

3) Next, send messages from Java to the "user.direct" direct exchange on node 2 (the upstream node). The receiver in step 5 binds a dynamically created queue to the exchange of the same name. Both programs are slightly modified Java samples from the RabbitMQ tutorial:

package routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class EmitLogDirect {

    // A direct exchange routes a message to the queues whose binding key
    // equals the routing key of the message.
    public static final String EXCHANGE_NAME = "user.direct";

    public static void main(String[] args) throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // AMQP port of node 2 (europe1), the upstream broker
        factory.setPort(35672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        String severity = getSeverity(args);
        String message = getMessage(args);
        String routingKey = severity;

        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
        System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

        channel.close();
        connection.close();
    }

    // The message is built from the arguments that precede the severity;
    // a single argument is used as the message itself.
    private static String getMessage(String[] args) {
        if (args.length < 1) {
            return "Hello World!";
        }
        return joinStrings(args, " ");
    }

    // Joins args[0] .. args[length - 2]; the last argument is left out
    // because getSeverity() uses it as the routing key.
    private static String joinStrings(String[] args, String delimiter) {
        int length = args.length;
        if (length == 0) {
            return "";
        }
        StringBuilder words = new StringBuilder(args[0]);
        for (int i = 1; i < length - 1; i++) {
            words.append(delimiter).append(args[i]);
        }
        return words.toString();
    }

    // The severity is the last argument when at least two arguments are given.
    private static String getSeverity(String[] args) {
        if (args.length > 1) {
            return args[args.length - 1];
        }
        return "not_applicable";
    }
}

Running it with the arguments hello warning (message first, severity last) prints:

[x] Sent 'warning':'hello'

Run it three times.
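The way the emitter splits its command-line arguments is not obvious from the output alone, so the sketch below mirrors that logic without contacting a broker. The class name `EmitArgsDemo` and its method names are invented for this illustration; the behaviour is meant to match getMessage and getSeverity above.

```java
import java.util.Arrays;

// Illustration only: mirrors the argument handling of EmitLogDirect without a broker.
final class EmitArgsDemo {

    // The last argument is the severity (routing key) when at least two are given.
    static String severity(String[] args) {
        return args.length > 1 ? args[args.length - 1] : "not_applicable";
    }

    // The message is everything before the severity; a lone argument is the message.
    static String message(String[] args) {
        if (args.length < 1) {
            return "Hello World!";
        }
        int end = Math.max(1, args.length - 1);
        return String.join(" ", Arrays.copyOfRange(args, 0, end));
    }

    public static void main(String[] args) {
        String[] sample = {"hello", "warning"};
        System.out.println(" [x] Sent '" + severity(sample) + "':'" + message(sample) + "'");
    }
}
```

With a single argument such as hello, the message is "hello" and the routing key falls back to "not_applicable", so a receiver bound only to "warning" would not see it.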

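After running this program, the "user.direct" exchange exists on node 2 (europe1). The emitter declares no queue. The dynamically named queue and its bindings are created by the receiving program in step 5, which also declares the exchange on node 1 (us), where the federation policy matches it; federation then copies messages published on europe1 to node 1.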

4) Go to the management GUI of node 1 (us) at http://localhost:15672/ and select Admin -> Federation Status. You should see the list of federated connections with status running (green), such as:

europe1 amqp://localhost:35672 user.direct

Alternatively, use the following command:

sudo rabbitmqctl -n us eval 'rabbit_federation_status:status().'

The response should contain, among others:

[{exchange,<<"user.direct">>},

{vhost,<<"/">>},

{connection,<<"europe1">>},

{uri,<<"amqp://localhost:35672">>},

{upstream_exchange,<<"user.direct">>},

{status,{running,<<"<us@host012.2.18153.0>">>}},

{timestamp,{{2013,6,11},{13,23,23}}}]

This means that federation has been set up correctly.
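If you want to automate this check, one option is to capture the output of the rabbitmqctl command and scan it as plain text. The sketch below does that for the sample response shown above. It is a fragile text match rather than a real parser of Erlang terms, the output format may differ between versions, and the class name `FederationStatusCheck` is hypothetical.

```java
// Hypothetical helper: a plain-text check on the output of the rabbitmqctl command above.
final class FederationStatusCheck {

    // True if the first status entry for the given exchange reports a running link.
    static boolean isRunning(String statusOutput, String exchange) {
        String marker = "{exchange,<<\"" + exchange + "\">>}";
        int start = statusOutput.indexOf(marker);
        if (start < 0) {
            return false; // no federation link listed for this exchange
        }
        // An entry ends where the next "{exchange," tuple begins, or at the end of the output.
        int next = statusOutput.indexOf("{exchange,", start + marker.length());
        String entry = next < 0
                ? statusOutput.substring(start)
                : statusOutput.substring(start, next);
        return entry.contains("{status,{running,");
    }

    public static void main(String[] args) {
        // Sample taken from the response shown above.
        String sample = String.join("\n",
                "[{exchange,<<\"user.direct\">>},",
                " {vhost,<<\"/\">>},",
                " {connection,<<\"europe1\">>},",
                " {uri,<<\"amqp://localhost:35672\">>},",
                " {upstream_exchange,<<\"user.direct\">>},",
                " {status,{running,<<\"<us@host012.2.18153.0>\">>}},",
                " {timestamp,{{2013,6,11},{13,23,23}}}]");
        System.out.println(isRunning(sample, "user.direct"));
        System.out.println(isRunning(sample, "user.topic"));
    }
}
```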

5) We receive the messages sent to the aforementioned broker (node 2 - europe1) with a second Java program that connects to the downstream node (node 1 - us). If the messages arrive there, federation works: everything sent to europe1 was federated to the other broker.

package routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

public class ReceiveLogsDirect {

    public static final String EXCHANGE_NAME = "user.direct";

    public static void main(String[] args) throws IOException, InterruptedException {
        // Check the arguments before opening a connection
        if (args.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // AMQP port of node 1 (us), the downstream broker
        factory.setPort(25672);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "direct");

        // Anonymous queue: server-generated name, non-durable, auto-deleted
        String queueName = channel.queueDeclare().getQueue();

        // Create one binding per severity given on the command line
        for (String severity : args) {
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        QueueingConsumer consumer = new QueueingConsumer(channel);
        // autoAck = true: messages are acknowledged as soon as they are delivered
        channel.basicConsume(queueName, true, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        }
    }
}

Start this program with the severities to bind (here: warning) before running EmitLogDirect, because messages published while no queue is bound are discarded. It then receives all messages that EmitLogDirect sends through the other RabbitMQ broker instance:

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'warning':'hello'

[x] Received 'warning':'hello'

[x] Received 'warning':'hello'

The number of messages displayed matches the number sent with the other Java program.
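Which messages the receiver sees depends on the direct-exchange rule mentioned in the emitter's comments: the routing key must equal a binding key. The following in-memory sketch illustrates just that rule. It is a toy model and not the RabbitMQ client API; the class name `DirectExchangeModel` and the queue name "q1" are invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model only: direct-exchange routing in memory, not the RabbitMQ client API.
final class DirectExchangeModel {

    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that get the message; an empty list means it is dropped.
    List<String> route(String routingKey) {
        return new ArrayList<>(
                bindings.getOrDefault(routingKey, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        DirectExchangeModel exchange = new DirectExchangeModel();
        exchange.bind("q1", "warning");

        System.out.println(exchange.route("warning")); // [q1]
        System.out.println(exchange.route("info"));    // []
    }
}
```

This is also why the receiver has to be bound before the emitter runs: with no binding for "warning", the routing step finds no queue and the message is lost.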

6) Now consider bidirectional federation, so that node 2 (europe1) also receives the messages published to the upstream exchanges on node 1 (us).

We will consider that the "europe1" node (node 2) acts as downstream for "us" (node 1). This means that "us" contains the upstream exchanges for the "europe1" node.

In order to configure this, go to http://localhost:15673/, the RabbitMQ Management Plugin UI of node 2. Log in with guest/guest. Go to Admin -> Policies and define a new federation policy with the following data:

Name: "federate-user-defined"

Pattern: "^user\."

Definition: federation-upstream-set: all

This means that every exchange on this particular broker (AMQP port 35672) whose name starts with "user." is federated from all defined upstreams.

Go to Admin -> Federation Upstreams and add a new upstream with the following data:

Name: us

URI: amqp://localhost:25672

Expire: 3600000 (ms)

Alternatively, you can define the policy and the federation upstream from the command line (the policy is named "federate-me" in this variant):

sudo rabbitmqctl -n europe1 set_parameter federation local-nodename '"europe1"'

sudo rabbitmqctl -n europe1 set_parameter federation-upstream us '{"uri":"amqp://localhost:25672"}'

sudo rabbitmqctl -n europe1 set_policy federate-me "^user\." '{"federation-upstream-set":"all"}'
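The two directions use the same three commands with the node names and ports swapped. The sketch below generates them from those values, which can help avoid copy-and-paste slips. It only builds the command strings and does not execute anything; the class name `FederationCommands` is hypothetical, and sudo is left out.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds the rabbitmqctl commands used in this article as strings.
final class FederationCommands {

    // localNode is the downstream broker; upstreamName and upstreamPort describe its upstream.
    static List<String> forNode(String localNode, String upstreamName, int upstreamPort) {
        String ctl = "rabbitmqctl -n " + localNode + " ";
        return Arrays.asList(
                ctl + "set_parameter federation local-nodename '\"" + localNode + "\"'",
                ctl + "set_parameter federation-upstream " + upstreamName
                        + " '{\"uri\":\"amqp://localhost:" + upstreamPort + "\"}'",
                ctl + "set_policy federate-me \"^user\\.\" '{\"federation-upstream-set\":\"all\"}'");
    }

    public static void main(String[] args) {
        // Step 2: "us" is downstream of "europe1"
        for (String command : forNode("us", "europe1", 35672)) {
            System.out.println(command);
        }
        // Step 6: "europe1" is downstream of "us"
        for (String command : forNode("europe1", "us", 25672)) {
            System.out.println(command);
        }
    }
}
```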

To check that federation works in the other direction, run commands analogous to those presented above, this time for the other node. The sending and receiving Java programs can be changed to use the inverse ports on the sending and receiving sides.