Page authors

  • Kamal Bahadur
    December 28, 2011
Home‎ > ‎Java‎ > ‎

Flume - Avro Source

This article is about how to use "avroSource" of flume. At the time of writing this article, the version of flume is 0.9.4.

In the below example, I have used java standalone program to send data to flume agent. The source of flume agent has been set to "avroSource" to receive the AvroFlumeEvent object.

package avro;

import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;

import com.cloudera.flume.handlers.avro.AvroFlumeEvent;
import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;
import com.cloudera.flume.handlers.avro.Priority;

public class AvroRPCClient {
    public static void main(String[] args) throws IOException {
        URL url = new URL("http", "localhost", 8800, "/");
        HttpTransceiver transport = new HttpTransceiver(url);
        FlumeEventAvroServer avroClient = (FlumeEventAvroServer) SpecificRequestor
                .getClient(FlumeEventAvroServer.class, transport);

        AvroFlumeEvent afe = new AvroFlumeEvent();
        Map<java.lang.CharSequence, java.nio.ByteBuffer> fields = new HashMap<java.lang.CharSequence, java.nio.ByteBuffer>();
        fields.put("field1", ByteBuffer.wrap("value1".getBytes()));
        fields.put("field2", ByteBuffer.wrap("value2".getBytes()));
        fields.put("field3", ByteBuffer.wrap("value3".getBytes()));
        fields.put("field4", ByteBuffer.wrap("value4".getBytes()));
        afe.fields = fields;

        afe.priority = Priority.INFO;
        afe.body = ByteBuffer.wrap("HELLO WORLD!!!!".getBytes());
        afe.timestamp = new Date().getTime();
        afe.host = "localhost";

        avroClient.append(afe);

    }
}



In the agent node, FlumeEvent object is created by copying the data from AvroFlumeEvent object:

  • "fields" map data from AvroFlumeEvent is copied to "attrs" map in FlumeEvent object.
  • "priority" is copied to "pri"
  • "body" is copied to "body"
  • "timestamp" is copied to "timestamp"
  • "host" is copied to "host"