ezMQ is a project that uses Oracle Coherence as a JMS message service. The goal of this effort is to run the following code on top of Oracle Coherence without making any changes to the code. Make sure the following steps are taken:
- Copy the provided jndi.properties and put it in the classpath.
- Update fs.properties to map the names of the Topic and connection factory to the corresponding classes, where Topic is the name of the Topic (and, internally, the name of the NamedCache). The keys of this resource bundle are the names that are looked up against the InitialContext.
- TopicConnectionFactory=com.ezduck.ezmq.EzTopicConnectionFactory
- Topic=com.ezduck.ezmq.EzTopic
- QueueConnectionFactory=com.ezduck.ezmq.EzQueueConnectionFactory
- Queue=com.ezduck.ezmq.EzQueue
- Use and deploy the provided coherence-cache-config.xml
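As a quick sanity check on the steps above, the sketch below verifies that an fs.properties body defines every name the sample clients look up. FsPropertiesCheck is a hypothetical helper written for this article, not part of ezMQ, and it only inspects the text; it does not touch Coherence or JNDI.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of ezMQ: checks that an fs.properties body
// defines every name the sample clients pass to InitialContext.lookup().
final class FsPropertiesCheck {
    // The four lookup names used by the clients in this article.
    static final List<String> REQUIRED = List.of(
        "TopicConnectionFactory", "Topic", "QueueConnectionFactory", "Queue");

    // Parses the properties text and returns the required names that are absent.
    static List<String> missingNames(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String name : REQUIRED) {
            if (props.getProperty(name) == null) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String fs = String.join("\n",
            "TopicConnectionFactory=com.ezduck.ezmq.EzTopicConnectionFactory",
            "Topic=com.ezduck.ezmq.EzTopic",
            "QueueConnectionFactory=com.ezduck.ezmq.EzQueueConnectionFactory",
            "Queue=com.ezduck.ezmq.EzQueue");
        System.out.println("Missing names: " + missingNames(fs));
    }
}
```

An empty result means all four names are mapped; any name in the returned list would make the corresponding lookup fail at runtime.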
How to build JMS Topics?
Write a simple JMS Publisher and Subscriber
# TopicPublisherAndSubscriber:
import javax.jms.*;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class Client implements MessageListener {
public Client() {
}
public static void main(String[] args)
throws NamingException, JMSException {
Client client = new Client();
InitialContext ctx = new InitialContext();
// -- Create
TopicConnectionFactory factory =
(TopicConnectionFactory) ctx.lookup("TopicConnectionFactory");
// -- Connecting to Proxy
TopicConnection connection = factory.createTopicConnection();
// -- This is a NamedCache
Topic topic = (Topic) ctx.lookup("Topic");
client.addSubscriber(connection, topic);
client.publishMessage(connection, topic);
}
private void publishMessage(TopicConnection connection, Topic topic)
throws JMSException {
TopicSession pubSession =
connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = pubSession.createPublisher(topic);
TextMessage message = pubSession.createTextMessage();
message.setText("Ashish");
publisher.publish(message);
}
private void addSubscriber(TopicConnection connection, Topic topic)
throws JMSException {
TopicSession subSession =
connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber subscriber = subSession.createSubscriber(topic);
subscriber.setMessageListener(this);
}
public void onMessage(Message message) {
try {
TextMessage tMsg = (TextMessage) message;
String text = tMsg.getText();
System.out.println("On Message: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
So how does it work?
First, create the Context and the ContextFactory that provide the naming interface:
(Please refer to ezMQ.zip for the latest codebase)
# Context Factory:
public class EzContextFactory implements InitialContextFactory {
public EzContextFactory() {
}
public Context getInitialContext(Hashtable<?, ?> environment) {
return new EzContext (environment);
}
}
# Context:
// -- Some methods not shown here... and not implemented
public class EzContext implements Context {
private ResourceBundle rb = PropertyResourceBundle.getBundle("fs");
private Hashtable map;
public EzContext(Hashtable map) {
this.map = map;
}
public Object lookup(String name) {
String className = rb.getString(name);
Object obj = null;
Class clz;
try {
clz = Class.forName(className);
Constructor ctor =
clz.getConstructor(new Class [] {EzContext.class});
obj = ctor.newInstance(new Object [] {this});
} catch (Exception e) {
e.printStackTrace();
}
return obj;
}
public Hashtable<?, ?> getEnvironment() {
return map;
}
...
}
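The lookup above turns a name into a class name and then instantiates that class reflectively. The sketch below isolates that mechanism in plain Java so it can be run without Coherence. It is an illustration under assumptions, not the ezMQ code: ReflectiveLookup is a hypothetical name, a Map stands in for fs.properties, java.lang.StringBuilder stands in for a class such as EzTopic, and, unlike the original, failures are reported as a NamingException instead of a null return. As a simplification it also picks the constructor by the argument's runtime type rather than a fixed parameter type.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;

// Illustrative sketch only; ReflectiveLookup is a hypothetical name, not an ezMQ class.
final class ReflectiveLookup {
    private final Map<String, String> nameToClass; // stands in for fs.properties

    ReflectiveLookup(Map<String, String> nameToClass) {
        this.nameToClass = nameToClass;
    }

    // Resolves a JNDI-style name to a class name, then builds an instance
    // through the public constructor that accepts ctorArg's runtime type.
    Object lookup(String name, Object ctorArg) throws NamingException {
        String className = nameToClass.get(name);
        if (className == null) {
            throw new NameNotFoundException("No class mapped to name: " + name);
        }
        try {
            Class<?> clz = Class.forName(className);
            Constructor<?> ctor = clz.getConstructor(ctorArg.getClass());
            return ctor.newInstance(ctorArg);
        } catch (ReflectiveOperationException e) {
            NamingException ne = new NamingException("Cannot create " + className);
            ne.setRootCause(e);
            throw ne;
        }
    }

    public static void main(String[] args) throws NamingException {
        // StringBuilder has a public constructor that takes a String.
        ReflectiveLookup ctx =
            new ReflectiveLookup(Map.of("Topic", "java.lang.StringBuilder"));
        System.out.println(ctx.lookup("Topic", "hello"));
    }
}
```

Throwing a NamingException surfaces a misconfigured name at the lookup call, whereas returning null would only fail later, at the cast or the first method call.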
# Now JMS for Topics:
The classes implemented are:
- EzTopicConnectionFactory (TopicConnectionFactory)
- EzTopicConnection (TopicConnection)
- EzTopic (Topic)
- EzMessage (TextMessage)
- EzTopicPublisher (TopicPublisher)
- EzTopicSubscriber (TopicSubscriber)
- EzTopicSession (TopicSession)
Look at the source for more details.
How to build JMS Queues?
Building a JMS Queue is a little more complex because Oracle Coherence inherently broadcasts cache events to all registered listeners, whereas a JMS queue requires that one and only one message receiver receives each JMS message. The solution for a JMS Queue over Oracle Coherence revolves around the following three components:
- A Custom Named Cache that manages the registration of MapListeners.
- A cleverly placed event dispatcher, and
- An EntryProcessor that makes sure one and only one MapListener receives the message.
QueueCache - A Custom NamedCache:
QueueCache is built on top of two cache configurations. The first is deployed on the proxy servers, which are also the "JMS servers"; there, the cache that manages the JMS Queue is configured to use QueueCache. The second is used internally by the cache provider and defines the real cache topologies. Besides cache management, one of QueueCache's key methods is addMapListener:
MapListenerSupport m_listenerSupport = new MapListenerSupport ();
public void addMapListener(MapListener listener, Filter filter,
boolean fLite) {
if (singleListener == null) {
singleListener = new InternalListener();
}
m_listenerSupport.addListener(listener, AlwaysFilter.INSTANCE, false);
super.addMapListener(singleListener, filter, fLite);
}
The MapListenerSupport class manages all the MapListeners registered per JVM. This list is used to randomly pick the one listener to which each event is dispatched.
Event Dispatcher:
The event dispatcher is a private method in QueueCache that randomly picks a MapListener and dispatches the event to it. The method looks like the following:
private void dispatchQueueEvent(MapEvent mapEvent) {
// Fetch the listener array once so the size and the contents stay consistent
EventListener[] eList =
m_listenerSupport.getListeners(AlwaysFilter.INSTANCE).listeners();
// Pick one listener at random and deliver the event only to it
MapListener mListener =
(MapListener) eList[Base.getRandom().nextInt(eList.length)];
mapEvent.dispatch(mListener);
}
The method retrieves all the listeners registered with the cache server, randomly picks one, and invokes dispatch() on the MapEvent with that listener.
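The random-pick idea can be tried out in plain Java without the Coherence API. The sketch below is only a stand-in for the technique: RandomDispatcher is a hypothetical class, Consumer plays the role of MapListener, and, as an added assumption, dispatch reports an empty listener list to the caller instead of failing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;

// Plain-Java stand-in for the idea; it does not use the Coherence API.
final class RandomDispatcher<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final Random random;

    RandomDispatcher(Random random) {
        this.random = random;
    }

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    // Delivers the event to exactly one registered listener, chosen at random.
    // Returns false when nobody is registered, so the caller can decide what to do.
    boolean dispatch(E event) {
        if (listeners.isEmpty()) {
            return false;
        }
        Consumer<E> chosen = listeners.get(random.nextInt(listeners.size()));
        chosen.accept(event);
        return true;
    }

    public static void main(String[] args) {
        RandomDispatcher<String> dispatcher = new RandomDispatcher<>(new Random());
        int[] hits = new int[3];
        for (int i = 0; i < hits.length; i++) {
            final int id = i;
            dispatcher.addListener(msg -> hits[id]++);
        }
        for (int i = 0; i < 9; i++) {
            dispatcher.dispatch("message-" + i);
        }
        // Each message is counted once, whichever listener received it.
        System.out.println(hits[0] + hits[1] + hits[2]); // prints 9
    }
}
```

The per-listener counts vary from run to run, but the total always equals the number of messages, which is the queue property this component is after within a single JVM.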
EntryProcessor:
MLSEntryProcessor is the key component that makes sure only one message listener receives each JMS message. Even if multiple MapListeners are registered by multiple cache servers, only one of those listeners receives the message. The class uses an enum to record the state of JMS delivery, which each thread checks before calling the dispatch() method. The entry processor looks like:
private enum STATE {
DISPATCHED,
WAITING,
;
}
private class MLSEntryProcessor implements InvocableMap.EntryProcessor,
Serializable {
private MapEvent mapEvent;
public MLSEntryProcessor(MapEvent mapEvent) {
this.mapEvent = mapEvent;
}
public Object process(InvocableMap.Entry entry) {
String state = (String) entry.getValue();
if (state == null) {
try {
dispatchQueueEvent(mapEvent);
entry.setValue(STATE.DISPATCHED.name(), true);
} catch (Exception exp) {
exp.printStackTrace();
}
}
return null;
}
public Map processAll(Set set) {
return Collections.EMPTY_MAP;
}
}
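The check-then-mark logic above only works because the check and the state change happen atomically for a given entry. The sketch below reproduces that idea with a ConcurrentHashMap so it can be run locally. It is an analogy under assumptions, not the ezMQ implementation: DeliverOnce is a hypothetical class, threads stand in for cache servers, a counter stands in for dispatchQueueEvent, and the message is claimed first and delivered second, whereas the entry processor above dispatches before it sets the state.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-Java analogy; the real solution relies on a Coherence EntryProcessor.
final class DeliverOnce {
    private final ConcurrentMap<String, String> state = new ConcurrentHashMap<>();
    private final AtomicInteger deliveries = new AtomicInteger();

    // Atomically claims the message; only the first caller for a given id
    // sees null from putIfAbsent and therefore performs the delivery.
    boolean tryDeliver(String messageId) {
        if (state.putIfAbsent(messageId, "DISPATCHED") == null) {
            deliveries.incrementAndGet(); // stands in for dispatchQueueEvent(...)
            return true;
        }
        return false;
    }

    int deliveries() {
        return deliveries.get();
    }

    // Races the given number of threads on one message id and
    // returns how many deliveries actually happened.
    static int race(int threads) throws InterruptedException {
        DeliverOnce once = new DeliverOnce();
        CountDownLatch start = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                once.tryDeliver("msg-1");
            });
            workers[i].start();
        }
        start.countDown();
        for (Thread t : workers) {
            t.join();
        }
        return once.deliveries();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(race(8)); // prints 1
    }
}
```

However many threads compete for the same message id, exactly one of them wins the claim, which mirrors the one-and-only-one receiver guarantee described above.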
With that, the JMS Queue on top of the Coherence data grid is ready. Now run a test client to send and receive messages:
# Message Receiver:
public class MessageSubscriber implements MessageListener {
public MessageSubscriber() {
}
public static void main(String[] args) throws Exception {
MessageSubscriber s = new MessageSubscriber();
InitialContext ctx = new InitialContext();
QueueConnectionFactory factory =
(QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection connection = factory.createQueueConnection();
Queue queue = (Queue) ctx.lookup("Queue");
QueueSession recSession =
connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueReceiver receiver = recSession.createReceiver(queue);
receiver.setMessageListener(s);
System.out.println("Click to end");
System.in.read();
}
public void onMessage(Message message) {
try {
TextMessage tMsg = (TextMessage) message;
String text = tMsg.getText();
System.out.println("On Message: " + text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
# Message Sender:
public class MessagePublisher {
public MessagePublisher() {
}
public static void main(String[] args)
throws NamingException, JMSException {
MessagePublisher client = new MessagePublisher();
InitialContext ctx = new InitialContext();
QueueConnectionFactory factory =
(QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection connection = factory.createQueueConnection();
Queue queue = (Queue) ctx.lookup("Queue");
client.sendMessage(connection, queue);
}
private void sendMessage(QueueConnection connection, Queue queue)
throws JMSException {
QueueSession queSession =
connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = queSession.createSender(queue);
TextMessage message = queSession.createTextMessage();
message.setText("Ashish");
sender.send(message);
}
}
Limitations:
- Only a file-system context is supported.
- Only TextMessage is implemented. It is easy to extend to other message types.
- Only basic JMS features are implemented, as a proof of concept and technology demonstration.
- Because a sequencer EntryProcessor is used to assign each unique JMS message ID, throughput does not exceed 200-300 messages per second.
- If a cache server on which MapListeners are registered is killed, the system takes a few seconds to recover. Messages sent during this window are recoverable but are not delivered. This restriction is due to the Extend behavior and the way ezMQ uses it.
- Queue names must start with "Queue". This is simple to address in a future enhancement.
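To illustrate the sequencer limitation listed above, here is a plain-Java analogy of assigning message IDs from a single counter entry. The ezMQ sequencer itself is not shown in this article, so everything here is an assumption: MessageIdSequencer and the key name are hypothetical, and a ConcurrentHashMap stands in for the cache. The "ID:" prefix follows the JMS convention for JMSMessageID values.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Plain-Java analogy for a sequencer; the names here are hypothetical.
final class MessageIdSequencer {
    private static final String COUNTER_KEY = "jms.sequence";
    private final ConcurrentMap<String, Long> cache = new ConcurrentHashMap<>();

    // Atomically increments the single counter entry and formats the result.
    // Every caller goes through the same key, so ID assignment is serialized.
    String nextMessageId() {
        long next = cache.merge(COUNTER_KEY, 1L, Long::sum);
        return "ID:" + next;
    }

    public static void main(String[] args) {
        MessageIdSequencer sequencer = new MessageIdSequencer();
        System.out.println(sequencer.nextMessageId()); // prints ID:1
        System.out.println(sequencer.nextMessageId()); // prints ID:2
    }
}
```

In a data grid, each such increment would presumably be a round trip to the node that owns the counter entry, which is one plausible reason a single sequencer bounds the message rate.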
Features:
- Although I do not guarantee that the application works in all use cases, the clients shown above do not need to change to use Oracle Coherence as a JMS provider.
- Simple and straightforward. No bells and whistles.
What's new?:
What's Next?
- Plug in a JMX MBean to monitor the JMS traffic
- Remove the restriction that all Queue names must start with "Queue".
- Address recovering Events when a cache server dies.
- Implement other JMS behaviors.
How to run?
- Start the cache server
- java -Dtangosol.coherence.cacheconfig=proxy-cache-config.xml com.tangosol.net.DefaultCacheServer
- Start the Subscriber
- java com.ezduck.ezmq.client.MessageSubscriber <q - if testing for Queue>
- Start another....
- Start the Publisher
- java com.ezduck.ezmq.client.MessagePublisher