Databricks - send tweets to & receive from Event Hubs

Databricks is built on Apache Spark and provides a bundle of functionality: running clusters, storing data, running notebooks in Python/Scala/SQL, MLflow, jobs and scheduling, and more. It is a pretty handy big-data product and is available on Azure.

Logging into the Databricks workspace, you have:

  Data -> databases and tables. This supports a somewhat simplified and modified dialect of SQL.

  Workspace -> notebooks and MLflow, for running SQL, Python, Scala, machine-learning workloads, etc.

  Compute -> clusters. A cluster must be launched before any databases or scripts can be run.

  Jobs -> orchestration, schedule, workflow


This post walks through an example of creating Azure Databricks, streaming data from Twitter to Event Hubs, and consuming it in Databricks.


Create a resource -> Azure Databricks

It goes straight to “Create an Azure Databricks workspace”. As with creating other resources, fill in the subscription, resource group, pricing tier (e.g. free trial), etc. This can take a few minutes.

Go into the Databricks resource. There is not much to do on the left panel. You need “Launch Workspace”, which takes you to a separate website, “azuredatabricks.net”, where the actual work is done.

From the workspace, you can see a lot of common tasks, e.g. “new cluster”, “new job”, “new notebook”, “new MLflow experiment”, etc.

Firstly, create a Spark cluster to run Databricks on: simply “new cluster”. Select the cluster mode: single node, high concurrency, or standard. For trial purposes a single node is enough, which means the driver and worker run on the same node. A nice feature is termination after x minutes of inactivity, so the cluster closes down automatically. The node type picks the hardware: CPU optimized or memory optimized, the number of cores, etc. Here, choose the minimum, Standard_F4, for the trial. Note that recent versions of Databricks use Delta Lake as the default table format, which is pretty cool.
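The same choices made in the UI can also be expressed as a payload for the Databricks Clusters REST API. The sketch below assumes the Clusters API 2.0 payload shape; the runtime version is illustrative, and the `singleNode` Spark conf / `ResourceClass` tag are what the UI's single-node mode sets behind the scenes:

```python
# Sketch of a single-node cluster definition for the Databricks Clusters API 2.0,
# mirroring the UI choices above. The spark_version value is a placeholder.
cluster_spec = {
    "cluster_name": "trial-cluster",
    "spark_version": "9.1.x-scala2.12",   # pick a current LTS runtime
    "node_type_id": "Standard_F4",        # the minimum chosen for the trial
    "num_workers": 0,                     # single node: driver and worker share one node
    "autotermination_minutes": 30,        # shut down after 30 min of inactivity
    "spark_conf": {
        "spark.databricks.cluster.profile": "singleNode",
        "spark.master": "local[*]",
    },
    "custom_tags": {"ResourceClass": "SingleNode"},
}

# POSTing this JSON to https://<workspace>.azuredatabricks.net/api/2.0/clusters/create
# (with a bearer token) creates the same cluster the UI would.
print(cluster_spec["node_type_id"])  # Standard_F4
```

With `num_workers` set to 0 and the `singleNode` profile, the cluster has no separate workers, which is the cheapest option for a trial.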

As we want to connect to Twitter and Event Hubs from a Databricks notebook, we need to install the libraries/connectors for the Twitter API and Event Hubs. Of course, this part can run from other apps/scripts, not necessarily from a Databricks notebook; the scraping of tweets in particular can even be done outside Azure. So click on the newly created cluster and, from “Libraries”, choose “Install new”. Choose PyPI as the source, as we will be programming in Python, and install:

            azure-eventhub

This is the SDK for connecting to Azure Event Hubs.

Now, we are ready to program. From the Workspace, click Create -> Notebook, which gives something similar to a Jupyter notebook where you run scripts in cells. Enter the notebook name, select the language (Python here), and pick the cluster just created to run the notebook on. The script below first defines a method for sending a list of events to Event Hubs; the connection string it needs is copied from the “Shared access policies” of the event hub. It then connects to the Twitter API and runs a search query, and the text of each result is sent as an event to the event hub. The Twitter API requires the client key and secret copied from the Twitter developer portal.

from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData
import base64
import requests

event_hub_name = "test-event-hub"
event_hub_conn = "Endpoint=sb://cleverfoxeventhubsnamespace.servicebus.windows.net/;SharedAccessKeyName=send-to-event-hub;SharedAccessKey=xxx=;EntityPath=test-event-hub"

async def send_events(events):
    # event hub client; async with closes the client when done
    producer = EventHubProducerClient.from_connection_string(conn_str=event_hub_conn, eventhub_name=event_hub_name)

    async with producer:
        # a batch to hold events
        event_data_batch = await producer.create_batch()

        # add events to the batch
        for event in events:
            event_data_batch.add(EventData(event))

        # send the batch of events to the event hub
        await producer.send_batch(event_data_batch)


client_key = 'Odk1YHrDVxxxxxxxxk3TfYATR'
client_secret = 'kYRiUVMVDI02xxxxxxxa0eIhqO56tmiAXLj'

# app-only auth: base64-encode "key:secret" and exchange it for a bearer token
key_secret = '{}:{}'.format(client_key, client_secret).encode('ascii')
b64_encoded_key = base64.b64encode(key_secret)
b64_encoded_key = b64_encoded_key.decode('ascii')

base_url = 'https://api.twitter.com/'
auth_url = '{}oauth2/token'.format(base_url)

auth_headers = {
    'Authorization': 'Basic {}'.format(b64_encoded_key),
    'Content-Type': 'application/x-www-form-urlencoded;charset=UTF-8'
}

auth_data = {
    'grant_type': 'client_credentials'
}

auth_resp = requests.post(auth_url, headers=auth_headers, data=auth_data)
access_token = auth_resp.json()['access_token']
#print(auth_resp.status_code)

search_headers = {
    'Authorization': 'Bearer {}'.format(access_token)
}

search_params = {
    'q': 'brisbane',
    'count': 20,
    'tweet_mode': 'extended'   # note: 'tweet_mode', to get untruncated tweets
}

search_url = 'https://api.twitter.com/1.1/search/tweets.json'
search_resp = requests.get(search_url, headers=search_headers, params=search_params)
tweet_data = search_resp.json()
#print(search_resp.status_code)

events = []
for t in tweet_data['statuses']:
    # with tweet_mode=extended, the untruncated text is in 'full_text'
    events.append(t.get('full_text', t.get('text')))
print(events)

await send_events(events)

print('done')

 

Pressing Shift+Enter or clicking “Run Cell” runs this piece of code and sends some tweets to Event Hubs. To check the messages in the event hub, create another cell and run the receiver script. Note that the connection strings for sending events and listening for events can be different for the same event hub.
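The difference between the two connection strings is visible in the string itself: it is a semicolon-separated list of key=value pairs, and SharedAccessKeyName names the access policy in use (send-to-event-hub vs listen-to-event-hub). A small helper makes that easy to check before using a string; `parse_conn_str` is a hypothetical helper written for this post, not part of the SDK:

```python
def parse_conn_str(conn_str):
    """Split an Event Hubs connection string into a dict of its key=value parts."""
    parts = {}
    for pair in conn_str.split(';'):
        if pair:
            # split on the first '=' only: key values may themselves end in '='
            key, _, value = pair.partition('=')
            parts[key] = value
    return parts

conn = ("Endpoint=sb://example.servicebus.windows.net/;"
        "SharedAccessKeyName=listen-to-event-hub;"
        "SharedAccessKey=cxxx=;EntityPath=test-event-hub")
parsed = parse_conn_str(conn)
print(parsed['SharedAccessKeyName'])  # listen-to-event-hub
print(parsed['EntityPath'])           # test-event-hub
```

EntityPath is the event hub itself, so a string that carries it can be used without passing eventhub_name separately.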

from azure.eventhub.aio import EventHubConsumerClient

event_hub_name = "test-event-hub"
event_hub_conn = "Endpoint=sb://cleverfoxeventhubsnamespace.servicebus.windows.net/;SharedAccessKeyName=listen-to-event-hub;SharedAccessKey=cxxx=;EntityPath=test-event-hub"

async def on_event(partition_context, event):
    # print the event data
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(
        event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # update the checkpoint so that the program doesn't read the same events again
    await partition_context.update_checkpoint(event)

# create a consumer client for the event hub
client = EventHubConsumerClient.from_connection_string(event_hub_conn, consumer_group="$Default", eventhub_name=event_hub_name)

async with client:
    # call the receive method; read from the beginning of the partition (starting_position="-1")
    await client.receive(on_event=on_event, starting_position="-1")
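One thing to watch: both cells use top-level await, which works in the notebook because recent Databricks runtimes run cells through IPython, which supports awaiting at the top level. A plain Python script has no running event loop, so if you lift this code out of the notebook, drive the coroutine with asyncio.run instead. A minimal sketch, with a stand-in coroutine in place of the real sender:

```python
import asyncio

# stand-in for send_events: a coroutine that "sends" a list of events
async def send_events(events):
    # the real version would create a producer, batch the events, and send them
    return len(events)

# in a standalone script, start an event loop explicitly and run the coroutine on it
sent = asyncio.run(send_events(["tweet one", "tweet two"]))
print(sent)  # 2
```

asyncio.run creates the loop, runs the coroutine to completion, and tears the loop down, which is exactly what the notebook does for you implicitly.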

 

Check https://pypi.org/project/azure-eventhub/ for more details on programming Event Hubs from Python.