Message Queue

プロセス間あるいはスレッド間で非同期通信を行うための仕組み

種類

・Java Message Service (JMS)

・Advanced Message Queuing Protocol (AMQP)

・Message Queuing Telemetry Transport (MQTT)

Java Platform

規約

JMS

実装

ActiveMQ

RabbitMQ

DEMO

★メッセージキューのモデル

★MDB(Message Driven Bean)

※MDB != JMS

MDBはEJBの一種で、Stateless Session Beanと同様に状態を持たないBeanとしてコンテナに管理される

MDBはWebプロファイルに含まれないため、warではなくjarかearにデプロイする

/**
 * Message-driven bean that listens on the configured JMS destination and forwards
 * the body of every {@code TextMessage} to {@link Messages#sendMessage(String)}.
 *
 * <p>The activation config uses auto-acknowledge and the message selector
 * {@code orderAmount > 1000}.
 *
 * <p>NOTE(review): {@code mappedName} is "jms/Topic" while {@code destinationType}
 * is {@code javax.jms.Queue} — confirm which destination is actually intended.
 */
@MessageDriven(mappedName = "jms/Topic",
    activationConfig = {
        @ActivationConfigProperty(propertyName = "destination",
            propertyValue = "java:/queues/xxxQueue"),
        @ActivationConfigProperty(propertyName = "acknowledgeMode",
            propertyValue = "Auto-acknowledge"),
        @ActivationConfigProperty(propertyName = "messageSelector",
            propertyValue = "orderAmount > 1000"),
        @ActivationConfigProperty(propertyName = "destinationType",
            propertyValue = "javax.jms.Queue")
    })
public class MessageBean implements MessageListener {

    @EJB // messages are processed on multiple threads by the EJB container
    private Messages messages;

    @Inject
    private ObjectMapper objectMapper;

    @Resource
    private MessageDrivenContext ctx;

    /**
     * Handles one incoming message.
     *
     * @param message the delivered JMS message
     * @throws IllegalStateException wrapping any {@code JMSException}; the current
     *     transaction is marked rollback-only first
     */
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                final String answer = ((TextMessage) message).getText();
                messages.sendMessage(answer);
                //T t = objectMapper.readValue(answer, new MyTypeReference());
                ...
            } else {
                ...
            }
        } catch (JMSException e) {
            ctx.setRollbackOnly(); // throwing a runtime exception also triggers a rollback
            throw new IllegalStateException(e);
        }
    }

    // T is a placeholder for the concrete payload type used with Jackson.
    private static class MyTypeReference extends TypeReference<T> { }
}

@Stateless

public class Messages {

@Resource(lookup = "jms/ConnectionFactory")

private ConnectionFactory connectionFactory;

@Resource(name = "jms/AnswerQueue")

private Queue answerQueue;

@Inject

private ObjectMapper objectMapper;

public void sendMessage(String text) throws JMSException {

Connection connection = null;

Session session = null;

MessageProducer producer = null;

try {

connection = connectionFactory.createConnection();

connection.start();

session =

connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

producer = session.createProducer(answerQueue);

producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

//String text = objectMapper.writeValueAsString(t);

TextMessage message = session.createTextMessage(text);

producer.send(message);

あるいは

ObjectMessage message = session.createObjectMessage();

message.setObject(order);

message.setFloatProperty("orderAmount", amount);

producer.send(message);

} finally {

if (producer != null) producer.close();

if (session != null) session.close();

if (connection != null) connection.close();

}

}

public String receiveMessage() throws JMSException {

Connection connection = null;

Session session = null;

MessageConsumer consumer = null;

try {

connection = connectionFactory.createConnection();

connection.start();

session =

connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

consumer = session.createConsumer(answerQueue);

TextMessage message = (TextMessage) consumer.receive(1000);

return message.getText();

} finally {

if (consumer != null) consumer.close();

if (session != null) session.close();

if (connection != null) connection.close();

}

}

}

★JMSのインタフェース

★JMS 2.0 new Features and Enhancements

・Simplified API

