Event Hub

 

Event Hubs

 

 

Basic concepts

Event producers send events to an event hub using AMQP (Advanced Message Queuing Protocol), HTTPS, or the Apache Kafka protocol.

An event hub can have multiple partitions, and incoming events are distributed across them. Events sent without a partition key are spread round-robin, while events that share a partition key always go to the same partition. Partitions work somewhat like threads in a multi-threaded program: each worker reads from its assigned partition, so with multiple partitions, multiple workers can receive events from the event hub at the same time, which increases throughput.
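The Python sketch below simulates the two distribution modes on a hypothetical hub with 4 partitions: events without a key rotate through the partitions, while events with a key always map to the same one. The hash function (`zlib.crc32`) is only a deterministic stand-in chosen for illustration; the service's own hashing is internal and is not reproduced here.

```python
import itertools
import zlib

PARTITION_COUNT = 4  # hypothetical number of partitions

_round_robin = itertools.cycle(range(PARTITION_COUNT))

def choose_partition(partition_key=None):
    """Return the partition index an event would be written to (simulation only)."""
    if partition_key is None:
        # No key: spread events evenly across the partitions.
        return next(_round_robin)
    # With a key: hash it so the same key always maps to the same partition.
    # zlib.crc32 is a stand-in; it is NOT the hash Event Hubs actually uses.
    return zlib.crc32(partition_key.encode("utf-8")) % PARTITION_COUNT

# Without keys the events rotate through partitions 0, 1, 2, 3, 0, ...
print([choose_partition() for _ in range(6)])  # [0, 1, 2, 3, 0, 1]

# Events sharing a key stay together, so their relative order is preserved.
print({choose_partition("device-42") for _ in range(5)})  # a set with one partition index
```

Since ordering is only guaranteed within a single partition, a partition key is the usual way to keep related events in order.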

Each event hub represents one unique stream of data. To process another stream of data, create another event hub.

The logical container for multiple event hubs is called a namespace.
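To make the hierarchy concrete, here is a small Python model of it: a namespace holds several event hubs, and each event hub is one stream split into partitions. The class names, the namespace name `my-namespace` and the partition counts are hypothetical; the only external fact the model uses is that a namespace is addressed as `<namespace>.servicebus.windows.net`, as seen in the connection strings in this post.

```python
from dataclasses import dataclass, field

@dataclass
class EventHub:
    name: str
    partition_count: int  # one stream, split into this many partitions

@dataclass
class Namespace:
    name: str
    event_hubs: dict = field(default_factory=dict)  # event hub name -> EventHub

    @property
    def hostname(self) -> str:
        # A namespace is addressed as <namespace>.servicebus.windows.net
        return f"{self.name}.servicebus.windows.net"

    def add_event_hub(self, name: str, partition_count: int) -> EventHub:
        # One event hub per independent stream of data
        if name in self.event_hubs:
            raise ValueError(f"event hub {name!r} already exists in {self.name}")
        hub = EventHub(name, partition_count)
        self.event_hubs[name] = hub
        return hub

ns = Namespace("my-namespace")          # hypothetical namespace name
ns.add_event_hub("test-event-hub", 4)   # hypothetical partition count
ns.add_event_hub("another-stream", 2)   # a second, independent stream
print(ns.hostname, sorted(ns.event_hubs))
```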

 

Create Event Hub

In the Azure portal, search for ‘Event Hubs’ among the resources.

Create an Event Hubs namespace.

Inside the namespace, create an event hub.

Open the event hub, select “Shared Access Policies” in the left panel, and create a new policy with the ‘Send’ claim so that clients can send events to the event hub. This step is required to get the connection string (the ‘Connection string-primary key’ value of the policy).
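A connection string is a semicolon-separated list of `key=value` pairs. The Python sketch below splits the redacted sample string used in this post to show which part identifies the namespace, the policy and the event hub. It is a simplified parser for illustration only; the SDKs parse the string themselves, so normally you just pass the whole string to the client.

```python
from urllib.parse import urlparse

def parse_connection_string(conn_str: str) -> dict:
    """Split an Event Hubs connection string into its key/value parts (simplified)."""
    parts = {}
    for segment in conn_str.strip().split(";"):
        if not segment:
            continue  # ignore a trailing ';' or empty segments
        # Split on the first '=' only: base64 keys may themselves end in '='.
        key, _, value = segment.partition("=")
        parts[key] = value
    return parts

conn = ("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=send-to-event-hub;"
        "SharedAccessKey=xxx=;EntityPath=test-event-hub")
props = parse_connection_string(conn)
print(urlparse(props["Endpoint"]).hostname)  # xxx.servicebus.windows.net (the namespace)
print(props["SharedAccessKeyName"])          # send-to-event-hub (the policy)
print(props["EntityPath"])                   # test-event-hub (the event hub)
```

A policy created on the event hub itself (rather than on the namespace) yields a string that already contains `EntityPath`, so the string alone identifies the target event hub.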

 

Program a C# script to send events to event hub

In Visual Studio, create a new console application project.

From Tools > NuGet Package Manager > Package Manager Console, run:

Install-Package Azure.Messaging.EventHubs

Replace the contents of Program.cs with the program below. The connectionString value is the connection string copied from the ‘send’ shared access policy.

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

namespace SendToEventHub
{
    class Program
    {
        // connection string copied from the event hub's 'send' shared access policy
        private const string connectionString = "Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=send-to-event-hub;SharedAccessKey=xxx;EntityPath=test-event-hub";

        // name of the event hub
        private const string eventHubName = "test-event-hub";

        // number of events to be sent to the event hub
        private const int numOfEvents = 3;

        // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
        // of the application, which is best practice when events are being published or read regularly.
        static EventHubProducerClient producerClient;

        static async Task Main()
        {
            // Create a producer client that you can use to send events to an event hub
            producerClient = new EventHubProducerClient(connectionString, eventHubName);

            try
            {
                // Create a batch of events; the batch enforces the maximum message size
                using (EventDataBatch eventBatch = await producerClient.CreateBatchAsync())
                {
                    for (int i = 1; i <= numOfEvents; i++)
                    {
                        // TryAdd returns false if the event does not fit in the batch
                        if (!eventBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes($"Event {i}"))))
                        {
                            throw new Exception($"Event {i} is too large for the batch and cannot be sent.");
                        }
                    }

                    // Publish the batch to the event hub
                    await producerClient.SendAsync(eventBatch);
                    Console.WriteLine($"A batch of {numOfEvents} events has been published.");
                }
            }
            finally
            {
                // Close the client and its connection once the program no longer needs it
                await producerClient.DisposeAsync();
            }
        }
    }
}

 

Build and run the program. It creates an EventHubProducerClient and sends a single batch of 3 events.
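`EventDataBatch.TryAdd` returns `false` instead of throwing when an event would push the batch over its size limit. The Python sketch below imitates that contract with a hypothetical `SimpleBatch` class (the 100-byte limit is made up; real limits come from the service) and shows the usual pattern when there are more events than fit in one batch: send the full batch, then start a new one.

```python
class SimpleBatch:
    """Hypothetical stand-in for EventDataBatch: accepts events until a byte limit is hit."""

    def __init__(self, max_size_in_bytes: int):
        self.max_size_in_bytes = max_size_in_bytes
        self.size_in_bytes = 0
        self.events = []

    def try_add(self, body: bytes) -> bool:
        # Like TryAdd: refuse (return False) instead of raising when the event does not fit.
        if self.size_in_bytes + len(body) > self.max_size_in_bytes:
            return False
        self.events.append(body)
        self.size_in_bytes += len(body)
        return True

def split_into_batches(bodies, max_size_in_bytes=100):
    """Fill a batch, 'send' it when full, then continue with a fresh batch."""
    sent = []
    batch = SimpleBatch(max_size_in_bytes)
    for body in bodies:
        if batch.try_add(body):
            continue
        # The current batch is full: send it and retry the event in a new batch.
        if batch.events:
            sent.append(batch.events)  # stands in for SendAsync(batch)
            batch = SimpleBatch(max_size_in_bytes)
        if not batch.try_add(body):
            raise ValueError("a single event is larger than the batch limit")
    if batch.events:
        sent.append(batch.events)  # send the last, partially filled batch
    return sent

events = [b"x" * 30 for _ in range(7)]
print([len(b) for b in split_into_batches(events)])  # [3, 3, 1]
```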

Now go to the overview of the event hub to see the stats: the ‘Messages’ chart should show 3 incoming messages. To view the actual content of the messages, however, you need to write a reader program or use another tool, such as an Azure Stream Analytics query or a third-party tool like Service Bus Explorer.

 

Program a C# script to read from event hub

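Reading introduces the concept of a consumer group: a view (state, position or offset) of an entire event hub. Different consumer groups read the same stream independently, each at its own pace and with its own offsets, so several applications can consume the same events without interfering with each other.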
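The Python sketch below models one partition as an append-only list and two consumer groups as independent read positions (the `analytics` group name is hypothetical). Reading does not remove events from the partition, so both groups see every event, and advancing one group's position leaves the other's untouched.

```python
partition_log = ["Event 1", "Event 2", "Event 3"]  # one partition: an append-only sequence

# Each consumer group keeps its own position in the same stream.
positions = {"$Default": 0, "analytics": 0}

def read(group, max_events):
    """Return up to max_events unread events for a consumer group and advance its position."""
    start = positions[group]
    events = partition_log[start:start + max_events]
    positions[group] = start + len(events)
    return events

print(read("$Default", 3))   # ['Event 1', 'Event 2', 'Event 3']
print(read("analytics", 1))  # ['Event 1']  (unaffected by $Default)
partition_log.append("Event 4")
print(read("$Default", 3))   # ['Event 4']
print(read("analytics", 3))  # ['Event 2', 'Event 3', 'Event 4']
```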

Consumers are the processes (readers) within a consumer group that read events from the event hub, typically over AMQP. Ideally there is one active reader per partition per consumer group, so the number of consumer processes usually matches the number of partitions. If there are more processes than partitions, either the extra processes sit idle (when a load-balancing client such as EventProcessorClient assigns the partitions), or several processes read the same partition and each receives the same events, which leads to duplicate processing.
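The sketch below shows the idea of spreading partitions evenly over the consumers of one consumer group so that each partition has exactly one reader. It is a simplified round-robin assignment, not the EventProcessorClient's actual algorithm (which balances partitions by claiming ownership records in its checkpoint store); the worker names are hypothetical.

```python
def assign_partitions(partition_ids, consumers):
    """Spread partitions round-robin over consumers; returns {consumer: [partition ids]}."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition_id in enumerate(partition_ids):
        # Partition i goes to consumer i mod N, so every partition has exactly one reader.
        assignment[consumers[i % len(consumers)]].append(partition_id)
    return assignment

partitions = ["0", "1", "2", "3"]
print(assign_partitions(partitions, ["worker-a", "worker-b"]))
# {'worker-a': ['0', '2'], 'worker-b': ['1', '3']}
print(assign_partitions(partitions, ["w1", "w2", "w3", "w4", "w5"]))
# {'w1': ['0'], 'w2': ['1'], 'w3': ['2'], 'w4': ['3'], 'w5': []}  (w5 sits idle)
```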

The position of an event within a partition is called its offset. A consumer should periodically save (checkpoint) the offset it has reached to durable storage, so that if the consumer process fails and restarts, it can resume from the last checkpoint instead of starting over. With the EventProcessorClient used below, checkpoints are saved to Azure Blob Storage.
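The Python sketch below illustrates why checkpoints matter: a consumer processes events from a partition, records where to resume in a store (a plain dict standing in for blob storage), "crashes", and on restart continues from the stored position instead of the beginning. Positions here are simply list indexes for illustration; real Event Hubs offsets are values assigned by the service.

```python
checkpoint_store = {}  # stands in for the blob container; key: partition id
partition_log = [f"Event {i}" for i in range(1, 7)]

def run_consumer(partition_id, crash_after=None):
    """Process events from the last checkpoint; optionally stop after crash_after events."""
    start = checkpoint_store.get(partition_id, 0)  # no checkpoint yet: start from the beginning
    processed = []
    for offset in range(start, len(partition_log)):
        if crash_after is not None and len(processed) == crash_after:
            break  # simulate the process dying part-way through
        processed.append(partition_log[offset])
        checkpoint_store[partition_id] = offset + 1  # remember where to resume
    return processed

print(run_consumer("0", crash_after=4))  # ['Event 1', 'Event 2', 'Event 3', 'Event 4']
print(run_consumer("0"))                 # ['Event 5', 'Event 6']
```

If a crash happened after an event was processed but before its checkpoint was saved, that event would be processed again after the restart, so consumers should be able to tolerate duplicates.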

First, create a container called ‘checkpoints’ in an Azure Storage account. Get the connection string of the storage account from ‘Access keys’.

Next, on the event hub, open “Shared Access Policies” in the left panel and create a new policy with the ‘Listen’ claim. Copy the connection string of this new policy; the receiver uses it instead of the ‘send’ connection string.

Now create another console project and install the required packages. The Processor package is new here.

From Tools > NuGet Package Manager > Package Manager Console, run:

Install-Package Azure.Messaging.EventHubs

Install-Package Azure.Messaging.EventHubs.Processor

 

The program is shown below. It uses the ‘$Default’ consumer group that comes with every event hub (alternatively, create a new consumer group). A BlobContainerClient is also needed for saving checkpoints. Both the consumer group name and the blob container client are passed to the EventProcessorClient constructor.

We also register handlers for the ProcessEventAsync and ProcessErrorAsync events. The first handler is called for every event received; here it simply prints the event and saves a checkpoint. In practice you do not need to checkpoint after every event, since each checkpoint is a write to blob storage; you might checkpoint every 5 minutes or every 10,000 events instead. The error handler simply prints the error message.
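A common way to implement "checkpoint every N events or every T seconds" is to count events per partition and remember when the last checkpoint was written. The Python sketch below shows only that decision logic; the `CheckpointPolicy` class is hypothetical, and in the C# handler a positive decision would be followed by `UpdateCheckpointAsync`. The clock is injected so the behaviour can be checked without waiting.

```python
import time

class CheckpointPolicy:
    """Decide when to checkpoint: every `every_n_events` events or every `every_seconds` seconds."""

    def __init__(self, every_n_events=10_000, every_seconds=300.0, clock=time.monotonic):
        self.every_n_events = every_n_events
        self.every_seconds = every_seconds
        self.clock = clock
        self.events_since_checkpoint = {}  # partition id -> events since last checkpoint
        self.last_checkpoint_time = {}     # partition id -> clock() value of last checkpoint

    def should_checkpoint(self, partition_id):
        """Call once per processed event; returns True when a checkpoint is due."""
        now = self.clock()
        count = self.events_since_checkpoint.get(partition_id, 0) + 1
        last = self.last_checkpoint_time.setdefault(partition_id, now)
        if count >= self.every_n_events or now - last >= self.every_seconds:
            # Reset the counters for this partition; the caller writes the checkpoint.
            self.events_since_checkpoint[partition_id] = 0
            self.last_checkpoint_time[partition_id] = now
            return True
        self.events_since_checkpoint[partition_id] = count
        return False

fake_now = [0.0]
policy = CheckpointPolicy(every_n_events=3, every_seconds=60, clock=lambda: fake_now[0])
print([policy.should_checkpoint("0") for _ in range(7)])
# [False, False, True, False, False, True, False]
fake_now[0] = 61.0
print(policy.should_checkpoint("0"))  # True: 60 seconds have passed
```

Tracking counters per partition matters because each partition has its own checkpoint.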


using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Storage.Blobs;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

namespace ReceiveFromEventHub
{
    class Program
    {
        // connection string copied from the event hub's 'Listen' shared access policy
        // note this is different from the connection string for sending events; this one is for listening to events
        private const string eventHubconnectionString = "Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=listen-to-event-hub;SharedAccessKey=xxx;EntityPath=test-event-hub";

        // name of the event hub
        private const string eventHubName = "test-event-hub";

        // connection string to the Azure blob storage account
        private const string blobStorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=cleverfoxstorage;AccountKey=xxx;EndpointSuffix=core.windows.net";

        // the blob storage container that holds the checkpoints
        private const string blobContainerName = "checkpoints";

        static async Task Main(string[] args)
        {
            // Read from the default consumer group: $Default
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

            // Create a blob container client that the event processor will use
            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);

            // Create an event processor client to process events in the event hub;
            // it registers the storage client for saving checkpoints and the consumer group
            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubconnectionString, eventHubName);

            // Register handlers for processing events and handling errors
            processor.ProcessEventAsync += ProcessEventHandler;
            processor.ProcessErrorAsync += ProcessErrorHandler;

            // Start the processing
            await processor.StartProcessingAsync();

            // Wait for 30 seconds for the events to be processed
            await Task.Delay(TimeSpan.FromSeconds(30));

            // Stop the processing
            await processor.StopProcessingAsync();
        }

        static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
        {
            // Write the body of the event to the console window
            Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

            // Update checkpoint in the blob storage so that the app receives only new events the next time it's run
            await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }

        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
        {
            // Write details about the error to the console window
            Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
            Console.WriteLine(eventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}

 

 

Run the program and it should receive the events sent to the event hub earlier.

    Received event: Event 1
    Received event: Event 2
    Received event: Event 3

 

 

Python scripts for sending to and receiving from Event Hubs (both use the azure-eventhub package, which can be installed with pip install azure-eventhub):

Sender:

import asyncio

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

event_hub_name = "test-event-hub"
event_hub_conn = "Endpoint=sb://cleverfoxeventhubsnamespace.servicebus.windows.net/;SharedAccessKeyName=send-to-event-hub;SharedAccessKey=xxx=;EntityPath=test-event-hub"

async def main():
    # event hub client
    producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_conn, eventhub_name=event_hub_name)
    # 'async with' closes the client when the block exits
    async with producer:
        # a batch to hold events
        event_data_batch = await producer.create_batch()
        # Add events to the batch.
        event_data_batch.add(EventData('First event :-)'))
        event_data_batch.add(EventData('Second event :-)'))
        event_data_batch.add(EventData('Third event :-)'))
        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)
    print('done')

# 'await' is only valid inside a coroutine, so run main() with asyncio
asyncio.run(main())

Receiver:

# note this receiver runs endlessly, so stop execution (e.g. Ctrl+C) when the test is done
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

event_hub_name = "test-event-hub"
event_hub_conn = "Endpoint=sb://cleverfoxeventhubsnamespace.servicebus.windows.net/;SharedAccessKeyName=listen-to-event-hub;SharedAccessKey=cxxx=;EntityPath=test-event-hub"

async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    # Update the checkpoint so that the program doesn't read the same events again.
    # Without a checkpoint store passed to the client, the checkpoint is only kept in memory for this run.
    await partition_context.update_checkpoint(event)

async def main():
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(event_hub_conn, consumer_group="$Default", eventhub_name=event_hub_name)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

asyncio.run(main())