Trident

Trident is a high-level abstraction built on top of Storm.

Trident supports stateful stream processing, whereas core Storm is a stateless processing framework.

The main advantage of using Trident is that it can guarantee that every message entering the topology is processed exactly once (when combined with transactional or opaque transactional spouts and state), which would be difficult to achieve with vanilla Storm.
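One idea behind this guarantee, as Trident's transactional state is usually described, is that updates are applied in batches with strongly ordered transaction ids (txids), and each stored value records the txid of the batch that last updated it. If a failed batch is replayed with the same txid, the update is skipped instead of being applied twice. The sketch below is plain Java with a hypothetical `TxidCounterSketch` class, not Storm's State API. It assumes batches are applied in txid order and that a replayed batch contains exactly the same tuples as the original.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of txid-based deduplication (not Storm's API):
// each stored count remembers the txid of the batch that last wrote it.
class TxidCounterSketch {
    // Stored value paired with the txid of the batch that last updated it
    static final class Entry {
        final long txid;
        final long count;
        Entry(long txid, long count) { this.txid = txid; this.count = count; }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Applies a batch's increment for key, unless this txid was already applied
    void applyBatch(long txid, String key, long increment) {
        Entry current = store.get(key);
        if (current != null && current.txid == txid) {
            return; // replay of an already-applied batch: leave the state unchanged
        }
        long base = (current == null) ? 0 : current.count;
        store.put(key, new Entry(txid, base + increment));
    }

    long get(String key) {
        Entry e = store.get(key);
        return e == null ? 0 : e.count;
    }

    public static void main(String[] args) {
        TxidCounterSketch state = new TxidCounterSketch();
        state.applyBatch(1, "storm", 3);
        state.applyBatch(2, "storm", 2);
        state.applyBatch(2, "storm", 2); // batch 2 replayed after a simulated failure
        System.out.println(state.get("storm")); // prints 5, not 7
    }
}
```

Without the stored txid, the replay would be counted twice; this is why Trident ties exactly-once processing to spouts that replay batches consistently.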

Trident's concepts are similar to those of high-level batch processing tools such as Cascading.

The Trident tuple is the data model of a Trident topology: it is the basic unit of data that a Trident topology processes. Each tuple consists of a predefined list of fields. A field's value can be, for example, a byte, char, integer, long, float, double, Boolean, string, or byte array; other types can be used as long as Storm can serialize them. As the topology is built, the operations applied to a tuple either add new fields to it or replace it with a new set of fields.

Each field in a tuple can be accessed either by name, with getValueByField(String), or by its positional index, with getValue(int). The Trident tuple also provides convenience methods, such as getIntegerByField(String), that save you from having to cast the returned objects.
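To illustrate the difference between positional access, name-based access, and a typed convenience accessor, the following sketch uses a hypothetical `SimpleTuple` class whose method names mirror the ones mentioned above. It is a plain-Java stand-in built from two parallel lists, not Storm's TridentTuple implementation.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for a Trident tuple: field names plus values.
class SimpleTuple {
    private final List<String> fields;
    private final List<?> values;

    SimpleTuple(List<String> fields, List<?> values) {
        if (fields.size() != values.size()) {
            throw new IllegalArgumentException("fields and values must have the same length");
        }
        this.fields = fields;
        this.values = values;
    }

    // Positional access: returns a raw Object that the caller must cast
    Object getValue(int i) { return values.get(i); }

    // Name-based access: find the field's position, then read the value there
    Object getValueByField(String field) {
        int i = fields.indexOf(field);
        if (i < 0) throw new IllegalArgumentException("unknown field: " + field);
        return values.get(i);
    }

    // Convenience accessor: performs the cast so callers don't have to
    Integer getIntegerByField(String field) { return (Integer) getValueByField(field); }

    public static void main(String[] args) {
        SimpleTuple t = new SimpleTuple(Arrays.asList("a", "b"), Arrays.asList(1, 4));
        int viaCast = (Integer) t.getValueByField("b"); // explicit cast required
        int viaHelper = t.getIntegerByField("b");        // no cast at the call site
        System.out.println(viaCast + " " + viaHelper + " " + t.getValue(0)); // prints "4 4 1"
    }
}
```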

TRIDENT FUNCTION

Trident functions contain logic to modify the original tuple. A Trident function receives a set of fields of the tuple as input and emits zero or more tuples as output. The fields of each output tuple are merged with the fields of the input tuple to form the complete tuple, which is passed to the next operation in the topology. If the Trident function emits no tuples for an input tuple, that tuple is removed from the stream.
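The merge-and-drop behavior described above can be modeled in plain Java. In this sketch, which uses no Storm classes, tuples are `List<Object>`, and a hypothetical `each` helper passes only the selected input fields to the function, appends every emitted value list to a copy of the full input tuple, and drops any tuple for which the function emits nothing. The word-splitting function is an illustrative assumption, chosen because it can emit several tuples or none.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

// Hypothetical plain-Java model of applying a Trident-style function with each().
class EachSketch {

    // For every tuple: project the input fields, call fn, and append each emitted
    // value list to a copy of the full input tuple. No emits means the tuple is gone.
    static List<List<Object>> each(List<List<Object>> stream, int[] inputIdx,
                                   BiConsumer<List<Object>, Consumer<List<Object>>> fn) {
        List<List<Object>> out = new ArrayList<>();
        for (List<Object> tuple : stream) {
            List<Object> input = new ArrayList<>();
            for (int i : inputIdx) input.add(tuple.get(i)); // only the selected fields
            fn.accept(input, emitted -> {
                List<Object> merged = new ArrayList<>(tuple); // keep all original fields
                merged.addAll(emitted);                       // append the new field(s)
                out.add(merged);
            });
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Object>> sentences = Arrays.asList(
            Arrays.<Object>asList(1, "hello trident"),
            Arrays.<Object>asList(2, ""));            // emits nothing, so it is removed
        // Splits field 1 into words and emits one output tuple per word
        List<List<Object>> words = each(sentences, new int[]{1}, (in, emit) -> {
            for (String w : ((String) in.get(0)).split(" ")) {
                if (!w.isEmpty()) emit.accept(Arrays.<Object>asList(w));
            }
        });
        System.out.println(words);
        // prints [[1, hello trident, hello], [1, hello trident, trident]]
    }
}
```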

SUM FUNCTION

We can write a custom Trident function by extending the storm.trident.operation.BaseFunction class (org.apache.storm.trident.operation.BaseFunction in Storm 1.0 and later) and implementing the execute(TridentTuple tuple, TridentCollector collector) method.

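import backtype.storm.tuple.Values;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class SumFunction extends BaseFunction {

    private static final long serialVersionUID = 5L;

    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        // input fields arrive in the order they were listed in each(), e.g. a then b
        int number1 = tuple.getInteger(0);
        int number2 = tuple.getInteger(1);
        int sum = number1 + number2;
        // emit the sum of the first two fields as a single new field
        collector.emit(new Values(sum));
    }
}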

Suppose dummyStream is the input stream and contains four fields, a, b, c, and d, and only fields a and b are passed as input fields to SumFunction. SumFunction emits a new field, sum, which is merged with the input tuple to form the complete tuple. Hence, the output tuple has five fields (a, b, c, d, sum). The following code shows how to pass the input fields and the name of the new field to the Trident function:

dummyStream.each(new Fields("a", "b"), new SumFunction(), new Fields("sum"));

Input tuple (dummyStream): [1, 4, 8, 9]  --SumFunction-->  output tuple (output stream): [1, 4, 8, 9, 5]
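As a cross-check of the five-field result, this hypothetical trace (plain Java, no Storm dependency; `SumEachTrace` and its methods are illustrative names) keeps field names and values in parallel lists, reads a and b by name, and appends the emitted sum after the four original fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical trace of each(new Fields("a","b"), new SumFunction(), new Fields("sum")).
class SumEachTrace {
    static final List<String> INPUT_FIELDS = Arrays.asList("a", "b", "c", "d");

    // Mirrors SumFunction.execute: adds the two selected values
    static int sum(int number1, int number2) { return number1 + number2; }

    // Returns the merged tuple: all original values followed by the emitted sum
    static List<Integer> apply(List<Integer> tuple) {
        int a = tuple.get(INPUT_FIELDS.indexOf("a"));
        int b = tuple.get(INPUT_FIELDS.indexOf("b"));
        List<Integer> merged = new ArrayList<>(tuple);
        merged.add(sum(a, b));
        return merged;
    }

    public static void main(String[] args) {
        List<String> outputFields = new ArrayList<>(INPUT_FIELDS);
        outputFields.add("sum");
        System.out.println(outputFields);                      // [a, b, c, d, sum]
        System.out.println(apply(Arrays.asList(1, 4, 8, 9))); // [1, 4, 8, 9, 5]
    }
}
```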

FILTER FUNCTION

A Trident filter gets a set of fields as input and returns either true or false depending on whether a certain condition is satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream.

We can write a custom Trident filter by extending the storm.trident.operation.BaseFilter class (org.apache.storm.trident.operation.BaseFilter in Storm 1.0 and later) and implementing the isKeep(TridentTuple tuple) method.

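// Imports required in the enclosing source file:
import storm.trident.operation.BaseFilter;
import storm.trident.tuple.TridentTuple;

// Declared as a static nested class, e.g. inside the topology class
public static class CheckEvenSumFilter extends BaseFilter {

    private static final long serialVersionUID = 7L;

    @Override
    public boolean isKeep(TridentTuple tuple) {
        int number1 = tuple.getInteger(0);
        int number2 = tuple.getInteger(1);
        int sum = number1 + number2;
        // keep the tuple only when the sum of the first two fields is even
        return sum % 2 == 0;
    }
}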

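Applied to fields a and b of dummyStream, the filter keeps only the tuples in which a + b is even:

dummyStream.each(new Fields("a", "b"), new CheckEvenSumFilter());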

INPUT           OUTPUT
[1, 1, 8, 9]    [1, 1, 8, 9]
[1, 2, 4, 5]    (filtered out)
[2, 2, 1, 4]    [2, 2, 1, 4]
[1, 4, 6, 9]    (filtered out)
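The input/output table for the filter can be reproduced with a small plain-Java model of the filter step. `FilterTrace` and `EVEN_SUM` are hypothetical names, and no Storm classes are used; the predicate applies the same even-sum test as CheckEvenSumFilter.isKeep to the first two fields, and kept tuples pass through unchanged because a filter adds no fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of each(new Fields("a","b"), new CheckEvenSumFilter()).
class FilterTrace {
    // Same condition as CheckEvenSumFilter.isKeep, on fields a (index 0) and b (index 1)
    static final Predicate<List<Integer>> EVEN_SUM = t -> (t.get(0) + t.get(1)) % 2 == 0;

    // Keeps tuples for which the predicate is true; the others are dropped
    static List<List<Integer>> filter(List<List<Integer>> stream, Predicate<List<Integer>> keep) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> t : stream) {
            if (keep.test(t)) out.add(t); // kept tuples are emitted as-is
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Integer>> input = Arrays.asList(
            Arrays.asList(1, 1, 8, 9), Arrays.asList(1, 2, 4, 5),
            Arrays.asList(2, 2, 1, 4), Arrays.asList(1, 4, 6, 9));
        System.out.println(filter(input, EVEN_SUM)); // [[1, 1, 8, 9], [2, 2, 1, 4]]
    }
}
```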