JMS 1.1 (Java EE 6)

/**
 * Sends {@code message} to {@code queue} as a TextMessage using the JMS 1.1 API.
 *
 * @param factory connection factory used to open the connection
 * @param queue   destination queue
 * @param message text to send
 */
public void sendMessage(ConnectionFactory factory,
        Queue queue, String message) {
    try {
        Connection connection = factory.createConnection();
        try {
            Session session = connection.createSession(
                    false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(queue);
            TextMessage textMessage = session.createTextMessage(message);
            messageProducer.send(textMessage);
        } finally {
            // Only reached when the connection was actually created, and any
            // JMSException thrown by close() is handled by the outer catch.
            connection.close();
        }
    } catch (JMSException e) {
        ...
    }
}

JMS 2.0 (Java EE 7)

/**
 * Sends {@code message} to {@code queue} using the JMS 2.0 simplified API.
 *
 * @param factory connection factory used to create the JMSContext
 * @param queue   destination queue
 * @param message text to send
 */
public void sendMessage(ConnectionFactory factory,
        Queue queue, String message) {
    // JMSContext is AutoCloseable, so try-with-resources closes it for us.
    try (JMSContext context = factory.createContext()) {
        context.createProducer().send(queue, message);
    } catch (JMSRuntimeException e) {
        // The simplified API reports failures with the unchecked
        // JMSRuntimeException instead of the checked JMSException.
        ...
    }
}

・Asynchronous send mode

messageProducer.send(message, completionListener);

・Delayed message delivery

MessageProducer#setDeliveryDelay(long deliveryDelay)

JMSProducer#setDeliveryDelay(long deliveryDelay)

・Share the same topic subscription

// Consumer on a shared subscription identified by sharedSubscriptionName.
MessageConsumer messageConsumer =

session.createSharedConsumer(topic, sharedSubscriptionName);

// Consumer on a durable subscription identified by durableSubscriptionName.
// NOTE(review): the variable name is reused, so the two declarations read as
// alternatives rather than as one compilable snippet.
MessageConsumer messageConsumer =

session.createDurableConsumer(topic, durableSubscriptionName);

Application Managed

@Resource
private ConnectionFactory cf;

// @Resource has no "value" element; the JNDI name must be given via lookup.
@Resource(lookup = "jms/emailQ")
private Destination emailQ;

/**
 * Sends {@code email} to the e-mail queue using an application-managed JMSContext.
 *
 * @param email the message body
 * @throws JMSRuntimeException if sending fails or the queue name cannot be read
 */
public void send(String email) {
    // The application created the context, so it must also close it;
    // try-with-resources does this even when createContext() or send() fails.
    // (Alternatively, keep the context in a field and close it in a @PreDestroy method.)
    try (JMSContext ctx = cf.createContext()) {
        ctx.createProducer().send(emailQ, email);
        log.info("Sent message to queue({})", ((Queue) emailQ).getQueueName());
    } catch (JMSException ex) {
        log.error(...);
        // Second constructor argument is the provider error code, not the message.
        throw new JMSRuntimeException(ex.getMessage(), ex.getErrorCode(), ex);
    }
}

Container Managed

// The container injects the JMSContext here, so this code never closes it itself.
@Inject
// @JMSConnectionFactory("jms/myConnectionFactory")
private JMSContext ctx;

// @Resource has no "value" element; the JNDI name must be given via lookup.
@Resource(lookup = "jms/emailQ")
private Destination emailQ;

/**
 * Sends {@code email} to the e-mail queue using the container-managed JMSContext.
 *
 * @param email the message body
 * @throws JMSRuntimeException if sending fails or the queue name cannot be read
 */
public void send(String email) {
    try {
        ctx.createProducer().send(emailQ, email);
        log.info("Sent message to queue({})", ((Queue) emailQ).getQueueName());
    } catch (JMSException ex) {
        log.error(...);
        // Second constructor argument is the provider error code, not the message.
        throw new JMSRuntimeException(ex.getMessage(), ex.getErrorCode(), ex);
    }
}

JMS based MDB

JCA based MDB