Miscellaneous Components


Dynamic Push Replication Subscription

Setting up Active-Active clusters

Setting up Active/Active replication is much like setting up Active/Passive clusters, but it requires two special classes. First, make sure SafePublishingCacheStore is configured in the cache configuration. Second, when registering a publisher, use SafeLocalCachePublisher instead of LocalCachePublisher. How does the following sound?



Making use of the introduce-cache-config element in cache configuration

Start using the introduce-cache-config element when writing Coherence cache configurations. It is one of the most useful features of the Coherence Common Incubator pattern: it lets us reuse common cache configurations, and it is as simple as:
<cache-config>
    <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml"/>
</cache-config>
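
To make the idea of reusing a common configuration concrete, here is a minimal sketch in plain Java that merges one cache configuration into another, section by section. It is only an illustration of the concept: the class IntroduceSketch and its methods are hypothetical names, it uses the JDK's DOM API rather than Coherence, and it is not the Incubator's actual merge logic.

```java
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.xml.sax.InputSource;

/** Illustrative only: merges an "introduced" cache config into a base config. */
final class IntroduceSketch {

    /** Parses an XML string into a DOM document. */
    static Document parse(String xml) {
        try {
            return DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
        } catch (Exception e) {
            throw new IllegalArgumentException("Not well-formed XML", e);
        }
    }

    /** Returns the first direct child element of parent with the given tag, or null. */
    static Element child(Element parent, String tag) {
        for (Node n = parent.getFirstChild(); n != null; n = n.getNextSibling()) {
            if (n.getNodeType() == Node.ELEMENT_NODE && tag.equals(n.getNodeName())) {
                return (Element) n;
            }
        }
        return null;
    }

    /**
     * For each top-level section of the introduced config (for example
     * caching-schemes), copy its children into the same-named section of the
     * base config, or copy the whole section if the base does not have it.
     */
    static void introduce(Document base, Document introduced) {
        Element baseRoot = base.getDocumentElement();
        Node n = introduced.getDocumentElement().getFirstChild();
        for (; n != null; n = n.getNextSibling()) {
            if (n.getNodeType() != Node.ELEMENT_NODE) {
                continue;
            }
            Element target = child(baseRoot, n.getNodeName());
            if (target == null) {
                baseRoot.appendChild(base.importNode(n, true));
            } else {
                for (Node c = n.getFirstChild(); c != null; c = c.getNextSibling()) {
                    target.appendChild(base.importNode(c, true));
                }
            }
        }
    }

    public static void main(String[] args) {
        Document base = parse("<cache-config><caching-schemes>"
                + "<proxy-scheme/></caching-schemes></cache-config>");
        Document common = parse("<cache-config><caching-scheme-mapping>"
                + "<cache-mapping/></caching-scheme-mapping><caching-schemes>"
                + "<distributed-scheme/></caching-schemes></cache-config>");
        introduce(base, common);
        Element schemes = child(base.getDocumentElement(), "caching-schemes");
        System.out.println("schemes after merge: " + schemes.getChildNodes().getLength());
    }
}
```

The point of the sketch is only that the introducing file stays small: it declares what is specific to one cluster, and everything shared comes from the common file.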

Dynamic subscription of subscriber clusters

One architecture that Push Replication supports, and possibly the most popular one, is the hub-n-spoke model. In this model, not only do the spoke clusters know about the hub, but the hub also knows about every spoke cluster, at least at deployment time. This "knowledge" of the other clusters takes the shape of a set of remote-cache-schemes. Recently I came across a requirement where the number of spoke clusters was not known at deployment time. An ever-expanding set of subscriber clusters introduces new challenges for Push Replication deployments. Coherence is all about 100% uptime, and stopping the hub every time a new subscriber cluster joins is not a desirable deployment architecture. So let's look at how new clusters can join the hub dynamically, so that the hub does not need to know about the subscribers in advance; only the subscriber needs to know about the hub.
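
As a rough mental model of dynamic subscription, the following toy sketch shows a hub that starts with no knowledge of any spoke and learns about each one only when it registers at runtime. Everything in it (HubSketch, Spoke, register, put) is a hypothetical name invented for illustration; it uses plain Java maps in place of Coherence caches and says nothing about how Push Replication queues or batches updates.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Toy model: a hub that learns about spoke clusters only when they register. */
final class HubSketch {

    /** A spoke cluster, reduced to a unique name and a local copy of the data. */
    static final class Spoke {
        final String name;
        final Map<String, String> cache = new ConcurrentHashMap<>();

        Spoke(String name) {
            this.name = name;
        }
    }

    private final Map<String, String> hubCache = new ConcurrentHashMap<>();

    // Publishers are keyed by subscriber name, so the name must be unique per spoke.
    private final Map<String, Spoke> publishers = new ConcurrentHashMap<>();

    /** Called by a spoke at runtime; returns false if the name is already taken. */
    boolean register(Spoke spoke) {
        return publishers.putIfAbsent(spoke.name, spoke) == null;
    }

    /** A write on the hub is pushed to every spoke registered so far. */
    void put(String key, String value) {
        hubCache.put(key, value);
        for (Spoke spoke : publishers.values()) {
            spoke.cache.put(key, value);
        }
    }

    public static void main(String[] args) {
        HubSketch hub = new HubSketch();
        Spoke first = new Spoke("Subscriber1");
        hub.register(first);
        hub.put("a", "1");

        // A second spoke joins later, without the hub being stopped.
        Spoke second = new Spoke("Subscriber2");
        hub.register(second);
        hub.put("b", "2");

        System.out.println("Subscriber1 sees " + first.cache.keySet());
        System.out.println("Subscriber2 sees " + second.cache.keySet());
    }
}
```

In this toy, a spoke that joins late only receives writes made after it registered. Whether and how a real deployment seeds a new subscriber with existing data is outside the scope of the sketch.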

Let's start with some cache configurations and how they will look in a production environment. The following samples are part of a proof of concept, and there is scope for a few tweaks.

Coherence Cache Configuration on the hub
<?xml version="1.0" encoding="windows-1252" ?>
<!DOCTYPE cache-config SYSTEM "cache-config.dtd">
<cache-config>
   <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" />
    <caching-scheme-mapping>
    </caching-scheme-mapping>
   <caching-schemes>
       <proxy-scheme>
           <scheme-name>proxy-scheme</scheme-name>
           <service-name>ProxyCacheScheme</service-name>
           <acceptor-config>
               <tcp-acceptor>
                   <local-address>
                       <address>localhost</address>
                       <port>20000</port>
                       <reusable>true</reusable>
                   </local-address>
                   <keep-alive-enabled>true</keep-alive-enabled>
               </tcp-acceptor>
            </acceptor-config>
           <autostart>true</autostart>
       </proxy-scheme>             
   </caching-schemes>
</cache-config>

Hmm... The only thing the hub cache configuration defines is a proxy-scheme. The hub knows nothing about who the subscribers will be. How they get registered is explained further down.

coherence-pushreplicationpattern-cache-config.xml
It's pretty much the same as the one that ships with the push replication project download; just replace PublishingCacheStore with SafePublishingCacheStore.

Subscriber Cluster Cache Configuration
The cache configuration deployed on the subscriber cluster looks a little more complete, because the subscriber knows which hub it has to connect to. The configuration looks like:
<cache-config>
   <introduce-cache-config file="coherence-pushreplicationpattern-cache-config.xml" />
    
   <caching-scheme-mapping>
   </caching-scheme-mapping>
   <caching-schemes>
       <proxy-scheme>
           <scheme-name>proxy-scheme</scheme-name>
           <service-name>ProxyCacheScheme</service-name>
           <acceptor-config>
               <tcp-acceptor>
                   <local-address>
                       <address>localhost</address>
                       <port>9099</port>
                       <reusable>true</reusable>
                   </local-address>
                   <keep-alive-enabled>true</keep-alive-enabled>
               </tcp-acceptor>             
           </acceptor-config>
           <autostart>true</autostart>         
       </proxy-scheme>     
    
       <remote-invocation-scheme>
           <scheme-name>Subscriber1</scheme-name>
           <service-name>Subscriber1</service-name>
           <initiator-config>
               <tcp-initiator>
                   <remote-addresses>
                       <socket-address>
                           <address>localhost</address>
                           <port>20000</port>
                       </socket-address>
                   </remote-addresses>
               </tcp-initiator>
           </initiator-config>     
       </remote-invocation-scheme>

    
       <invocation-scheme>
           <scheme-name>invocation-scheme</scheme-name>
           <service-name>InvocationService</service-name>
           <autostart>true</autostart>
       </invocation-scheme>
   </caching-schemes>
 
</cache-config>

The subscriber cache configuration should define the location of the hub as a remote-invocation-scheme. Make sure that its service name is unique, because it identifies the subscriber cluster.

Dynamic Registration of Subscriber Cluster

Run:
 java -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.clusteraddress=<subscriber-multicast-ip> -Dtangosol.coherence.cacheconfig=subscriber-cache-config.xml SubscriptionRegistration

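import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashSet;
import java.util.StringTokenizer;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.ConfigurableCacheFactory;
import com.tangosol.net.Invocable;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import com.tangosol.net.management.Registry;
import com.tangosol.run.xml.XmlElement;

// Imports for the push replication pattern classes (PushReplicationManager,
// BatchPublisher and so on) are not listed here; take them from the push
// replication source you downloaded.
public class SubscriptionRegistration
implements SubscriptionRegistrationMBean, Serializable {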

    public SubscriptionRegistration() {
    }

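    public void enableRemotePublisher(String cacheName, String socketAddresses) {
        // ..... (elided in the original)
        // <subscriberName> is a placeholder for this cluster's remote-invocation-scheme service name.
        InvocationService iS = (InvocationService) CacheFactory.getService(<subscriberName>);
        // Step 1: add this subscriber to the cache configuration on the hub.
        iS.query(new SubscriberTask(sName, socketAddresses), null);
        // Step 2: register a publisher on the hub that pushes to this subscriber.
        PublisherRegistrationTask rTask = new PublisherRegistrationTask(cacheName, <subscriberName>);
        iS.query(rTask, null);
    }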

    public void enableLocalPublisher(String cacheName, String invocationService) {
        // ... (elided in the original)
        // Register the publisher on the oldest member of the local cluster so that
        // changes made in this cluster are pushed to the hub.
        Member sM = CacheFactory.getCluster().getOldestMember();
        InvocationService isLocal = (InvocationService) CacheFactory.getService(invocationService);
        PublisherRegistrationTask rTask = new PublisherRegistrationTask(cacheName, <subscriberName>);
        isLocal.execute(rTask, new HashSet(Collections.singletonList(sM)), null);
    }

    private class PublisherRegistrationTask implements Invocable {

        private String cacheName;
        private String subscriberName;

        public PublisherRegistrationTask(String cacheName, String subscriberName) {
            this.cacheName = cacheName;
            this.subscriberName = subscriberName;
        }

        public void init(InvocationService invocationService) {

        }

        public void run() {
            // Wrap a SafeLocalCachePublisher in a RemoteInvocationPublisher and
            // register it under the subscriber's name.
            PushReplicationManager pM = DefaultPushReplicationManager.getInstance();
            BatchPublisher batchPublisher = new RemoteInvocationPublisher(subscriberName,
                                              new BatchPublisherAdapter(new SafeLocalCachePublisher(cacheName)),
                                              true, 10000, 100, 10000, 5);
            pM.registerBatchPublisher(cacheName, subscriberName, batchPublisher);
        }

        public Object getResult() {
            return null;
        }

    }

    private class SubscriberTask implements Invocable {

        private String serviceName;
        private String rAs;

        public SubscriberTask(String serviceName, String rAs) {
            this.serviceName = serviceName;
            this.rAs = rAs;
        }

        public void init(InvocationService invocationService) {

        }

        public void run() {
            ConfigurableCacheFactory factory = CacheFactory.getConfigurableCacheFactory();
            XmlElement root = factory.getConfig();
            XmlElement cS = root.findElement("caching-schemes");
            XmlElement riS = null;
            XmlElement rA = null;
            riS = cS.findElement("remote-invocation-scheme/service-name");
            if (riS != null && riS.getString().equals(serviceName)) {
                rA =  riS.getParent().findElement("initiator-config/tcp-initiator/remote-addresses");
            } else {
                riS = cS.addElement("remote-invocation-scheme");
                riS.addElement("scheme-name").setString(serviceName);
                riS.addElement("service-name").setString(serviceName);
                XmlElement iC = riS.addElement("initiator-config");
                XmlElement tI = iC.addElement("tcp-initiator");
                rA = tI.addElement("remote-addresses");
            }
            StringTokenizer tokens = new StringTokenizer(rAs, ",:", false);
            while (tokens.hasMoreTokens()) {
                XmlElement sA = rA.addElement("socket-address");
                sA.addElement("address").setString(tokens.nextToken());
                sA.addElement("port").setInt(Integer.parseInt(tokens.nextToken()));
            }
            factory.setConfig(root);
        }

        public Object getResult() {
            return null;
        }

    }

    public static void main(String[] args) {
        Registry registry = CacheFactory.ensureCluster().getManagement();
        SubscriptionRegistrationMBean bean = new SubscriptionRegistration();
        String sName = registry.ensureGlobalName("type=Subscriber");
        registry.register(sName, bean);

        try {
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

This class does three things:
  1. Updates the cache configuration deployed on the hub so that the subscriber registers itself there.
  2. Registers a publisher on the hub so that changes made on the hub are pushed to this subscriber cluster.
  3. Registers a publisher in the local cluster so that replication runs in Active-Active mode. Changes made in the subscriber cluster are also pushed to the hub's cache and from there to the other subscriber clusters.
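
The socketAddresses argument is tokenized on both "," and ":" in SubscriberTask.run(), which implies a format like host:port,host:port. The standalone sketch below isolates that parsing step so the expected input is easier to see. SocketAddressParser and HostPort are hypothetical names, and the check for a missing port is an addition that the original code does not have.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/** Parses "host:port,host:port" the same way SubscriberTask tokenizes it. */
final class SocketAddressParser {

    /** One parsed host and port pair. */
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }
    }

    /**
     * Splits on ',' and ':' and reads the tokens pairwise as host, then port.
     * Because ':' is a delimiter, IPv6 literals are not supported.
     */
    static List<HostPort> parse(String socketAddresses) {
        List<HostPort> result = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(socketAddresses, ",:", false);
        while (tokens.hasMoreTokens()) {
            String host = tokens.nextToken().trim();
            if (!tokens.hasMoreTokens()) {
                // The original code would fail here with NoSuchElementException.
                throw new IllegalArgumentException("Missing port after host '" + host + "'");
            }
            int port = Integer.parseInt(tokens.nextToken().trim());
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        for (HostPort hp : parse("localhost:9099,localhost:9100")) {
            System.out.println(hp.host + " -> " + hp.port);
        }
    }
}
```

Each parsed pair corresponds to one socket-address element that the task adds under remote-addresses in the hub's configuration.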

Execute this MBean for each subscriber cluster that needs to join the hub, changing the subscriber cache configuration accordingly.

While developing this sample I found an issue in SafeLocalCachePublisher: it is missing a default constructor. A JIRA has been opened and the fix is expected in the next Incubator release. In the meantime, download the push replication source code and add a default constructor to SafeLocalCachePublisher.

So that's pretty much it: geographically distributed, dynamically subscribed multi-clusters running in a hub-n-spoke architecture in less than 10 minutes, and then staying up 100% of the time. Enjoy!

What's attached?

  • A complete JDeveloper project with source code for the MBean to be executed on the subscriber cluster
  • Sample cache configurations for hub and subscriber clusters.
  • JDeveloper run profiles to test the application with three separate clusters.

What's required?

What's New?

  • After a cluster registers itself with the hub, the latest cache configuration is put in an XML cache.
  • A new DynamicCacheServer class.
    • DynamicCacheServer extends DefaultCacheServer. After it starts the cache server it waits 10 seconds, which is long enough for a cache server to start, and then loads the latest cache configuration from the XML cache, replacing the proxy port information if available.
  • The SubscriptionRegistration MBean uses an InvocationService within an InvocationService to run the Invocable on all the nodes.
  • Updated JDeveloper run profiles to start all the hub cache servers using DynamicCacheServer.
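
The "replacing the proxy port information" step can be pictured with the sketch below, which rewrites only the port elements inside proxy-scheme and leaves every other port alone. This is an assumption about what such a step might look like, not the code from the attached project: ProxyPortRewriter and withProxyPort are hypothetical names, and the sketch uses the JDK's DOM and Transformer APIs instead of Coherence's XmlElement. It also assumes the input has no DOCTYPE that the parser would need to resolve.

```java
import java.io.StringReader;
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

/** Hypothetical helper: gives a stored cache config a node-specific proxy port. */
final class ProxyPortRewriter {

    /** Returns the config with every port under a proxy-scheme set to the given value. */
    static String withProxyPort(String configXml, int port) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new InputSource(new StringReader(configXml)));

            // Only touch ports below proxy-scheme; remote-invocation-scheme
            // ports point at other clusters and must stay as they are.
            NodeList proxies = doc.getElementsByTagName("proxy-scheme");
            for (int i = 0; i < proxies.getLength(); i++) {
                NodeList ports = ((Element) proxies.item(i)).getElementsByTagName("port");
                for (int j = 0; j < ports.getLength(); j++) {
                    ports.item(j).setTextContent(Integer.toString(port));
                }
            }

            // Serialize the modified DOM back to a string.
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not rewrite cache configuration", e);
        }
    }

    public static void main(String[] args) {
        String stored = "<cache-config><caching-schemes><proxy-scheme><acceptor-config>"
                + "<tcp-acceptor><local-address><address>localhost</address>"
                + "<port>20000</port></local-address></tcp-acceptor></acceptor-config>"
                + "</proxy-scheme></caching-schemes></cache-config>";
        System.out.println(withProxyPort(stored, 20001));
    }
}
```

A rewrite like this would matter when several hub cache servers share one stored configuration but each needs its own proxy port.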

Attachments (1)

  • dynamic-push-replication.zip - on May 6, 2009 12:42 AM by ashish srivastava (version 4)