Storm

Apache Storm is a free and open source distributed real-time computation system that is scalable, reliable and easy to set up and maintain. An Apache Storm cluster is made up of two types of processes - Nimbus and Supervisor. Nimbus runs on the master node and is responsible for tracking the progress of data processing, while the Supervisor process runs on the worker nodes and is responsible for executing the data processing logic.

Batch vs Real-time

Batch - Fixed frequency, i.e. jobs run at a fixed time/day etc.

1. Compute metrics over a period, e.g. high, low, average, momentum etc.

2. Build models from historic data

Real-time - The best example could be a stock price feed:

1. Monitor the stock price in real time

2. Apply trade models to identify actions

3. Alert/trigger for specific trades

Storm is a real-time processing engine originally open sourced by Twitter

Storm can do analytics, transformation and machine learning in real time.

It has very low latency due to its distributed architecture.

Storm is polyglot: you can implement one bolt in Java, another in Python, and so on.

Spark vs Storm

Spark is more suited to systems which require massaging data into a form that allows analytical questions to be asked. This is due to the MapReduce nature of Spark Streaming.

Storm is much more natural, as it gives the user more control over the computational paradigm they wish to follow. Storm is a whole distributed computation layer, whose abstractions make it suitable for targeting more of the use cases online analytical systems face.

Understanding the Storm Components

A Storm application which processes data in real time is called a topology.

A Storm topology is made of two types of components:

1. Spouts - Receive data from data sources and emit it to the rest of the topology. The spout is the central node in the topology and it knows when the data has been completely processed.

2. Bolts - Process the data. Bolts should be as granular as possible, as this helps with parallel processing.

Data passing through Storm is called a tuple. Each Storm component emits tuples with a fixed schema.

Declare Fields - Declare the output of the storm components.

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("symbol", "data", "price"));
}

Implement this method in every Storm component.

Emit tuples (values) - All components use a collector object to emit values.

collector.emit(new Values("value1", "value2", "value3"));

The emitted values should match the declared fields (same number and order).

i.e. declare the output fields, and implement a method that does the data processing and emits the data in the form of values.
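
Putting both pieces together, a minimal bolt might look like the sketch below (the PriceFormatterBolt name, the field types and the BaseBasicBolt base class are assumptions for illustration, not part of the original notes):

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

// Hypothetical bolt: normalises a tick tuple and re-emits it with the declared schema.
public class PriceFormatterBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String symbol = input.getStringByField("symbol").toUpperCase();
        String data = input.getStringByField("data");
        Double price = input.getDoubleByField("price");
        // The emitted values must line up with the fields declared below.
        collector.emit(new Values(symbol, data, price));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("symbol", "data", "price"));
    }
}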

Setting up a storm topology requires the following (a wiring sketch follows this list):

a. Setup component classes - Spouts and Bolts

b. Express processing logic

c. Manage Reliability - By implementing appropriate Bolt classes and tuning

d. Setup Parallelism - Decide number of tasks, stream grouping
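
A minimal wiring sketch for these steps, assuming the hypothetical StockTickerSpout and PriceFormatterBolt classes from the other sketches (the TopologyMain name matches the class submitted later in these notes):

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;

public class TopologyMain {
    public static void main(String[] args) throws Exception {
        // a. component classes: a spout and a bolt (both hypothetical)
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("ticker-spout", new StockTickerSpout(), 1);
        // d. parallelism: 2 tasks for the bolt, wired with the default shuffle grouping
        builder.setBolt("formatter-bolt", new PriceFormatterBolt(), 2)
               .shuffleGrouping("ticker-spout");

        Config conf = new Config();
        conf.setNumWorkers(2);

        if (args.length > 0) {
            // Remote cluster: the jar is submitted to Nimbus (pom scope 'provided')
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // Local test run inside the JVM (pom scope 'compile')
            new LocalCluster().submitTopology("stock-topology", conf, builder.createTopology());
        }
    }
}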

Pom file:

Add the Storm dependency. In the scope element, provide either 'compile' or 'provided'. With 'compile' the dependency jars are included as part of the build, which is what you want for running locally. If you are deploying the topology to a remote cluster, the cluster has its own Storm dependencies; in that case use 'provided' to avoid bundling the dependency jars in the build.

<dependency>

<groupId>org.apache.storm</groupId>

<artifactId>storm-core</artifactId>

<version>1.1.1</version>

<scope>provided</scope> <!-- or 'compile' if you want to run locally -->

</dependency>

Implementation

A Spout class requires the implementation of 3 methods:

1. To Initialize Spout

Initialize connection to external datasource, for authentication and to open files

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){}

Map: Carries information needed by all components

TopologyContext: Keeps track of all the tasks running in the topology - component name, id etc.

Collector: Used to communicate between two components - spout to bolt or vice versa. The communication could be the actual data or an acknowledgement message of success/failure.

When initialising the spout you can take the collector provided by the Storm backend and assign it to a member variable of the spout; that way it is available to the other methods in the spout.

2. ReadTuples

This is where the action happens. This method receives data from the source and emits it in the form of tuples to the bolts. As long as the spout is alive this method is called continuously, and each time it is called a tuple is emitted using the collector object, which is then passed to the bolt.

public void nextTuple(){}

3. Declare Fields

To set the schema of the output tuples

public void declareOutputFields(OutputFieldsDeclarer declarer){}
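
Putting the three methods together, a minimal spout might look like this sketch (the StockTickerSpout name and the random price source are assumptions for illustration; a real spout would read from an external feed):

import java.util.Map;
import java.util.Random;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

public class StockTickerSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private Random random;

    // 1. Initialize: keep the collector in a member variable so other methods can use it
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    // 2. Called repeatedly while the spout is alive; emits one tuple per call
    @Override
    public void nextTuple() {
        Utils.sleep(1000);  // avoid busy-looping; a real spout would poll its data source here
        double price = 100 + random.nextDouble();
        collector.emit(new Values("AAPL", "2017-01-01", price));
    }

    // 3. Declare the schema of the emitted tuples
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("symbol", "data", "price"));
    }
}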

Setting up Cluster in Local Machine

1. Download Storm and set STORM_HOME

export STORM_HOME=/Users/sureshbabu/Programs/apache-storm-1.1.1

2. Download zookeeper and set ZOOKEEPER_HOME

export ZOOKEEPER_HOME=~/Programs/zookeeper-3.4.6/

3. Go to the Zookeeper bin directory and start the server

./zkServer.sh start

4. Go to the Storm home bin directory and start Nimbus on the master node

./storm nimbus

5. Once the Nimbus server is started, start the Supervisor. In production, Nimbus and the Supervisors normally reside on different machines.

./storm supervisor

6. To check the status of Storm, you can bring up the Storm UI, which can be accessed via a browser

./storm ui

Open a browser at <ip>:8080

7. Submit the job to nimbus using ./storm jar <jar path> <TopologyMainClass>

./storm jar ~/Projects//target/RealStockTicket.jar TopologyMain

8. Navigate to localhost:8080 and check the Storm UI. Clicking on the topology name will show the details of spouts and bolts of the Topology.

General

/bin/storm has many options, some important options are

- nimbus to start the storm nimbus node

- supervisor to start supervisor node

- rebalance to rebalance the load onto newly added servers

- activate/deactivate to activate or deactivate a topology

- jar to submit the topology

DATA FLOW

This controls how data flows between spouts and bolts; it is managed by Storm using StreamGrouping.

Grouping Strategies

1. Shuffle - Random; this is the default

2. Fields - Based on the value of the field

3. ALL - sent to all the tasks

4. Custom

Shuffle Grouping - The task is chosen at random (the task id is also chosen randomly), which distributes the workload evenly.

Field Grouping - The task is chosen based on the value of the specified fields. Say each tuple carries a particular word; the grouping can be set to send the tuple to a particular bolt task based on that field value. All tuples having the same value of the field are sent to the same task. We can use it for sum, count and min/max scenarios etc.

All Grouping - All tuples are sent to all tasks. An example could be clearing/updating a cache on all servers.

Custom - Implement your own custom grouping. With custom grouping the onus of load balancing shifts to the user.
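
As a sketch of how the grouping is chosen when wiring the topology (PriceAggregatorBolt and CacheRefreshBolt are hypothetical bolts used only to illustrate the calls; the other classes are from the earlier sketches):

import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class GroupingExamples {

    public static TopologyBuilder build() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("ticker-spout", new StockTickerSpout(), 1);

        // Shuffle grouping: tuples go to bolt tasks at random, spreading the load evenly
        builder.setBolt("formatter-bolt", new PriceFormatterBolt(), 4)
               .shuffleGrouping("ticker-spout");

        // Fields grouping: all tuples with the same "symbol" value go to the same task
        // (useful for running sums, counts, min/max per symbol)
        builder.setBolt("aggregate-bolt", new PriceAggregatorBolt(), 4)
               .fieldsGrouping("formatter-bolt", new Fields("symbol"));

        // All grouping: every task receives every tuple, e.g. to clear a cache on all workers
        builder.setBolt("cache-bolt", new CacheRefreshBolt(), 2)
               .allGrouping("formatter-bolt");

        return builder;
    }
}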

Reliability

Storm makes sure that the messages it receives are processed at least once. This is done by implementing the Spout and Bolt classes to act upon failure and success messages.

Note: By using the Trident API we can have reliability managed automatically. We handle reliability manually only if we need to control the failures ourselves.

Steps:

1. In the Spout, extend the BaseRichSpout class and implement the ack() and fail() methods

The ack() method is called when a tuple is fully processed by the topology, and fail() is called when any child tuple in the tuple tree fails. Note: the BaseRichSpout class is used irrespective of whether you want to manage reliability or not (unlike Bolt).

2. Emit tuples with an ID, i.e. when you are emitting a tuple:

collector.emit(new Values("values"), messageID);

The 'messageID' will be passed to all the components in the topology so Storm can control message replay upon failures.

3. In the Bolt, there are 2 base classes, BaseBasicBolt and BaseRichBolt. If you use BaseBasicBolt, Storm manages the reliability automatically - it is built in. If you want to manage reliability yourself, extend the BaseRichBolt class, where you can control the reliability. In BaseRichBolt, unlike BaseBasicBolt, the collector object is supplied in the prepare() method instead of the execute() method, so we keep it and use it in the execute() method later on to send the ack() and fail() messages.

4. When implementing the bolt, we pass the input tuple along when output tuples are emitted. 'input' is the Tuple object, and anchoring it along the tuple tree is what allows Storm to get back to the msgId to be replayed in the event of failure.

collector.emit(input, new Values("values"));

Here 'input' is the Tuple object which we are passing on to the next bolt along with values.
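
A sketch of the bolt side of this (the ReliablePriceBolt name is an assumption; the spout would additionally implement ack()/fail() against the messageIDs it emitted):

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class ReliablePriceBolt extends BaseRichBolt {

    private OutputCollector collector;

    // With BaseRichBolt the collector arrives in prepare(), so keep it for later use
    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        try {
            String symbol = input.getStringByField("symbol");
            // Anchor the outgoing tuple to 'input' so failures propagate back to the spout's messageID
            collector.emit(input, new Values(symbol));
            collector.ack(input);   // tell Storm this tuple was processed successfully
        } catch (Exception e) {
            collector.fail(input);  // ask Storm to replay the tuple from the spout
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("symbol"));
    }
}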

TRIDENT TOPOLOGY API

The Trident API takes care of the component classes (bolts and spouts), reliability and parallelism. The only thing left to do is to express the processing logic (using function objects) and apply a series of functions to the input stream.

A Trident topology simply takes an input data stream and applies a series of operations to it.

Apply Trident operations on the stream of data, such as

- each

- filter

- aggregate (sum, count etc)

which will result in a new Trident stream.
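
A minimal Trident sketch along these lines (the FixedBatchSpout test spout and the symbol-count pipeline are purely illustrative assumptions, not from the original notes):

import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFilter;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TridentExample {

    // A filter used with each(): keep only tuples for one symbol
    static class OnlyAapl extends BaseFilter {
        @Override
        public boolean isKeep(TridentTuple tuple) {
            return "AAPL".equals(tuple.getStringByField("symbol"));
        }
    }

    public static StormTopology build() {
        // Test spout emitting fixed batches of (symbol, price) tuples
        FixedBatchSpout spout = new FixedBatchSpout(new Fields("symbol", "price"), 3,
                new Values("AAPL", 101.0),
                new Values("GOOG", 950.0),
                new Values("AAPL", 102.0));
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        topology.newStream("ticks", spout)
                // each + filter: drop tuples we are not interested in
                .each(new Fields("symbol"), new OnlyAapl())
                // aggregate: count tuples per symbol into an in-memory state
                .groupBy(new Fields("symbol"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

        return topology.build();
    }
}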

-----------

General Guidelines

When running Zookeeper in production, use

zkServer.sh start-foreground instead of zkServer.sh start.

To check the zookeeper stats

echo stat | nc localhost 2181

Storm UI explained - http://www.malinga.me/reading-and-understanding-the-storm-ui-storm-ui-explained/