Update (April 30):
1) You are allowed to pre-process the data, but you need to consider performance as well, e.g., pre-processing should not become a bottleneck.
2) You are allowed to use a sample dataset for your local Pseudo-Distributed Hadoop and Spark environment if the resources on your local computer are a constraint (you need to justify that in your README file).
Please start early. Although the tasks may seem trivial, the programming can be time-consuming, and debugging in particular is NOT easy.
Total credit: 35; extra credit: 15 (The credits will be normalized to 10 + 4.3 eventually)
This is just to warm you up. In reality, Hadoop and Spark are usually deployed over actual distributed cluster nodes. However, for the purpose of this course and to get an idea of the environment, we're starting with a local single node Hadoop and Spark setup (HDP sandbox, Cloudera quickstart VM or Docker).
Even in the real world, it is always a good idea to start small and build up: Keep It Simple, Stupid (KISS Principle).
Twitter data, rich in real-time user-generated content, has become a valuable resource for a wide range of analytical applications across industries. Each tweet carries not just text but also metadata like timestamps, user information, geolocation, hashtags, and interaction metrics such as retweets and likes. This makes Twitter a powerful tool for sentiment analysis, trend detection, event monitoring, and user behavior modeling. Researchers use it to study public opinion on politics or health, businesses analyze it for brand perception and customer feedback, and journalists track breaking news or misinformation. With its volume, velocity, and variety, Twitter data enables both exploratory and predictive analytics in fields ranging from marketing to disaster response.
Twitter was recently rebranded to X, and many of its free APIs and datasets are now available only at a cost. Despite this, we still have access to past datasets that give a glimpse of the analyses this rich resource supports.
Task-1 Building on the simple WordCount example covered in class and in the tutorial, your task is to perform simple processing on the provided Twitter dataset. Write code to find the distribution of tweet languages in the dataset, i.e., the number of tweets for each distinct language.
Program arguments
The path to your input dataset
The output path for your program
Output format
language <TAB> count
# Example
en 11000
in 5966
it 6789
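As a rough illustration of the counting logic (not a reference solution), the sketch below tallies the top-level `lang` field in plain Python. It assumes the input holds one JSON tweet per line, which this handout does not state, and that malformed records may simply be skipped; `count_languages` and `format_counts` are hypothetical names.

```python
import json
from collections import Counter


def count_languages(lines):
    """Count tweets per value of the top-level "lang" field."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        try:
            tweet = json.loads(line)
        except json.JSONDecodeError:
            continue  # assumption: malformed records are skipped
        if not isinstance(tweet, dict):
            continue
        lang = tweet.get("lang")
        if lang:  # ignore tweets whose language is missing or null
            counts[lang] += 1
    return counts


def format_counts(counts):
    """Render counts as 'language<TAB>count' lines, most frequent first."""
    return [f"{lang}\t{n}" for lang, n in counts.most_common()]


sample = [
    '{"id": 1, "lang": "en"}',
    '{"id": 2, "lang": "hi"}',
    '{"id": 3, "lang": "en"}',
]
print("\n".join(format_counts(count_languages(sample))))
```

The same per-record step (extract `lang`, emit a count of 1) is what a mapper would do in the distributed versions, with the summation moved to the reduce side.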
Task-2 Given an hour of day and an integer N, identify the top N trending hashtags for that hour across the whole dataset. Please note that this task will have two separate output files.
Program arguments
The path to your input dataset
Hour of day (0-23)
N
The output path for your program
Output-1 format
start date of whole dataset
end date of whole dataset
hour of day
Output-2 format
hashtag-1
hashtag-2
...
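One possible shape for the Task-2 logic is sketched below in plain Python; treat it as an illustration under assumptions rather than the expected solution. It assumes one JSON tweet per line, that each hashtag object carries a `text` field (the sample tweet's `hashtags` list is empty, so this is not shown in the handout), that hashtags are compared case-insensitively, and that the hour is taken from `created_at` as recorded. The name `top_hashtags` is hypothetical.

```python
import json
from collections import Counter
from datetime import datetime

# Layout of "created_at" in the sample tweet,
# e.g. "Sat Sep 25 03:35:02 +0000 2021".
CREATED_AT_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def top_hashtags(lines, hour, n):
    """Return (earliest timestamp, latest timestamp, top-n hashtags for hour)."""
    first = last = None
    counts = Counter()
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        tweet = json.loads(line)
        created = datetime.strptime(tweet["created_at"], CREATED_AT_FORMAT)
        # Track the date range of the whole dataset (Output-1).
        first = created if first is None else min(first, created)
        last = created if last is None else max(last, created)
        if created.hour != hour:
            continue  # only tweets from the requested hour count (Output-2)
        for tag in tweet.get("entities", {}).get("hashtags", []):
            counts[tag["text"].lower()] += 1  # assumption: case-insensitive
    return first, last, [tag for tag, _ in counts.most_common(n)]


sample = [
    '{"created_at": "Sat Sep 25 03:35:02 +0000 2021", '
    '"entities": {"hashtags": [{"text": "Spark"}, {"text": "hadoop"}]}}',
    '{"created_at": "Sat Sep 25 03:40:00 +0000 2021", '
    '"entities": {"hashtags": [{"text": "spark"}]}}',
]
start, end, tags = top_hashtags(sample, hour=3, n=1)
print(start.date(), end.date(), 3, sep="\n")
print("\n".join(tags))
```

Keeping the date range and the hashtag counts in one pass avoids reading the input twice, which matters once the input no longer fits comfortably on one machine.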
Task-3 Automated tweet bots have become the norm on today's social media. While there are legitimate use cases where bots are useful, e.g., customer service, reminders, and alerts, there are many grey areas, such as follower automation and retweeting, where the use of these bots is questionable. This task requires you to write code to identify potential bots given the raw tweet dataset. Some common characteristics used to identify bots are (1) high tweet frequency, and (2) identical or similar tweets. In addition to identifying the IDs of potential bots, also identify the most common location and device they tweet from. Please note that there are multiple ways to identify a tweet's location from the tweet data.
Program arguments
The path to your input dataset
The output path for your program (NOTE: You should remove the output path after every execution of your program. Hadoop cannot start a job if the output directory already exists)
Output format
id <TAB> most_common_location <TAB> most_common_device
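The location and device columns can be derived from fields visible in the sample tweet. The helpers below are a sketch of one option among several: they read the device from the link text of `source`, and prefer the tagged `place.full_name` over the free-text `user.location`. That preference order is an assumption, not a requirement of the task, and the function names are hypothetical. The bot-detection criteria themselves are deliberately left out.

```python
import re
from collections import Counter
from html import unescape

# The "source" field is an HTML anchor; the device name is its link text.
_ANCHOR_TEXT = re.compile(r">([^<]*)</a>")


def device_of(tweet):
    """Return the client name from "source", or None if it is absent."""
    source = tweet.get("source") or ""
    match = _ANCHOR_TEXT.search(source)
    if match:
        return unescape(match.group(1)).strip() or None
    return source.strip() or None


def location_of(tweet):
    """Prefer the tagged place, then fall back to the profile location."""
    place = tweet.get("place") or {}
    user = tweet.get("user") or {}
    return place.get("full_name") or user.get("location") or None


def most_common(values):
    """Most frequent non-empty value, or None when there is none."""
    counts = Counter(v for v in values if v)
    return counts.most_common(1)[0][0] if counts else None


tweet = {
    "source": '<a href="http://twitter.com/download/android" '
              'rel="nofollow">Twitter for Android</a>',
    "user": {"location": "Lucknow.gomti nagar"},
    "place": {"full_name": "Lucknow, India"},
}
print(device_of(tweet), location_of(tweet), sep="\t")
```

In a Spark job, these functions could be applied per tweet before grouping by user ID, with `most_common` applied to each user's collected values.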
1. [35 points] For task-1 and task-2, you will need to write three separate programs each (non-distributed, Hadoop, and Spark) to perform the analysis. You can choose any one of the three programming languages, i.e., Java, Python, or Scala.
[5 + 5] Non-distributed conventional code. Task-1-non-distributed.{java, py, scala} and Task-2-non-distributed.{java, py, scala}
[5 + 5] Distributed Hadoop code. Task-1-Hadoop.{java, py, scala} and Task-2-Hadoop.{java, py, scala}
[5 + 5] Distributed Spark code. Task-1-Spark.{java, py, scala} and Task-2-Spark.{java, py, scala}
[5] In addition to the code files, you are also required to submit the following analysis files
Hardware setup hardware.txt with the following entries
[local setup]
number of CPUs/cores
amount of memory in GBs
[GCP setup]
number of worker nodes
number of vCPUs per worker nodes
amount of memory in GBs per worker node
Performance analysis file performance.txt containing run times for the following cases. All timing numbers should be in seconds
[Task-1]
non-distributed/local, Hadoop-on-GCP, Spark-on-GCP
[Task-2]
non-distributed/local, Hadoop-on-GCP, Spark-on-GCP
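For the non-distributed/local entries, wall-clock time can be measured inside the program itself. The helper below is a minimal sketch; `timed` is a hypothetical name, and the `sum` call is only a stand-in for your own task function. It does not apply to the GCP runs, whose run times have to come from the platform.

```python
import time


def timed(fn, *args, **kwargs):
    """Run fn and return (result, elapsed wall-clock seconds)."""
    start = time.perf_counter()
    result = fn(*args, **kwargs)
    return result, time.perf_counter() - start


# Stand-in workload; replace with a call to your own task function.
result, seconds = timed(sum, range(1_000_000))
print(f"non-distributed/local: {seconds:.2f}")
```

`time.perf_counter` is used rather than `time.time` because it is monotonic and intended for measuring intervals.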
2. [15 points] For task-3, you will need to write only Spark code. You can choose any one of the three coding languages i.e. Java, Python or Scala.
Submit your code as Task-3-Spark.{java, py, scala}
In addition to the code file, also submit the output file containing the IDs of potential bots from the bigger dataset (~5 million tweets) as Task-3-bots.txt
Zip the above files as submission.zip and submit it on BrightSpace.
The dataset is based on a snapshot of Twitter (X) data recorded during a particular hour of a single day in September 2021. Please note that the free API only exposed about 1% of the actual data stream received on the platform.
In general, extensive parsing and cleaning are required to extract information of interest from real-world datasets. So that you can focus on how Hadoop and Spark work instead of spending your energy on data parsing, we provide you with an abridged version of the dataset.
The dataset is in JSON format. Each entry in the JSON file corresponds to a single raw tweet. The following is a sample of one such tweet, showing its properties:
{
"created_at": "Sat Sep 25 03:35:02 +0000 2021",
"id": 1441607146806726663,
"id_str": "1441607146806726663",
"text": "@narendramodi \u0917\u0930\u0940\u092c\u094b\u0902 \u0915\u093e \u0915\u0932\u094d\u092f\u093e\u0923 \u0915\u0930\u0947\u0917\u0940\n\u092c\u0940\u091c\u0947\u092a\u0940 \u0906\u0917\u0947 \u0939\u0940 \u092c\u0922\u093c\u0947\u0917\u0940\u0964\n\ud83d\udc4d\ud83d\ude4f",
"display_text_range": [
14,
61
],
"source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
"truncated": false,
"in_reply_to_status_id": 1441602984513785857,
"in_reply_to_status_id_str": "1441602984513785857",
"in_reply_to_user_id": 18839785,
"in_reply_to_user_id_str": "18839785",
"in_reply_to_screen_name": "narendramodi",
"user": {
"id": 2578538096,
"id_str": "2578538096",
"name": "Ashwani Kumar Bhagi",
"screen_name": "akbhagi",
"location": "Lucknow.gomti nagar",
"url": null,
"description": "some description ...",
"translator_type": "none",
"protected": false,
"verified": false,
"followers_count": 100,
"friends_count": 5,
"listed_count": 0,
"favourites_count": 18285,
"statuses_count": 18730,
"created_at": "Fri Jun 20 11:48:15 +0000 2014",
"utc_offset": null,
"time_zone": null,
"geo_enabled": true,
"lang": null,
"contributors_enabled": false,
"is_translator": false,
"profile_background_color": "C0DEED",
"profile_background_image_url": "http://abs.twimg.com/images/themes/theme1/bg.png",
"profile_background_image_url_https": "https://abs.twimg.com/images/themes/theme1/bg.png",
"profile_background_tile": false,
"profile_link_color": "1DA1F2",
"profile_sidebar_border_color": "C0DEED",
"profile_sidebar_fill_color": "DDEEF6",
"profile_text_color": "333333",
"profile_use_background_image": true,
"profile_image_url": "http://pbs.twimg.com/profile_images/1426815493579501568/8X57mFIC_normal.jpg",
"profile_image_url_https": "https://pbs.twimg.com/profile_images/1426815493579501568/8X57mFIC_normal.jpg",
"profile_banner_url": "https://pbs.twimg.com/profile_banners/2578538096/1629014202",
"default_profile": true,
"default_profile_image": false,
"following": null,
"follow_request_sent": null,
"notifications": null,
"withheld_in_countries": []
},
"geo": null,
"coordinates": null,
"place": {
"id": "74effb34539b2d64",
"url": "https://api.twitter.com/1.1/geo/id/74effb34539b2d64.json",
"place_type": "city",
"name": "Lucknow",
"full_name": "Lucknow, India",
"country_code": "IN",
"country": "India",
"bounding_box": {
"type": "Polygon",
"coordinates": [
[
[
80.658613,
26.642291
],
[
80.658613,
27.008048
],
[
81.110202,
27.008048
],
[
81.110202,
26.642291
]
]
]
},
"attributes": {}
},
"contributors": null,
"is_quote_status": false,
"quote_count": 0,
"reply_count": 0,
"retweet_count": 0,
"favorite_count": 0,
"entities": {
"hashtags": [],
"urls": [],
"user_mentions": [
{
"screen_name": "narendramodi",
"name": "Narendra Modi",
"id": 18839785,
"id_str": "18839785",
"indices": [
0,
13
]
}
],
"symbols": []
},
"favorited": false,
"retweeted": false,
"filter_level": "low",
"lang": "hi",
"timestamp_ms": "1632540902221"
}
You can download the sample dataset from [https://drive.google.com/file/d/1qkoK_aYWPfgmIyhRDtLrXtHonlUJA3Yp/view?usp=share_link] (~145 MB compressed, 250,000 raw tweets).
The larger dataset can be accessed using the following path. You don't have to download or upload it. It's already uploaded to a public Google Cloud Storage bucket which you can give as an input to your code. (~3 GB compressed, 5 million tweets)
gs://cse532-spring25/data_sample.zip
If you have difficulty accessing the bucket file, you can download the original data file from https://figshare.com/articles/dataset/Data_Sample_for_Mapping_Dynamic_Human_Sentiments_of_Heat_Exposure_with_Location-Based_Social_Media_Data_/21780065?file=38650880 using the download link (refer to this on downloading web data).
To execute Hadoop jobs using Java API (Terminal Commands)
hadoop jar Twitter.jar Twitter_1 /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json /cse532-s23/output/
hadoop jar Twitter.jar Twitter_3 /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json 2011 /cse532-s23/output/
To view output (Terminal Commands)
(list contents of HDFS output directory)
hdfs dfs -ls /cse532-s23/output/
(print out the contents of output files to terminal)
hdfs dfs -cat /cse532-s23/output/part-*
To execute Spark jobs using Java API (Terminal Commands)
spark-submit --class Twitter_1 Twitter.jar /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json /cse532-s23/output/
spark-submit --class Twitter_1 --master local[2] Twitter.jar /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json /cse532-s23/output/
spark-submit --class Twitter_3 Twitter.jar /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json 2011 /cse532-s23/output/
To execute Spark jobs using Python API (Terminal Commands)
spark-submit Twitter_1.py /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json /cse532-s23/output/
spark-submit --master local[2] Twitter_1.py /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json /cse532-s23/output/
spark-submit Twitter_2.py /cse532-s23/input/250000-tweets-2021-09-25_01-48-23.json 2011 /cse532-s23/output/
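To show how the arguments on a `spark-submit` command line reach a Python script, here is a hedged skeleton in the spirit of `Twitter_1.py`. It is a sketch, not the expected solution: it assumes one JSON tweet per line, uses the RDD API (the DataFrame API would work as well), and the names `lang_of` and `run` are hypothetical.

```python
import json
import sys


def lang_of(line):
    """Return the tweet's "lang" value, or None for unusable records."""
    try:
        tweet = json.loads(line)
    except json.JSONDecodeError:
        return None
    return tweet.get("lang") if isinstance(tweet, dict) else None


def run(input_path, output_path):
    # Imported here so the helper above can be tested without Spark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("Twitter_1").getOrCreate()
    (
        spark.sparkContext.textFile(input_path)
        .map(lang_of)
        .filter(lambda lang: lang is not None)
        .map(lambda lang: (lang, 1))
        .reduceByKey(lambda a, b: a + b)
        .map(lambda kv: f"{kv[0]}\t{kv[1]}")
        .saveAsTextFile(output_path)  # fails if output_path already exists
    )
    spark.stop()


# Everything after the script name on the spark-submit line lands in sys.argv.
if __name__ == "__main__" and len(sys.argv) == 3:
    run(sys.argv[1], sys.argv[2])
```

Because the master is not hard-coded, the same script can be run with `--master local[2]` locally or submitted to a cluster unchanged.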
In the real world, distributed systems such as Hadoop and Spark come in handy when you have to process huge amounts of data. Processing such data on local machines is prohibitive unless you are working on supercomputers. The good news, however, is that code written for pseudo-distributed mode can, in most cases, be deployed on an actual distributed cluster without much modification.
Use a directory that doesn't exist on Cloud Storage bucket. Otherwise, your job will fail. If you would like to reuse the output directory, please delete the directory from Cloud Storage console.
It is highly recommended that you test your program locally (docker) before testing it on the bigger data set.
Once you have successfully redeemed your Google Cloud credits, the first step is to create a project.
Name your project
For Billing account, select "Billing Account for Education"
For Organization, select "cs.stonybrook.edu"
Click "Create"
Enable Dataproc API
From the sidebar, look for and select "Dataproc"
Enable Dataproc API (It may take a little while)
Once enabled, select "Dataproc" and "Clusters" from the sidebar
Create Cluster
Select "Cluster on Compute Engine"
On the first screen after cluster "Create", under "Setup Cluster" sub-menu
Name your cluster
Check "Enable component gateway" if you want to work with Jupyter Notebook in Python. Otherwise leave it unchecked
Leave everything else as it is
Following the master-slave architecture, we will be working with a 1-master, 2-worker cluster
Under "Configure Nodes" sub-menu
Education credits limit the total number of cores (vCPUs) that can be provisioned in a cluster to 8. Therefore, we will need to customize the machine types we use for our cluster
For Master (Manager) node
Select "n2-standard-2" as machine type
Storage is one of the major services that can cost you money on the cloud, especially when working with distributed clusters
The amount of storage you configure your cluster nodes to use will be provisioned for you and you will get charged for it whether you actually use it or not
For Master (Manager) node
Change "Primary Disk Size" to less than 100 GB
Configure the cluster to use 2 worker nodes
Configure same machine type and disk size for worker nodes
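A quick check that this configuration fits the core budget can be written out as below. It assumes an n2-standard-2 machine provides 2 vCPUs, which is not stated in this handout; `total_vcpus` is a hypothetical helper.

```python
# Assumption: an n2-standard-2 machine provides 2 vCPUs.
VCPUS_PER_NODE = 2
VCPU_LIMIT = 8  # education-credit cap on vCPUs per cluster, as stated above


def total_vcpus(masters, workers, vcpus_per_node=VCPUS_PER_NODE):
    """Total vCPUs provisioned when every node uses the same machine type."""
    return (masters + workers) * vcpus_per_node


used = total_vcpus(masters=1, workers=2)
print(f"{used} of {VCPU_LIMIT} vCPUs used")  # prints "6 of 8 vCPUs used"
assert used <= VCPU_LIMIT
```

Under the same assumption, a third worker would bring the total to exactly 8 vCPUs, so there is no headroom beyond that.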
Similar to storage, you get charged for any active cluster that you've provisioned whether you submit a job to it or not
This can result in unforeseen charges to you (the customer)
To avoid this, a very IMPORTANT step when working with cloud resources is to schedule appropriate termination mechanisms
Under "Customize Cluster" sub-menu, scroll down to "Scheduled Deletion"
Select "Delete after a cluster idle time period without submitted jobs"
Specify 1 hour idle time
Leave rest of the settings as is and proceed to "Create" the cluster
It may take some time for GCP to provision and create your configured cluster
GCP Storage
While the cluster is being provisioned, another important aspect of any workflow is to upload data to appropriate cloud storage so the cluster can access and process it
From the sidebar, select "Cloud Storage" and "Buckets"
You may notice that there are a few pre-created buckets in the Cloud Storage
These are buckets created by Dataproc to write logs and outputs of the cluster that we configured in the previous step
Name your bucket and leave all other settings as is
Go ahead and create the bucket
Inside the bucket, you can create "folder/directories"
For the purpose of this project, your storage bucket will essentially contain your uploaded compiled code (Java, Scala) or scripts (Python) that contain the Hadoop/Spark logic to be executed on the cluster resources
Submitting Job
By now your cluster may have been created and become active; the next step is to submit the job to the cluster
Click on the cluster once it is active
In addition to other cluster properties, there is a "Submit Job" button at the top
Clicking on "Submit Job" will open up a side window with option to create the job
Leave "Job ID" as is
For "Job Type", select "Hadoop", "Spark", "PySpark" or "SparkSql" based on your implementation
Remaining options will change based on your selected "Job Type". For the purposes of this assignment, the important options are
JAR type or Python script
Arguments
Remember both of these options should be paths to your code or data on "Cloud Storage"
Both of these paths start with gs://
Path for a particular file can be copied from Cloud Storage by clicking on the particular file
Under "Live Object" tab
"Overview" Section
"gsutil URI" property