[Spark-Kafka-Node.js] Twitter Sentiment Analysis Demo Sample App Code

On Tuesday, we walked through how to build a cluster with our Sentiment Analysis Sample application and how to get the app running. Today’s article is a deep dive into the code behind the app, and gives some basic pointers on how to use Spark to build applications like this easily. Tomorrow, we’ll dive into the frontend code to see how to consume the data we produced here and how to make it useful to the end user.

Our demo app consists of only a few pieces of technology: Apache Spark Streaming, Apache Kafka, and node.js.

We take advantage of Spark’s bundled Twitter Streaming library to stream new tweets from Twitter’s API as they come in, analyze them using the algorithm we’ll go into in a minute, then post those scored tweets to a topic on an Apache Kafka message queue running in the cluster. From there, the node.js application simply monitors that topic in Kafka and streams the data to the website, which in turn generates a real-time graph as data is received.

Spark Streaming

If you strip the streaming code down to the important bits, it’s relatively easy to follow. You can follow along with the source code on GitHub.

We’ll walk through the code, bit by bit, now. The code in this document isn’t necessarily in the same order as on GitHub; it’s been reformatted and regrouped so it makes sense for explanation. The actual code is unchanged, but where it makes sense, it has been reorganized, for example to show a function next to the code that invokes it.

First, convert the keyword list to a token set using the same algorithm that will be used to tokenize incoming tweets into words. A tokenizer is just a generic name for an algorithm that breaks a larger piece of text into individual pieces, or tokens, which are then used to analyze the text. Things like frequency of occurrence and co-occurrence (i.e., two words seen together) can affect the results of the algorithm. Here’s our basic tokenizer:

This tokenizer is pretty bare-bones and could be vastly improved if we had a specific case we were trying to write this for. But since it’s just a generic sample app, we kept it as broad as possible, meaning it does an okay job for a lot of use cases, but not a great job for any particular one. We simply convert the text to individual English words using an analyzer from the Lucene project. If we wanted to improve it, we could parse out specific emoticons, give extra weight to particular punctuation (like exclamation marks), try to handle other languages, etc. All of these affect the quality of the results, so you have to tweak and iterate until you hit the sweet spot for your application.
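The app’s tokenizer is written in Scala using Lucene’s English analyzer, but the bare-bones approach described above can be sketched in a few lines of Python. The stop-word list here is a tiny illustrative stand-in; Lucene’s analyzer carries a much larger one.

```python
import re

# A tiny illustrative stop-word list; Lucene's English analyzer uses a larger one.
STOP_WORDS = {"a", "an", "the", "is", "it", "and", "or", "to", "of"}

def tokenize(text):
    """Lowercase the text, split on non-letters, and drop stop words."""
    words = re.split(r"[^a-z]+", text.lower())
    return [w for w in words if w and w not in STOP_WORDS]

tokens = tokenize("The quick brown fox is running to the river!")
# -> ['quick', 'brown', 'fox', 'running', 'river']
```

This is the “does an okay job for a lot of cases” version: no emoticon handling, no punctuation weighting, English only.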

Stemming is a process of converting related words into a common stem. So things like ran, running and run all become ‘run.’ This is a common technique used in search engines, although the major search engines have many more tricks up their sleeves these days. Here we convert the words we got back into their stems, and then generate a map of stem to keyword.

Since we used the stems as the keys, we convert that list of keys into a set to get a list of unique tokens. We’re doing this with the list of keywords we configured in the application in our previous article, so we have a group of keyword stems to compare incoming tweets against. That way, we know which group to show the scores under in the graph. We can give Twitter a list of keywords to match against, but they don’t tell us which one(s) were matched to the tweets they stream to us, so we have to figure that out ourselves.
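As a rough Python sketch of those two steps, here is a deliberately crude suffix-stripping “stemmer” (the actual app uses a real stemming algorithm), followed by the stem-to-keyword map and the unique-stem set:

```python
def stem(word):
    """A crude stemmer for illustration only: strip a common suffix and
    collapse a doubled final consonant ('running' -> 'runn' -> 'run').
    A real app would use a proper stemmer such as Porter's."""
    for suffix in ("ing", "ed", "s"):
        if word.endswith(suffix) and len(word) > len(suffix) + 2:
            word = word[: -len(suffix)]
            break
    if len(word) > 2 and word[-1] == word[-2]:
        word = word[:-1]
    return word

keywords = ["running", "runs", "swimming"]
# Map each stem back to the keyword it came from...
stem_to_keyword = {stem(k): k for k in keywords}
# ...and the unique stems are just the map's key set.
unique_stems = set(stem_to_keyword)
```

Note that “running” and “runs” share the stem “run,” which is exactly why the key set gives us a deduplicated list of tokens to match against.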

The next step is to prepare and train the data model that incoming tweets will be compared against. We gathered a couple of public data sets that have been pre-categorized as having a positive or negative sentiment, and made those available inside your cluster as part of the post-init script we installed during the cluster create in the previous article.

We simply load the existing tweet data (from CSV files) into a SQL Context. This is one of the powerful things about Spark. We have CSV data, but we can now execute SQL queries against it without any ETL conversions. We simply tell Spark to generate a schema from the header row in the CSV file, register that as a ‘temp’ table (i.e. in-memory) and then it will process individual rows of the file as separate records in the table, in parallel, across your cluster.

We now run the actual SQL query against our CSV data and get two data frame objects representing each data set. In the first query we select *, but we know from the second that our columns are id, sentiment, query and text. We really only care about sentiment and text. Since both data frames have the same schema, we can combine them into a single data frame using unionAll().
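The app does this with Spark SQL in Scala, but the “register tabular data, query it with SQL, and union the results” idea can be approximated with Python’s built-in sqlite3 for illustration. The column names follow the schema described above (id, sentiment, query, text); the rows are made up for the sketch.

```python
import sqlite3

# Two pre-labeled data sets, here as in-memory rows instead of CSV files.
# Columns mirror the schema described above: id, sentiment, query, text.
set_a = [(1, 1, "spark", "I love this"), (2, 0, "spark", "I hate this")]
set_b = [(3, 1, "kafka", "great stuff")]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets_a (id, sentiment, query, text)")
conn.execute("CREATE TABLE tweets_b (id, sentiment, query, text)")
conn.executemany("INSERT INTO tweets_a VALUES (?, ?, ?, ?)", set_a)
conn.executemany("INSERT INTO tweets_b VALUES (?, ?, ?, ?)", set_b)

# SELECT from each set, then combine them: the moral equivalent of
# Spark's unionAll() on two data frames with the same schema.
rows = conn.execute(
    "SELECT * FROM tweets_a UNION ALL SELECT * FROM tweets_b"
).fetchall()
```

The big difference, of course, is that Spark runs these queries in parallel across the cluster, straight off the CSV files, with no load step at all.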

Now we convert that combined Data Frame into an RDD of tuples, each containing the sentiment score and the tweet associated with that score. If you recall from above, columns 1 and 3 are ‘sentiment’ and ‘text.’ Columns are 0-indexed, like arrays, so column 1 is really the 2nd entry in the row, and 3 is the 4th.

Transform the RDD so the sentiment score is an integer value, and break the tweet into individual tokens (words) with the same algorithm we are using everywhere else to convert text into tokens.
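In Python terms, that transformation looks roughly like this (tokenize here is a stand-in for the shared tokenizer, and the rows are made up):

```python
# Each row mirrors the CSV schema: columns are 0-indexed, so
# row[1] is 'sentiment' and row[3] is 'text'.
rows = [
    ("1", "1", "spark", "loving the new release"),
    ("2", "0", "spark", "crashes constantly"),
]

def tokenize(text):
    # Stand-in for the shared tokenizer used everywhere else in the app.
    return text.lower().split()

# (sentiment, tokens) pairs, with the score coerced to an integer
# and the text broken into individual tokens.
labeled_tokens = [(int(row[1]), tokenize(row[3])) for row in rows]
```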

The HashingTF() is an object that transforms strings into hashed feature indices. TF is short for term frequency. Think of this as basically an md5sum object for tokens: we’re going to hash all the tokens to train the model, and we’ll do the same with incoming tweets to do predictions against the model.
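The feature-hashing idea is easy to sketch in Python. The built-in hash() and the bucket count below are stand-ins for what Spark’s HashingTF does internally:

```python
def hashing_tf(tokens, num_features=1 << 20):
    """Map each token to a bucket by hashing, and count term
    frequencies per bucket -- a sketch of feature hashing, which is
    the trick behind Spark's HashingTF (TF = term frequency)."""
    vector = {}
    for token in tokens:
        index = hash(token) % num_features
        vector[index] = vector.get(index, 0) + 1
    return vector
```

The result is a sparse vector: a word that appears twice produces a count of 2 in its bucket, and two different words almost always land in different buckets when num_features is large.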

We now generate a LabeledPoint object using our ‘label’ (i.e. positive or negative sentiment, represented by 1 and 0, respectively) and the hashed list of word stems that we generated from the text of the tweet. The prediction algorithm we use next knows how to read this format and do something useful with it. The important part to remember is that the ‘label’ can really be anything, but for our case, it’s just the positive or negative sentiment that we’re looking for.

This is the only really complicated part of the whole job, and I’ll be a bit hand-wavey because I don’t personally fully understand how the classifier algorithm works. What this basically does is train our model using a NaiveBayes classifier, telling it which keywords are associated most commonly with each sentiment so that when we feed it the incoming tweets later, it has a good reference point to compare them to. This is the bulk of the work for Sentiment Analysis, and also the area where we rely most heavily on Spark MLlib, as we aren’t experts in this domain.
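To make that step a little less hand-wavey, here is a toy multinomial Naive Bayes in plain Python. MLlib’s implementation is far more robust, but the core counting-and-smoothing idea is the same: count how often each token appears under each label, then score new text against those counts.

```python
from collections import Counter
from math import log

def train(examples):
    """Train a tiny multinomial Naive Bayes model from (label, tokens)
    pairs -- a sketch of what MLlib's NaiveBayes.train() does with
    LabeledPoints. Labels are 1 (positive) and 0 (negative)."""
    labels = [label for label, _ in examples]
    priors = {c: labels.count(c) / len(labels) for c in set(labels)}
    counts = {c: Counter() for c in priors}
    for label, tokens in examples:
        counts[label].update(tokens)
    vocab = {t for _, tokens in examples for t in tokens}
    return priors, counts, vocab

def predict(model, tokens):
    """Pick the label with the highest log-probability, using
    Laplace (add-one) smoothing so unseen words don't zero us out."""
    priors, counts, vocab = model
    def score(c):
        total = sum(counts[c].values())
        return log(priors[c]) + sum(
            log((counts[c][t] + 1) / (total + len(vocab))) for t in tokens
        )
    return max(priors, key=score)

model = train([
    (1, ["love", "great", "awesome"]),
    (1, ["love", "nice"]),
    (0, ["hate", "awful"]),
    (0, ["terrible", "hate"]),
])
```

With this toy model, a tweet tokenized to ["love"] scores higher under label 1, and ["hate", "awful"] scores higher under label 0, which is the same reference-point behavior we get from the trained MLlib model.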

This little bit of code isn’t strictly necessary. It was added in during testing to verify that the model training was successful by taking a subset of the data (30 percent of it) and using our algorithm to generate a new score for that data, then compare it to the known scores. We will likely remove this in the future as it just adds startup time costs with no real-time benefit, since we’ve already verified the algorithm produces predictions that match known data.

This does two things: prepare the model using the function above, and broadcast that trained model.

Establish the Spark Streaming Context from Twitter:

These credentials need to be configured in the config file for the application. We touched on this briefly in the last article, but there is also more info in the README.

The ‘twitterRefresh’ variable controls how many seconds worth of tweets to gather in each iteration. Basically it buffers for this long, then sends that data to Spark as a group of partitions that contain a number of individual tweets.

This is a comma-separated list of keywords we want to watch for on Twitter. We covered the bit where we generate the token list from these keywords, this is just how we pass that list to Twitter.

This causes Spark Streaming to listen for incoming tweets from the Twitter Streaming API, and then gracefully shut down if a kill signal is received or the streaming job exits for any reason. This is boilerplate setup/teardown for any Spark Streaming job.

Stream the tweets using Spark Streaming:

The actual inner loop functionality is elided here to showcase how the loop works. This breaks down the Twitter Stream RDD into individual partitions. By default, Spark Streaming will convert the Twitter stream into partitions of a particular chunk size and process them separately in individual executors in order to parallelize the load. Each iteration of foreachPartition() will be executed in an executor process. If the number of partitions exceeds the number of executors, then the extra partitions will queue up until an executor is freed. Here we establish some partition-level variables that get shared in-memory in each executor process. Think of these as process globals for the executor processes:

We don’t want to create the KafkaProducer object every time we process a tweet, but rather once per partition of data received. It’s a minor optimization, but a useful one.

Again, the main body of code is elided to show the loop structure on its own. When we loop the partition RDD we get the individual Twitter4J.Status objects to process. This wasn’t obvious to me at first. I expected there to be only two loops: one iterating through the StreamRDD, and one through the individual tweets. I didn’t expect the partitions abstraction to be there. That’s something to be aware of if you’re building a streaming application. The incoming stream RDD is made up of partitions, which are in turn made up of individual items.
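Structurally, the nesting can be sketched like this (with hypothetical stand-ins for the producer and scoring function; in the real app each outer iteration runs in a separate Spark executor task, not in a single process like here):

```python
def process_stream(partitions, make_producer, score_tweet):
    for partition in partitions:       # one iteration per executor task
        producer = make_producer()     # per-partition setup, e.g. a KafkaProducer
        for tweet in partition:        # the individual Twitter4J.Status objects
            producer.append(score_tweet(tweet))

# Hypothetical stand-ins, just to exercise the structure:
sent = []
process_stream(
    partitions=[["tweet a", "tweet b"], ["tweet c"]],
    make_producer=lambda: sent,
    score_tweet=lambda t: t.upper(),
)
```

The key point is where the setup line sits: inside the partition loop but outside the tweet loop, which is exactly the “once per partition, not once per tweet” optimization described above.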

Next, convert the tweet to a set of words for comparison to the model we built earlier:

We convert the incoming tweet to a list of tokens (word stems) and then get the intersection of those with the keywords we’re watching to determine which keywords apply to this tweet. This is not bulletproof, as Twitter is smarter about including tweets in the list if they link to an article that references the keyword we’re watching for, or if that word is used in hashtags, etc. We could definitely make this smarter if we were doing this for real, or Twitter could actually tell us which keywords were matched in the tweets, since they already have that data.
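A minimal sketch of that matching step (assuming both the tweet tokens and the watched keywords have already been stemmed by the same tokenizer):

```python
def matched_keywords(tweet_tokens, keyword_stems):
    """Which of the watched keyword stems appear in this tweet?"""
    return set(tweet_tokens) & keyword_stems

keyword_stems = {"spark", "kafka"}
matches = matched_keywords(["love", "spark", "stream"], keyword_stems)
# -> {'spark'}
```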

Now it’s time to score the incoming tweet and generate a JSON document of the tweet details and the score. You’ll notice we apply the same hashing algorithm to the incoming tweet tokens.

This is what determines the sentiment of the incoming tweet as compared to the trained model we produced earlier from the pre-categorized data sets. You’ll notice we use broadcastedModel.value to get the underlying NaiveBayesModel object rather than the wrapper object that sc.broadcast() produced. If we had not done a sc.broadcast() the model would only be available here if it implemented the Java Serializable interface, and it would be sent across the network on every iteration of the streaming loop, which would be super inefficient as it’s a rather large object. Since that model is trained on the sample data we loaded from HDFS, we now just call model.predict() on it to predict the sentiment of the incoming tweet.

Now on to the JSON document generation. This basically converts the Twitter4J.Status object to a JSON object that we can send to Kafka. Kafka doesn’t know JSON; it takes in raw bytes of whatever. We do have to convert the object to a JSON string first, and then decode it on the other end. We could use any serialization format, but we chose JSON because the other end is a Javascript app.

We originally planned to include a heat map in the UI, to show where on the map the tweets were coming from. So we added the geolocation data to the outgoing JSON. But, as we discovered in our testing, only around 1 percent of tweets include this data, so we ended up dropping the heat map from the UI. The remaining bits are all fairly self-explanatory, I think.

Now we have a JSONObject representing the tweet, with the ‘score’ and ‘keywords’ added. We’ll convert it to raw text when we push it to Kafka later.
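A rough sketch of building that payload (the field names in the tweet dict are illustrative, not the app’s exact schema; only the added ‘score’ and ‘keywords’ fields come from the description above):

```python
import json

def to_kafka_payload(tweet, score, keywords):
    """Attach the score and matched keywords to the tweet details,
    then serialize to raw bytes, since Kafka only sees bytes."""
    doc = dict(tweet)            # the converted Twitter4J.Status fields
    doc["score"] = score         # 1 = positive, 0 = negative
    doc["keywords"] = sorted(keywords)
    return json.dumps(doc).encode("utf-8")

payload = to_kafka_payload({"text": "loving spark"}, 1, {"spark"})
```

On the node.js side, the consumer just decodes the bytes and parses the JSON string back into an object.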

Next, send that JSON document to Kafka to be picked up by the node.js app:

The bit with the counter and determining the partition manually is due to the default Kafka behavior not aligning with our use case. Kafka will automatically pick a partition for you if you don’t send it one, but the way it load-balances this is to round-robin based on how long the KafkaProducer object has been alive. Since this gets re-created on each partition, we end up hitting one partition a lot more frequently than the others. So instead, we round-robin the partitions per incoming tweet.
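The per-tweet round-robin can be sketched in a few lines; the class and method names here are illustrative:

```python
class RoundRobin:
    """Pick Kafka partitions ourselves, advancing one step per tweet,
    instead of relying on the producer's own round-robin (which resets
    every time the producer is recreated)."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self.counter = 0

    def next_partition(self):
        partition = self.counter % self.num_partitions
        self.counter += 1
        return partition

rr = RoundRobin(num_partitions=3)
picks = [rr.next_partition() for _ in range(5)]  # -> [0, 1, 2, 0, 1]
```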

This actually sends the JSON text to Kafka. From there, it’s all up to node.js to show the information to the end-user. That’s the end of the Spark Streaming application. Besides having to know enough about ML to know which algorithms to use and how to prepare the data for that algorithm, it’s really not that complicated. Getting really accurate results would take a bit more work and a lot of domain knowledge to know how to better tokenize the tweets and train the model for your use case.

And there you have it. That’s all there is to analyzing the incoming data and putting it into a format that’s consumable by a frontend application. I realize I kind of skimmed the surface in a few places, but I hope this helps show how the Sentiment Analysis Sample Application works, and gives some insight in general about how Spark works and how to architect applications of this sort.

Tune back in tomorrow, when we’ll unpack the frontend code, showing how to consume the data we produced here and make it useful to the end user.