Message Queue
プロセス間或いはスレッド間に非同期通信を行う
種類
・Java Messaging Service (JMS)
・Advanced Message Queueing Protocol (AMQP)
・Message Queueing Telemetry Transport (MQTT)
Java Platform
規約
JMS
実装
ActiveMQ
RabbitMQ
★メッセージキューのモデル
★MDB(Message Driven Bean)
※MDB != JMS
MDBはEJBの一種、状態を持たないStateless Beanとして管理される
MDBはWebプロファイルに含まれないため、warではなくjarかearにデプロイする
@MessageDriven(mappedName="jms/Topic"
activationConfig = {
@ActivationConfigProperty(propertyName = "destination",
propertyValue = "java:/queues/xxxQueue"),
@ActivationConfigProperty(propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge"),
@ActivationConfigProperty(propertyName = "messageSelector",
propertyValue = "orderAmount > 1000"),
@ActivationConfigProperty(propertyName = "destinationType",
propertyValue = "javax.jms.Queue")
})
public class MessageBean implements MessageListener {
@EJB // EJBコンテナによりマルチスレッドで処理
private Messages messages;
@Inject
private ObjectMapper objectMapper;
@Resource
private MessageDrivenContext ctx;
@Override
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
final String answer = ((TextMessage) message).getText();
messages.sendMessage(answer);
//T t = objectMapper.readValue(answer, new MyTypeReference());
...
} else {
...
}
} catch (JMSException e) {
ctx.setRollbackOnly(); // 実行時例外を投げてもロールバック
throw new IllegalStateException(e);
}
}
private static class MyTypeReference extends TypeReference<T> { }
}
@Stateless
public class Messages {
@Resource(lookup = "jms/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(name = "jms/AnswerQueue")
private Queue answerQueue;
@Inject
private ObjectMapper objectMapper;
public void sendMessage(String text) throws JMSException {
Connection connection = null;
Session session = null;
MessageProducer producer = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(answerQueue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//String text = objectMapper.writeValueAsString(t);
TextMessage message = session.createTextMessage(text);
producer.send(message);
あるいは
ObjectMessage message = session.createObjectMessage();
message.setObject(order);
message.setFloatProperty("orderAmount", amount);
producer.send(message);
} finally {
if (producer != null) producer.close();
if (session != null) session.close();
if (connection != null) connection.close();
}
}
public String receiveMessage() throws JMSException {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(answerQueue);
TextMessage message = (TextMessage) consumer.receive(1000);
return message.getText();
} finally {
if (consumer != null) consumer.close();
if (session != null) session.close();
if (connection != null) connection.close();
}
}
}
★JMSのインタフェース
★JMS 2.0 new Features and Enhancements
・Simplified API
JMS 1.1 (Java EE6)
public void sendMessage(ConnectionFactory factory,
Queue queue, String message) {
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(
false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage(message);
messageProducer.send(textMessage);
} catch (JMSException e) {
...
} finally {
connection.close();
}
}
JMS 2.0 (Java EE7)
public void sendMessage(ConnectionFactory factory,
Queue queue, String message) {
try (JMSContext context = factory.createContext()) {
context.createProducer().send(queue, message);
} catch (JMSException e) {
...
}
}
・Asynchronous send mode
messageProducer.send(message, completionListener);
・Delayed message delivery
MessageProducer#setDeliveryDelay(long deliveryDelay)
JMSProducer#setDeliveryDelay(long deliveryDelay)
・Share the same topic subscription
MessageConsumer messageConsumer =
session.createSharedConsumer(topic, sharedSubscriptionName);
MessageConsumer messageConsumer =
session.createDurableConsumer(topic, durableSubscriptionName);
Application Managed
@Resource
private ConnectionFactory cf;
private JMSContext ctx;
@Resource("jms/emailQ")
private Destination emailQ;
public void send(String email) {
Session session;
try {
ctx = cf.createContext();
ctx.createProducer().send(emailQ, email);
log.info("Sent message to queue({})", ((Queue) emailQ).getQueueName());
} catch (JMSException ex) {
log.error(...);
throw new JMSRuntimeException(ex.getMessage(), ex.getMessage(), ex);
} finally {
// 或は@PreDestroyメソッド
ctx.close();
}
}
Container Managed
@Inject
// @JMSConnectionFactory("jms/myConnectionFactory")
private JMSContext ctx;
@Resource("jms/emailQ")
private Destination emailQ;
public void send(String email) {
Session session;
try {
ctx.createProducer().send(emailQ, email);
log.info("Sent message to queue({})", ((Queue) emailQ).getQueueName());
} catch (JMSException ex) {
log.error(...);
throw new JMSRuntimeException(ex.getMessage(), ex.getMessage(), ex);
}
}
JMS based MDB
JCA based MDB