This article is about how to use "avroSource" of flume. At the time of writing this article, the version of flume is 0.9.4. In the below example, I have used java standalone program to send data to flume agent. The source of flume agent has been set to "avroSource" to receive the AvroFlumeEvent object. package avro;import java.io.IOException;import java.net.URL;import java.nio.ByteBuffer;import java.util.Date;import java.util.HashMap;import java.util.Map;import org.apache.avro.ipc.HttpTransceiver;import org.apache.avro.ipc.specific.SpecificRequestor;import com.cloudera.flume.handlers.avro.AvroFlumeEvent;import com.cloudera.flume.handlers.avro.FlumeEventAvroServer;import com.cloudera.flume.handlers.avro.Priority;public class AvroRPCClient { public static void main(String[] args) throws IOException { URL url = new URL("http", "localhost", 8800, "/"); HttpTransceiver transport = new HttpTransceiver(url); FlumeEventAvroServer avroClient = (FlumeEventAvroServer) SpecificRequestor .getClient(FlumeEventAvroServer.class, transport); AvroFlumeEvent afe = new AvroFlumeEvent(); Map<java.lang.CharSequence, java.nio.ByteBuffer> fields = new HashMap<java.lang.CharSequence, java.nio.ByteBuffer>(); fields.put("field1", ByteBuffer.wrap("value1".getBytes())); fields.put("field2", ByteBuffer.wrap("value2".getBytes())); fields.put("field3", ByteBuffer.wrap("value3".getBytes())); fields.put("field4", ByteBuffer.wrap("value4".getBytes())); afe.fields = fields; afe.priority = Priority.INFO; afe.body = ByteBuffer.wrap("HELLO WORLD!!!!".getBytes()); afe.timestamp = new Date().getTime(); afe.host = "localhost"; avroClient.append(afe); }}In the agent node, FlumeEvent object is created by copying the data from AvroFlumeEvent object:
|