Getting Started on NYU Dataproc
What is Hadoop?
Hadoop is an open-source software framework for storing and processing big data in a distributed/parallel fashion on large clusters of commodity hardware. At its core, Hadoop strives to increase processing speed by increasing data locality (i.e., it moves computation to servers where the data is located). There are three components to Hadoop: HDFS (the Hadoop Distributed File System), the Hadoop implementation of MapReduce, and YARN (Yet Another Resource Negotiator; a scheduler).
What is Dataproc?
Dataproc is a cloud-based Hadoop distribution that is managed by Google. Google administers updates to Dataproc so that it is kept current. Google also packages and maintains additional software that can be run on top of Hadoop.
Additionally, Dataproc includes other cloud-specific features, such as the ability to automatically add or remove nodes depending upon how busy the cluster is (autoscaling). It can also use object storage (GCS) or BigQuery as an alternative to HDFS, and provides integration with BigTable using HBase interfaces.
Autoscaling
NYU Dataproc is configured to be as cloud-agnostic as possible, and still uses HDFS and non-proprietary HBase components. It does, however, leverage autoscaling. This means that if the cluster hasn't been used for a while, it might take a while for resources to become available (typically 3-5 minutes) as nodes are turned on and NodeManagers register with YARN. During this time, the following warning message will appear and indicate that the cluster is at capacity and resources are not available:
WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
This warning will go away after new nodes have been added by autoscaling and more resources are available for YARN applications. If it does not go away after more than 10 minutes, please contact the HPC team. Autoscaling is actively monitored, but a duration of more than 10 minutes may indicate a failure of Dataproc's monitoring infrastructure.
Currently, NYU Dataproc's autoscaling is configured so that the cluster will have between 3 and 43 nodes depending upon demand. The number of nodes that are currently active can be seen in the YARN web UI. Additionally, the percentage of cluster capacity that is in use can be seen on the Scheduler page, which is linked from the left-hand menu of the YARN web UI.
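The node count can also be checked from the terminal once you are logged in to the cluster. The sketch below assumes that the standard yarn node -list command is available to your account and that active nodes are reported with the state RUNNING. The function name running_nodes is hypothetical.

```shell
# Hypothetical helper: count the NodeManagers that YARN currently reports as RUNNING.
# Assumes `yarn node -list` is permitted for your account on the cluster.
running_nodes() {
  yarn node -list 2>/dev/null | grep -c -w 'RUNNING'
}
```

Calling running_nodes a few times while the resource warning is displayed should show the count rising as autoscaling adds nodes.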
Accessing the NYU Dataproc Hadoop Cluster
Access to the NYU Dataproc cluster is granted via your NYU Google account. If you are in a class that uses Dataproc, your instructor or TA will request Dataproc access for your NetID.
Once this is granted you can log in by navigating to https://dataproc.hpc.nyu.edu/ssh in your web browser. After you’ve reached this page you will have access to a browser-based terminal interface where you can run Hadoop commands.
If you are having difficulty connecting to a terminal, please make sure that you are not logged in to a non-NYU Google account by clicking the icon displayed in the upper right corner in the Gmail web interface (see here). If you are using Google Chrome, you may also need to switch to your NYU account profile using the instructions here.
If you continue to have difficulties connecting via https://dataproc.hpc.nyu.edu/ssh, you can also log in by navigating to https://shell.cloud.google.com/ and running the following command in the terminal that appears:
gcloud compute ssh nyu-dataproc-m --project=hpc-dataproc-19b8 --zone=us-central1-f
You may need to authorize Google to log in to Dataproc after running the above command.
Once logged in, your username will be of the form <your_net_id>_nyu_edu rather than just your NetID (unlike Peel).
HDFS
HDFS stands for Hadoop Distributed File System. HDFS is a highly fault-tolerant file system and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for analyses that use large datasets.
File Permissions and Access Control Lists
You can share files with others using access control lists (ACLs). An ACL gives you per-file, per-directory and per-user control over who has permission to access files. You can see the ACL for a file or directory with the getfacl command:
$ hdfs dfs -getfacl /user/<net_id>_nyu_edu/testdir
# To modify permissions for files or directories, use setfacl:
$ hdfs dfs -setfacl -R -m user:<net_id>_nyu_edu:rwx /user/<net_id>_nyu_edu/testdir
$ hdfs dfs -setfacl -R -m default:user:<net_id>_nyu_edu:rwx /user/<net_id>_nyu_edu/testdir
To give others access to a subdirectory, you must also grant them navigation (execute) permission on each higher-level directory:
$ hdfs dfs -setfacl -m user:<net_id>_nyu_edu:--x /user/<net_id>_nyu_edu
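The commands above can be combined into a single read-only sharing step. This is a minimal sketch, assuming the shared directory sits directly under your HDFS home directory. The function name share_readonly and its argument order are hypothetical.

```shell
# Hypothetical helper: give another user read-only access to one of your HDFS directories.
# usage: share_readonly <your_netid> <other_netid> <directory_name>
share_readonly() {
  acl_home="/user/${1}_nyu_edu"
  acl_user="user:${2}_nyu_edu"
  # Let the other user traverse (but not list) your home directory.
  hdfs dfs -setfacl -m "${acl_user}:--x" "$acl_home" || return 1
  # Grant read and traverse permission on the shared directory and its contents.
  hdfs dfs -setfacl -R -m "${acl_user}:r-x" "$acl_home/$3" || return 1
  # Print the resulting ACL so that the change can be checked.
  hdfs dfs -getfacl "$acl_home/$3"
}
```

For example, share_readonly abc123 xyz789 testdir would let xyz789 read /user/abc123_nyu_edu/testdir. An entry can later be removed with hdfs dfs -setfacl -x user:<net_id>_nyu_edu <path>.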
Uploading Data to HDFS from Your Computer
Small Transfers
You can add smaller files to HDFS by copying them to your local filesystem (non-HDFS) home directory and then copying them from there to HDFS. Note that there is a limit on the size of your local filesystem home directory, so you should only perform these steps for smaller amounts of data. For larger datasets, use the method described in the Large Transfers section.
Navigate to the command line interface by going to http://dataproc.hpc.nyu.edu/ssh.
In the upper right portion of the header banner, select Upload File. Use the web browser dialog to select your file.
Once the file is uploaded to your Unix directory, run the following command to copy it into HDFS:
hdfs dfs -put /home/<your_netid>_nyu_edu/<path_to_file> <hdfs_path>
To retrieve data from HDFS and copy it to your local filesystem home directory, you can use one of the following commands:
hdfs dfs -get <hdfs_path> /home/<your_netid>_nyu_edu/<path_to_file>
hdfs dfs -copyToLocal <hdfs_path> /home/<your_netid>_nyu_edu/<path_to_file>
You can then download data by going to the upper right corner of the window in the command line interface, selecting the Download File option, and entering a file path (e.g., /home/<your_netid>_nyu_edu/<path_to_file>).
To list files in HDFS, use the following command:
hdfs dfs -ls
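The copy and list steps above can be chained so that an upload is checked immediately. The sketch below assumes the file has already been uploaded to your local filesystem home directory. The function name put_and_list is hypothetical, and the hdfs dfs -mkdir -p step is an addition that creates the target directory if it does not yet exist.

```shell
# Hypothetical helper: copy a local file into an HDFS directory and list the result.
# usage: put_and_list <local_file> <hdfs_dir>
put_and_list() {
  hdfs dfs -mkdir -p "$2" || return 1   # create the target directory if needed
  hdfs dfs -put "$1" "$2" || return 1   # copy the local file into HDFS
  hdfs dfs -ls "$2"                     # confirm that the file arrived
}
```

For example: put_and_list /home/<your_netid>_nyu_edu/data.csv /user/<your_netid>_nyu_edu/datasets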
Large Transfers
To upload large datasets to HDFS, first navigate to the data ingest website at https://dataproc.hpc.nyu.edu/ingest. The data ingest website provides a web interface for temporary cloud-based bucket storage. Any datasets that are uploaded to the data ingest website will remain there for 2 days. Before these 2 days have elapsed, you will need to upload your datasets to your HDFS home directory. To do that you can use the following command after logging in:
hadoop distcp gs://nyu-dataproc-hdfs-ingest/<file_or_folder_name> /user/<your_net_id>_nyu_edu
You can find the full path to your file/folder in the ingest storage by clicking on it in the web interface and then scrolling down to the gsutil URI field in the Live Object tab.
Warning: Data uploaded into the ingest website will be visible to all members of the cluster temporarily. If you are uploading files that cannot be shared with all cluster users (e.g., code) please use the alternate method described below.
Uploading Data to HDFS from Greene
First download gcloud on a desktop computer with a browser by following the instructions here. Ensure that you run the install command described as optional in step 4.
Log into Greene and activate the Google Cloud command line interface module:
ml load google-cloud-sdk/379.0.0
Then log into Google Cloud by typing the following:
gcloud auth login
Copy and paste the command that you are given into a terminal application on your desktop and run it. When prompted, type y to proceed. If you are signed into multiple Google accounts, you will then be presented with a browser window where you can choose your account. Select your NYU account. Google will then present a message saying that “Google Cloud SDK wants to access your Google Account”. Click Allow.
Copy the URL that you are given in the terminal window, and paste it into your Greene session on the line where gcloud asks for it.
Type gcloud auth list to verify that you are logged in:
[jp6546@hlog-1 ~]$ gcloud auth list
Credentialed Accounts
ACTIVE ACCOUNT
* jp6546@nyu.edu
To set the active account, run:
$ gcloud config set account `ACCOUNT`
Now that you are logged in, use the instructions under the Small Transfers or Large Transfers sections below to upload your data.
Small Transfers
Run the following commands on Greene to ensure that gcloud knows that you are using it with Dataproc (rather than a different Google Cloud application):
gcloud config set project hpc-dataproc-19b8
gcloud config set compute/zone us-central1-f
Run the following command on Greene to upload your file to your filesystem home directory on Dataproc:
gcloud compute scp MYFILE nyu-dataproc-m:~
If you are prompted to give a passphrase while generating an SSH key, hit enter twice. The above commands are the command line equivalent to the upload dialogue at http://dataproc.hpc.nyu.edu/ssh that is mentioned earlier.
Your file should now be available within your filesystem home directory on Dataproc. You can then run the following command to get it into HDFS:
hdfs dfs -put /home/<your_netid>_nyu_edu/<path_to_file> <hdfs_path>
Large Transfers
On Greene, run the following to upload a single file to the staging bucket:
gsutil cp FILE gs://nyu-dataproc-hdfs-ingest
Or run the following to copy a directory:
gsutil rsync -r DIRECTORY gs://nyu-dataproc-hdfs-ingest
The above commands are the command line equivalents to the data ingest website described earlier.
As with the earlier example, run the following from within Dataproc to ingest the dataset into your HDFS home directory:
hadoop distcp gs://nyu-dataproc-hdfs-ingest/<file_or_folder_name> /user/<your_net_id>_nyu_edu
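The two gsutil commands above differ only in whether the source is a file or a directory, so the choice can be automated on Greene. This is a sketch under that assumption. The function name stage_for_ingest is hypothetical, and the bucket name is the one used in the commands above.

```shell
# Hypothetical helper for Greene: stage a file or a directory in the ingest bucket.
# usage: stage_for_ingest <file_or_directory>
stage_for_ingest() {
  if [ -d "$1" ]; then
    # Directories are synchronized recursively.
    gsutil rsync -r "$1" gs://nyu-dataproc-hdfs-ingest
  elif [ -f "$1" ]; then
    # Single files are copied directly.
    gsutil cp "$1" gs://nyu-dataproc-hdfs-ingest
  else
    echo "stage_for_ingest: $1 is not a file or directory" >&2
    return 1
  fi
}
```

After staging, the hadoop distcp command still has to be run from within Dataproc to move the data into HDFS.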
Computation on Dataproc
MapReduce
What Is MapReduce?
MapReduce is a programming model and an associated implementation for processing and generating large datasets with a parallel, distributed algorithm on a cluster.
A MapReduce job splits a large dataset into independent chunks and organizes them into key-value pairs for parallel processing. The mapping and reducing functions receive not just values, but (key, value) pairs.
Every MapReduce job consists of at least two parts:
The Mapper
The Reducer
Mapping Phase: Takes input as <key,value> pairs, processes them, and produces another set of intermediate <key,value> pairs as output.
Reducing Phase: Reducing lets you aggregate values together. A reducer function receives an iterator of input values from an input list. It then combines these values together, returning a single output value.
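The two phases can be illustrated locally with ordinary Unix tools. The sketch below is an analogy only and does not use Hadoop: the mapper emits (word, 1) pairs, a sort stands in for the shuffle that groups pairs by key, and the reducer sums the values for each key. The function names mapper, reducer, and wordcount are hypothetical.

```shell
# Map phase: emit one "<word><TAB>1" pair per whitespace-separated word on stdin.
mapper() {
  tr -s '[:space:]' '\n' | awk 'NF { print $0 "\t1" }'
}

# Reduce phase: input is sorted by key, so all pairs for a word are adjacent.
# Sum the values for each key and emit one "<word><TAB><count>" pair.
reducer() {
  awk -F '\t' '
    ($1 "") != (key "") { if (NR > 1) print key "\t" sum; key = $1; sum = 0 }  # new key (string comparison)
                        { sum += $2 }
    END                 { if (NR > 0) print key "\t" sum }'
}

# The sort between the two phases plays the role of the shuffle step.
wordcount() {
  mapper | LC_ALL=C sort | reducer
}
```

For example, printf 'the cat saw the dog\n' | wordcount prints one line per distinct word, with "the" counted twice. On a cluster, many mappers and reducers run in parallel on different chunks of the data, which is what this single-machine pipeline leaves out.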
MapReduce Word Count Example
Ingest a text file into HDFS. In the example below, the hdfs dfs -put command is combined with another Unix command (curl) to send a copy of a Sherlock Holmes book (located at the URL https://www.gutenberg.org/files/1661/1661-0.txt) directly into HDFS:
curl https://www.gutenberg.org/files/1661/1661-0.txt | hdfs dfs -put - /user/<netid>_nyu_edu/input.txt
The following command will run an example Word Count job (described in more detail here) with the Sherlock Holmes book as its input.
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount -D mapreduce.job.maps=6 -D mapreduce.job.reduces=2 /user/<netid>_nyu_edu/input.txt /user/<netid>_nyu_edu/output
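Once the job finishes, its results can be inspected from the command line. The sketch below assumes the output path used in the command above and the usual MapReduce convention that each reducer writes one part-r-NNNNN file. The function name show_wordcount_output is hypothetical.

```shell
# Hypothetical helper: list the job's output directory and preview the first results.
# usage: show_wordcount_output <netid>
show_wordcount_output() {
  wc_out="/user/${1}_nyu_edu/output"
  hdfs dfs -ls "$wc_out" || return 1
  # The glob is quoted so that HDFS, not the local shell, expands it.
  hdfs dfs -cat "$wc_out/part-r-*" | head -n 20
}
```

Note that a MapReduce job fails if its output directory already exists, so remove the old results with hdfs dfs -rm -r /user/<netid>_nyu_edu/output before rerunning the example.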
Spark
What is Spark?
Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. (source: Wikipedia)
Launching an Interactive Spark Shell
Spark provides an interactive shell that you can use to learn the Spark API and analyze datasets interactively. To connect to Spark Shell from the command line, execute the following command:
spark-shell --deploy-mode client --num-executors=1 --driver-memory=1G --executor-memory=1G
Note: NYU Dataproc deploys Spark applications in cluster mode by default. The following error indicates that you are trying to deploy an interactive shell, which must use client mode:
Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode is not applicable to Spark shells.
To resolve this error, either use the command line flag indicated above (--deploy-mode client) or set the spark.submit.deployMode property in your Spark configuration to client. More details about the difference between cluster and client mode can be found here.
YARN Scheduler
YARN is the resource manager and job scheduler in the Hadoop cluster. YARN allows you to use various data processing engines for batch, interactive, and real-time stream processing of data stored in HDFS.
Application status and logs
You can list the currently running applications with the yarn script. Running the yarn script without any arguments prints a description of all of its commands.
$ yarn application -list
If a submitted application starts malfunctioning or, in the worst case, is stuck in an infinite loop, you can kill it. Get the application ID from the list above and then kill the application as shown below:
$ yarn application -kill <application_ID>
To download application logs for examination on the command line:
$ yarn logs -applicationId <application_ID>
Using Hive
Apache Hive is a data warehouse software package that facilitates querying and managing large datasets residing in distributed storage (i.e., HDFS). Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called Hive Query Language (HiveQL or HQL).
You can access Hive with the following command:
beeline -u jdbc:hive2://localhost:10000
To exit a Beeline session cleanly, type !quit.
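For a single statement, an interactive session is not strictly necessary. The sketch below assumes the connection URL from the command above and uses Beeline's -e option, which runs one statement and exits. The function name hive_query is hypothetical.

```shell
# Hypothetical helper: run one HiveQL statement without opening an interactive session.
# usage: hive_query '<statement>'
hive_query() {
  beeline -u jdbc:hive2://localhost:10000 -e "$1"
}
```

For example: hive_query 'SHOW DATABASES;'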
If you are planning on using SerDe to query/work with JSON files, you will need to run the following code at the Beeline prompt first in order to ensure that the JsonSerDe class is loaded:
ADD JAR /usr/lib/hive/lib/hive-hcatalog-core.jar
See here for more information.
Access to Hive databases on NYU Dataproc is derived from HDFS permissions because we use Storage-Based Authorization. To grant read-only access to a Hive database to someone other than yourself, you can run the following command:
hdfs dfs -setfacl -R -m user:<OTHER_PERSON_NETID>_nyu_edu:r-x /user/hive/warehouse/<YOUR_NETID>_nyu_edu.db
Outside of NYU, other Hadoop installations may use a different mechanism for sharing databases with colleagues. It is common for Hadoop installations to use a SQL-style grant/revoke mechanism (SQL Standards Based Authorization). This mechanism is not used at NYU, so external documentation that refers to grant/revoke statements does not apply to NYU Dataproc.
Using Presto
Presto is an open source distributed SQL query engine (similar to Apache Impala) created by Facebook. To access Presto, you can type the following command:
presto
Once you are inside, you can reference multiple data sources through catalogs (see here). For instance, you may want to query Hive using Presto. You can select a database to use through a fully-qualified database or table name as shown in the code below:
presto> show catalogs;
Catalog
----------------------
bigquery
bigquery_public_data
hive
memory
system
tpcds
tpch
(7 rows)
Query 20220826_155243_00000_m7z3s, FINISHED, 3 nodes
Splits: 53 total, 53 done (100.00%)
1.73 [0 rows, 0B] [0 rows/s, 0B/s]
presto> use hive.<netid>_nyu_edu;
USE
You can also specify a catalog / data source that you want to use on the command line when you start Presto:
presto --catalog=hive
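The catalog, schema, and query can all be given on the command line for one-off queries. The sketch below assumes that the presto command on the cluster passes the standard Presto CLI options --schema and --execute through unchanged, in the same way as --catalog above. The function name presto_query is hypothetical.

```shell
# Hypothetical helper: run one query against your Hive database through Presto.
# usage: presto_query <netid> '<statement>'
presto_query() {
  presto --catalog=hive --schema="${1}_nyu_edu" --execute "$2"
}
```

For example: presto_query <netid> 'SHOW TABLES;'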
Using Conda
NYU Dataproc comes with miniconda3 by default. This can be used to manage Python packages within your filesystem home directory. The main difference between using conda on NYU Dataproc and using conda on Peel is that conda is enabled on NYU Dataproc by default, so you do not need to use the module command to load it. See here or here for more information on the conda command.
Using Jupyter Notebooks
Log into the Dataproc cluster and run jupyter-notebook. Do not close the command line interface where jupyter-notebook is running until you're done using Jupyter.
From the output produced by jupyter-notebook, obtain the port number that the notebook is running on. In the example below, for instance, the notebook is running on port 8888:
To access the notebook, open this file in a browser:
file:///home/jp6546_nyu_edu/.local/share/jupyter/runtime/nbserver-7866-open.html
Or copy and paste one of these URLs:
http://localhost:8888/?token=90d9c6297ba2c963cdb998ae374041384bac71c781b18ed1
or http://127.0.0.1:8888/?token=90d9c6297ba2c963cdb998ae374041384bac71c781b18ed1
On an individual workstation that has the gcloud command installed (installation instructions for gcloud can be found here), run the following command (with PORT replaced with the port number from step 2):
gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L PORT:localhost:PORT
In our example, from the output in step 2 this command would be as follows:
gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L 8888:localhost:8888
You can then use the URLs from the jupyter-notebook output in step 2 (e.g., http://localhost:8888/?token=90d9c6297ba2c963cdb998ae374041384bac71c781b18ed1) to access your notebook from the workstation.
When you are done, you can exit the terminals where the jupyter-notebook and the gcloud commands are running.
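Substituting the port number by hand is easy to get wrong, so the tunnel command can be generated from the notebook URL instead. This is a minimal sketch for use on the workstation. The function names jupyter_port and tunnel_command are hypothetical, and the command is printed rather than run so that it can be checked first.

```shell
# Extract the port number from a Jupyter URL such as http://localhost:8888/?token=...
jupyter_port() {
  printf '%s\n' "$1" | sed -n 's#^https\{0,1\}://[^/:]*:\([0-9][0-9]*\).*#\1#p'
}

# Print (rather than run) the SSH tunnel command for the given port.
tunnel_command() {
  printf 'gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L %s:localhost:%s\n' "$1" "$1"
}
```

For example, tunnel_command "$(jupyter_port 'http://localhost:8888/?token=<your_token>')" prints the command shown in the example above. The same tunnel_command helper works with the port reported by zeppelin start.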
Using Zeppelin Notebooks
Log into the Dataproc cluster and run zeppelin start. The terminal will output three pieces of information that you will need later: username, password, and port number. The output should look something like this:
jp6546_nyu_edu@nyu-dataproc-m:~$ zeppelin start
Zeppelin is starting with the following configuration:
-------------------------------------------------------
Username: jp6546_nyu_edu
Password: REDACTED
Port: 64739
Zeppelin start [ OK ]
On an individual workstation that has the gcloud command installed (installation instructions for gcloud can be found here), run the following command (with PORT replaced with the port number from step 1):
gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L PORT:localhost:PORT
In our example, using the output from step 1, this command would be as follows:
gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L 64739:localhost:64739
In a web browser, navigate to localhost:PORT.
Log in by clicking the Login button in the upper right corner. Use your credentials from step 1.
If you forget the password or the port number at any time, you can run the following commands to retrieve this information:
zeppelin get-port
zeppelin get-pass
When you are finished, run "zeppelin stop" to turn off the Zeppelin server.