Roadmap nach RabbitMQ 3.1.1 Federation Plugin mit Java Beispielen

Post date: Jun 15, 2013 9:41:22 AM

Wenn man RabbitMQ in zwei verschiedenen Datenzentren die durch WAN getrennt sind, einsetzen will sind Clustering-Lösungen nicht empfehlenswert weil sie mit Netzwerkproblemen nicht optimal umgehen können. In diesem Fall können entweder das Federation Plugin oder das Shovel Plugin als zuverlässige Ansätze verwendet werden. Weil das Shovel Plugin detailgenaue Kontrolle anbietet die in den meisten Fällen nicht nötig ist, kann man davon ableiten dass das Federation Plugin eine verbreitete Verwendung in diesen Szenarien hat. Dieser Eintrag fokussiert auf diesem Thema.

Der Artikel präsentiert die Schritte die notwendig sind um das RabbitMQ Plugin zu konfigurieren, zu verwenden und zum Laufen zu bringen. Er erklärt auch wie man überprüfen kann ob Federation tatsächlich funktioniert und präsentiert auch die Strukturen die auf jedem Knoten vorhanden werden müssen bevor Federation stattfindet.Die Annahme ist, dass das Federation Plugin aud einer lokalen Maschine mit 2 RabbitMQ Brokers läuft und dass Federation zwischen diesen Brokers stattfindet.

1) Du kannst mit dem folgenden GitHub Projekt starten: https://github.com/jamescarr/rabbitmq-federation-example, die Dateien auf deine lokale Maschine in einem beliebigen Verzeichnis herunterladen. Ich habe folgende Änderungen zu den Dateien auf beiden Knoten durchgeführt:

rabbitmq.config - den Port für das RabbitMQ Management Plugin konfiguriert:

{rabbitmq_management,

[{listener, [{port, 15672},

{ip, "127.0.0.1"}

]}

]},

Auf node1 und auf node2 habe ich den Port 15673 für dieses Plugin explizit eingestellt.

Für beide Knoten ist es notwendig die enable_plugins Datei zu aktualisieren und das "rabbitmq_federation_management" Plugin einzubeziehen weil es nützlich sein wird

um den Federation-Zustand, die Federation Upstreams und die Federation-Policy zu überprüfen. Es lohnt sich zu merken, dass die AMQP Ports die für die obengennanten Knoten gültig sind, sind folgende:

node 1 mit dem Namen "us" auf dem AMQP Port 25672

node 2 mit dem Namen "europe1" auf dem AMQP Port 35672

Starte diese Knoten wie auf dem Github-Link erklärt ist:

sudo ./start-node1.sh

sudo ./start-node2.sh

Diese Kommandozeilen werden zwei RabbitMQ Brokers auf der lokalen Maschine starten. Es ist emfehlenswert jetzt sicherzustellen, dass sie erfolgreich laufen, z.B. zu überprüfen dass die Nachricht in der Konsole mit "Starting broker... completed with 9 plugins." beendet wird.

2) Wir werden annehmen dass der "us" Knoten (node 1) als Downstream für den zweiten Knoten "europe1" (node 2) agiert. Das bedeutet dass der "europe1" Knoten die Upstream Exchanges für den "us"-Knoten beinhaltet. Um es zu konfigurieren, ruf den http://localhost:15672/ Link für das Management Plugin für RabbitMQ auf und gebe die guest/guest Credentials ein um Dich einzuloggen. Rufe dann Admin->Policies auf und definiere die neue Federation-Policy mit folgenden Daten:

Name: "federate-user-defined"

Pattern: "^user\."

Definition: federation-upstream-set: all

Das bedeutet dass wir alle Exchanges von allen RabbitMQ Brokern deren Name mit dem "user." Muster starten nach diesem einzelnen Knoten föderieren (der AMQP Port ist 25672).

Rufe den Admin -> Federation Upstreams Link auf und füge einen neuen Upstream mit folgenden Daten hinzu:

Name: europe1

URI: amqp://localhost:35672

Expire: 3600000 (ms)

Alternativ kann man eine Policy und die dazu-gehörenden Federation Upstreams mit folgender Kommandozeile definieren:

sudo rabbitmqctl -n us set_parameter federation local-nodename '"us"'

sudo rabbitmqctl -n us set_parameter federation-upstream europe1 '{"uri":"amqp://localhost:35672"}'

sudo rabbitmqctl -n us set_policy federate-me "^user\." '{"federation-upstream-set":"all"}

3) Wir gehen von der Annahme aus, dass wir Nachrichten an den zweiten Knoten (node2) - Upstream node durch die "user.direct" Exchange mit Hilfe einem Java-Client senden. Die Exchange die verwendet wird ist eine direkte Exchange die mit einem dynamischen Binding zu einer Queue verbunden ist (gering-geänderte Beispiele aus dem RabbitMQ Java Tutorial).

package routing;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

public class EmitLogDirect {

//direct exchange means that routing is performed based on the binding key that exists between

//the exchange and the queue, so in this case routing_key=binding_key

public static final String EXCHANGE_NAME = "user.direct";

public static void main(String[] args) throws IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(25672);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String severity = getSeverity(args);

String message = getMessage(args);

String routingKey = severity;

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());

System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

channel.close();

connection.close();

}

