Setting up Active-Activen clustersActive/Active is pretty similar to how we set up Active/Passive clusters but it needs some special classes to be used. First, make sure SafePublishingCacheStore is configured in the cache config and as we register a publisher we use SafeLocalCachePublisher instead of LocalCachePublisher. How does the following sound?Making use of introduce element in cache configurationStart using introduce tag while writing coherence cache config. This is one of the very useful features introduced in coherence common incubator pattern. introduce tag allows us re-use of common cache configurations and is as simple as:<cache-config> <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml"/> </cache-config> Dynamic subscription of subscriber clustersOne architecture that Push replication supports and possibly the most popular one as well, is a hub-n-spoke model. In the hub-n-spoke model not only the spoke clusters know about the hub but the hub knows about all the clusters on the spokes as well at least at the time of deployment. This "knowledge" of the other cluster is in shape of a set of remote-cache-schemes. Recently I came across a requirement where the number of spoke clusters were not known at the time of deployment. This ever expanding subscriber cluster introduces new challenges to Push replication deployments. Coherence is all about 100% up time and stopping the hub every time a new subscriber cluster joins it, is not a preferable deployment architecture. So lets introduce how new clusters can dynamically join the hub so that hubdoes not know about the subscriber but subscriber knows about the hub.Lets start with some cache configurations and how it will look in a production environment. The following samples are part of a proof of concept and there is a scope of a few tweakings. Coherence Cache Configuration on the hub <xml version="1.0" encoding="windows-1252" ?> <!DOCTYPE cache-config SYSTEM "cache-config.dtd"> <cache-config> <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" /> <caching-scheme-mapping> </caching-scheme-mapping> <caching-schemes> <proxy-scheme> <scheme-name>proxy-scheme</scheme-name> <service-name>ProxyCacheScheme</service-name> <acceptor-config> <tcp-acceptor> <local-address> <address>localhost</address> <port>20000</port> <reusable>true</reusable> </local-address> <keep-alive-enabled>true</keep-alive-enabled> </tcp-acceptor> </acceptor-config> <autostart>true</autostart> </proxy-scheme> </caching-schemes> </cache-config> Hmm... The only configuration the hub cache config has is proxy-scheme. The hub does not know anything about who the subscribers will be. Will explain later how it will be done, scroll down. coherence-pushreplicationpattern-cache-config.xml Its pretty much the same as seen when you download the push replication project, just replace PublishingCacheStore with SafePublishingCacheStore. Subscriber Cluster Cache Configuration Cache configuration deployed on the subscriber cluster looks a little more complete as subscriber knows about which hub it has to connect to. The configuration looks like: <cache-config> <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" /> <caching-scheme-mapping> </caching-scheme-mapping> <caching-schemes> <proxy-scheme> <scheme-name>proxy-scheme</scheme-name> <service-name>ProxyCacheScheme</service-name> <acceptor-config> <tcp-acceptor> <local-address> <address>localhost</address> <port>9099</port> <reusable>true</reusable> </local-address> <keep-alive-enabled>true</keep-alive-enabled> </tcp-acceptor> </acceptor-config> <autostart>true</autostart> </proxy-scheme> <remote-invocation-scheme> <scheme-name>Subscriber1</scheme-name> <service-name>Subscriber1</service-name> <initiator-config> <;tcp-initiator> <remote-addresses> <socket-address> <address>localhost</address> <port>20000</port> </socket-address> </remote-addresses> </tcp-initiator> </initiator-config> </remote-invocation-scheme> <invocation-scheme> <scheme-name>invocation-scheme</scheme-name> <service-name>InvocationService</service-name> <autostart>true</autostart> </invocation-scheme> </caching-schemes> </cache-config> The subscriber cache configuration should have a location of the hub defined as a remote-invocation-scheme. Make sure that it's service name is unique that identifies the subscriber cluster. Dynamic Registration of Subscriber Cluster Run: java -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.clusteraddress=<subscriber-multicast-ip> -Dtangosol.coherence.cacheconfig=subscriber-cache-config.xml SubscriptionRegistration public class SubscriptionRegistration implements SubscriptionRegistrationMBean, Serializable { public SubscriptionRegistration() { } public void enableRemotePublisher(String cacheName, String socketAddresses) { ..... InvocationService iS = (InvocationService) CacheFactory.getService(<subscriberName>); iS.query(new SubscriberTask(sName, socketAddresses), null); PublisherRegistrationTask rTask = new PublisherRegistrationTask(cacheName, <subscriberName>); iS.query(rTask, null); } } public void enableLocalPublisher(String cacheName, String invocationService) { ... Member sM = CacheFactory.getCluster().getOldestMember(); InvocationService isLocal = (InvocationService) CacheFactory.getService(invocationService); PublisherRegistrationTask rTask = new PublisherRegistrationTask(cacheName, <subscriberName>); isLocal.execute(rTask, new HashSet(Collections.singletonList(sM)), null); } private class PublisherRegistrationTask implements Invocable { private String cacheName; private String subscriberName; public PublisherRegistrationTask(String cacheName, String subscriberName) { this.cacheName = cacheName; this.subscriberName = subscriberName; } public void init(InvocationService invocationService) { } public void run() { PushReplicationManager pM = DefaultPushReplicationMananger.getInstance(); BatchPublisher batchPublisher = new RemoteInvocationPublisher(subscriberName, new BatchPublisherAdapter(new SafeLocalCachePublisher(cacheName)), true, 10000, 100, 10000, 5); pM.registerBatchPublisher(cacheName, subscriberName, batchPublisher); } public Object getResult() { return null; } } private class SubscriberTask implements Invocable { private String serviceName; private String rAs; public SubscriberTask(String serviceName, String rAs) { this.serviceName = serviceName; this.rAs = rAs; } public void init(InvocationService invocationService) { } public void run() { ConfigurableCacheFactory factory = CacheFactory.getConfigurableCacheFactory(); XmlElement root = factory.getConfig(); XmlElement cS = root.findElement("caching-schemes"); XmlElement riS = null; XmlElement rA = null; riS = cS.findElement("remote-invocation-scheme/service-name"); if (riS != null && riS.getString().equals(serviceName)) { rA = riS.getParent().findElement("initiator-config/tcp-initiator/remote-addresses"); } else { riS = cS.addElement("remote-invocation-scheme"); riS.addElement("scheme-name").setString(serviceName); riS.addElement("service-name").setString(serviceName); XmlElement iC = riS.addElement("initiator-config"); XmlElement tI = iC.addElement("tcp-initiator"); rA = tI.addElement("remote-addresses"); } StringTokenizer tokens = new StringTokenizer(rAs, ",:", false); while (tokens.hasMoreTokens()) { XmlElement sA = rA.addElement("socket-address"); sA.addElement("address").setString(tokens.nextToken()); sA.addElement("port").setInt(Integer.parseInt(tokens.nextToken())); } factory.setConfig(root); } public Object getResult() { return null; } } public static void main(String[] args) { Registry registry = CacheFactory.ensureCluster().getManagement(); SubscriptionRegistrationMBean bean = new SubscriptionRegistration(); String sName = registry.ensureGlobalName("type=Subscriber"); registry.register(sName, bean); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } } } There are three parts of this class:
Execute this MBean for each subscriber cluster that needs to join the hub. Change the subscriber cache configuration accordingly. While I was developing this sample I found an issue in SafeLocalCachePublisher as it was missing a default constructor. A JIRA has been opened and will be fixed in the next Incubator release. In the meantime download the push replication source code and add a default constructor in SafeLocalCachePublisher. So thats pretty much it. Running geographically distributed dynamically subscribed multi-clusters in a hub-n-spoke architecture in less than 10 minutes and then staying up 100% of the time. Enjoy! Whats attached?
Whats required?
Whats New?
|