private static String getMessage(String[] args) {

if(args.length < 1) {

return "Hello World!";

}

return joinStrings(args, " ");

}

private static String joinStrings(String[] args, String delimiter) {

int length = args.length;

if(length == 0) {

return "";

}

StringBuilder words = new StringBuilder(args[0]);

for(int i = 1; i < args.length - 1; i++){

words.append(delimiter).append(args[i]);

}

return words.toString();

}

private static String getSeverity(String[] args) {

if(args.length > 1) {

return args[args.length - 1];

}

return "not_applicable";

}

}

Ausgabe folgt:

[x] Sent 'warning':'hello'

Führe das Programm dreimal aus.

Nach dem Ausführen dieses Programms wird die "user.direct" Exchange und deren dynamisches Queue-Binding auf dem node2 (europe1) automatisch durch Federation erstellt so ist es nicht notwendig diese Artefakte auf dieser Broker-Instanz zu definieren.

4) Rufe die Management GUI von node 1 (us) - http://localhost:15672/ auf, wähle Admin -> Federation Status aus und dort wirst Du eine Liste der Federated Connections und deren Zustand (sollte grün sein) sehen:

europe1 amqp://localhost:35672 user.direct

oder kannst Du alternativ folgende Kommandozeile verwenden:

sudo rabbitmqctl -n us eval 'rabbit_federation_status:status().'

und die Antwort sollte, unter Anderen, folgenden Eintrag enthalten:

[{exchange,<<"user.direct">>},

{vhost,<<"/">>},

{connection,<<"europe1">>},

{uri,<<"amqp://localhost:35672">>},

{upstream_exchange,<<"user.direct">>},

{status,{running,<<"<us@host012.2.18153.0>">>}},

{timestamp,{{2013,6,11},{13,23,23}}}]

was bedeutet, dass die Federation erfolgreich funktioniert.

5) Wir erhalten die Nachrichten die an den obengennanten Broker gesendet wurden (node2 - europe1) mit diesem Java-File der dass Coding der Receive-Logik enthält. Die Nachrichten werden von dem Downstream Knoten (node1 - us) gekriegt. Das bedeutet dass alle Messages die an diesen anderen federated Broker gesendet wurden wurden auf diesem Knoten erfolgreich propagiert.

package routing;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

import java.io.IOException;

public class ReceiveLogsDirect {

public static final String EXCHANGE_NAME = "user.direct";

public static void main(String[] args) throws IOException, InterruptedException{

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(35672);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

//anonymous, automatically generated queue name non-persistent and auto-removable

String queueName = channel.queueDeclare().getQueue();

if (args.length < 1){

System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");

System.exit(1);

}

//creating binding keys for every severity

for(String severity : args){

channel.queueBind(queueName, EXCHANGE_NAME, severity);

}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(queueName, true, consumer);

while (true) {

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

String routingKey = delivery.getEnvelope().getRoutingKey();

System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");

}

}

}

Das Programm erhält alle Nachrichten die von dem EmitLogDirect.java File (zu der anderen RabbitMQ Broker-Instanz) gesendet wurden.

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'warning':'hello'

[x] Received 'warning':'hello'

[x] Received 'warning':'hello'

Die Ausgabe wird dieselbe Anzahl der Messages die durch das andere Java-Programm gesendet wurden, anzeigen.

6) Jetzt können wir die bidirektionale Federation betrachten so dass node 2(europe1) bekommt die Nachrichten von node 1(us) durch die Upstream Exchanges.

Wir werden annehmen dass der "europe1" Knoten (node 2) als Downstream für den ersten Knoten "us" (node 1) agiert. Das bedeutet dass der "us" Knoten die Upstream Exchanges für den "europe1"-Knoten beinhaltet. Um es zu konfigurieren, ruf den http://localhost:15673/ Link für das Management Plugin für RabbitMQ auf und gebe die guest/guest Credentials ein um Dich einzuloggen. Rufe dann Admin->Policies auf und definiere die neue Federation-Policy mit folgenden Daten:

Name: "federate-user-defined"

Pattern: "^user\."

Definition: federation-upstream-set: all

Das bedeutet dass wir alle Exchanges von allen RabbitMQ Brokern deren Name mit dem "user." Muster starten nach diesem einzelnen Knoten föderieren (der AMQP Port ist 25672).

Rufe den Admin -> Federation Upstreams Link auf und füge einen neuen Upstream mit folgenden Daten hinzu:

Name: us

URI: amqp://localhost:25672

Expire: 3600000 (ms)

Alternativ kann man eine Policy und die dazu-gehörenden Federation Upstreams mit folgender Kommandozeile definieren:

sudo rabbitmqctl -n europe1 set_parameter federation local-nodename '"europe1"'

sudo rabbitmqctl -n europe1 set_parameter federation-upstream us '{"uri":"amqp://localhost:25672"}'

sudo rabbitmqctl -n europe1 set_policy federate-me "^user\." '{"federation-upstream-set":"all"}

Um zu überprüfen dass die Federation in die andere Richtung tatsächlich funktioniert werden entsprechende Kommandozeilen wie oben präsentiert ausgeführt (für den anderen Knoten). Die Java Programme für das Senden und das Erhalten von Nachrichten können geändert werden um die korrekten Ports zu widerspiegeln (umgekehrte Ports für die Senden- und Erhalten-Seiten).