Getting Started on NYU Dataproc

What is Hadoop?

Hadoop is an open-source software framework for storing and processing big data in a distributed/parallel fashion on large clusters of commodity hardware. At its core, Hadoop strives to increase processing speed by increasing data locality (i.e., it moves computation to servers where the data is located).   There are three components to Hadoop: HDFS (the Hadoop Distributed File System), the Hadoop implementation of MapReduce, and YARN (Yet Another Resource Negotiator; a scheduler).

What is Dataproc?

Dataproc is a cloud-based Hadoop distribution that is managed by Google.  Google administers updates to Dataproc so that it is kept current.  Google also packages and maintains additional software that can be run on top of Hadoop.

Additionally, Dataproc includes other cloud-specific features, such as the ability to automatically add or remove nodes depending upon how busy the cluster is (autoscaling).  It can also use object storage (GCS) or BigQuery as an alternative to HDFS, and provides integration with BigTable through HBase interfaces.

Autoscaling

NYU Dataproc is configured to be as cloud-agnostic as possible, and still uses HDFS and non-proprietary HBase components.  It does, however, leverage autoscaling.  This means that if the cluster hasn't been used for a while, it may take a few minutes (typically 3-5) for resources to become available as nodes are turned on and NodeManagers register with YARN.  During this time, the following warning message will appear, indicating that the cluster is at capacity and resources are not yet available:

WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources

This warning will go away after new nodes have been added by autoscaling and more resources are available for YARN applications.  If it does not go away after more than 10 minutes, please contact the HPC team.  Autoscaling is actively monitored, but a duration of more than 10 minutes may indicate a failure of  Dataproc's monitoring infrastructure.

Currently, NYU Dataproc's autoscaling is configured so that the cluster has between 3 and 43 nodes, depending upon demand.  The number of nodes that are currently active can be seen in the YARN web UI.  Additionally, the percentage of cluster capacity currently in use can be seen on the Scheduler page, reachable from the lefthand menu in the YARN web UI.
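
The number of active nodes can also be checked from the command line with the yarn CLI, as an alternative to the web UI:

yarn node -list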

Accessing the NYU Dataproc Hadoop Cluster

Access to the NYU Dataproc cluster is granted via your NYU Google account.  If you are in a class that uses Dataproc, your instructor or TA will request Dataproc access for your NetID.  

Once access has been granted, you can log in by navigating to https://dataproc.hpc.nyu.edu/ssh in your web browser.  After you've reached this page, you will have access to a browser-based terminal interface where you can run Hadoop commands.

If you are having difficulty connecting to a terminal, please make sure that you are not logged in to a non-NYU Google account by clicking the icon displayed in the upper right corner in the Gmail web interface (see here).  If you are using Google Chrome, you may also need to switch to your NYU account profile using the instructions here.

If you continue to have difficulties connecting via https://dataproc.hpc.nyu.edu/ssh, you can also log in by navigating to https://shell.cloud.google.com/ and running the following command in the terminal that appears:

gcloud compute ssh nyu-dataproc-m --project=hpc-dataproc-19b8 --zone=us-central1-f

You may need to authorize Google to log in to Dataproc after running the above command.

Once logged in, your username will be of the form <your_net_id>_nyu_edu rather than just your NetID (unlike Peel).

HDFS

HDFS stands for Hadoop Distributed File System. HDFS is a highly fault-tolerant file system and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for analyses that use large datasets.

File Permissions and Access Control Lists

You can share files with others using access control lists (ACLs). An ACL gives you per-file, per-directory and per-user control over who has permission to access files. You can see the ACL for a file or directory with the getfacl command:

$ hdfs dfs -getfacl  /user/<net_id>_nyu_edu/testdir

To modify permissions for files or directories, use setfacl:

$ hdfs dfs -setfacl -R -m user:<net_id>_nyu_edu:rwx /user/<net_id>_nyu_edu/testdir

$ hdfs dfs -setfacl -R -m default:user:<net_id>_nyu_edu:rwx /user/<net_id>_nyu_edu/testdir

To share a subdirectory with others, you also need to grant execute (directory traversal) permission on each higher-level directory, such as your home directory:

$ hdfs dfs -setfacl -m user:<net_id>_nyu_edu:--x /user/<net_id>_nyu_edu
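
As a complete sketch, suppose your NetID is ab1234 and you want to give a collaborator with NetID cd5678 read access to an HDFS directory named shared_data (both NetIDs and the directory name here are hypothetical):

$ hdfs dfs -setfacl -m user:cd5678_nyu_edu:--x /user/ab1234_nyu_edu
$ hdfs dfs -setfacl -R -m user:cd5678_nyu_edu:r-x /user/ab1234_nyu_edu/shared_data
$ hdfs dfs -setfacl -R -m default:user:cd5678_nyu_edu:r-x /user/ab1234_nyu_edu/shared_data
$ hdfs dfs -getfacl /user/ab1234_nyu_edu/shared_data

The first command grants traversal permission on the home directory, the next two grant read access to the directory (and, via the default ACL, to files created in it later), and the last command verifies the result.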

Uploading Data to HDFS from Your Computer

Small Transfers

You can add smaller files to HDFS by first uploading them to your local (non-HDFS) filesystem home directory, using the Upload File option in the upper right corner of the browser-based terminal window, and then copying them from there to HDFS.  Note that there is a limit on the size of your local filesystem home directory, so you should only perform these steps for smaller amounts of data; for larger datasets you should use the method described in the Large Transfers section.

hdfs dfs -put /home/<your_netid>_nyu_edu/<path_to_file> <hdfs_path>

To retrieve data from HDFS and copy it to your local filesystem home directory, you can use one of the following commands:

hdfs dfs -get <hdfs_path> /home/<your_netid>_nyu_edu/<path_to_file>

hdfs dfs -copyToLocal <hdfs_path> /home/<your_netid>_nyu_edu/<path_to_file>

You can then download data to your computer by selecting the Download File option in the upper right corner of the browser-based terminal window and entering a file path (i.e., /home/<your_netid>_nyu_edu/<path_to_file>).

To list files in HDFS, use the following command:

hdfs dfs -ls

Large Transfers

To upload large datasets to HDFS, first navigate to the data ingest website at https://dataproc.hpc.nyu.edu/ingest, which provides a web interface for temporary cloud-based bucket storage.  Any datasets that are uploaded to the data ingest website will remain there for 2 days.  Before these 2 days have elapsed, you will need to copy your datasets into your HDFS home directory.  To do that, you can use the following command after logging in:

hadoop distcp gs://nyu-dataproc-hdfs-ingest/<file_or_folder_name> /user/<your_net_id>_nyu_edu

You can find the full path to your file/folder in the ingest storage by clicking on it in the web interface and then scrolling down to the gsutil URI field in the Live Object tab.
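
You can also list the contents of the ingest bucket from the Dataproc terminal with gsutil (assuming the gsutil tool is available in your session, which is typical on Dataproc):

gsutil ls gs://nyu-dataproc-hdfs-ingest/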

Warning: Data uploaded into the ingest website will be visible to all members of the cluster temporarily.  If you are uploading files that cannot be shared with all cluster users (e.g., code) please use the alternate method described below.

Uploading Data to HDFS from Greene

First download gcloud on a desktop computer with a browser by following the instructions here.  Ensure that you run the install command described as optional in step 4.

Log into Greene and activate the Google Cloud command line interface module:

ml load google-cloud-sdk/379.0.0

Then log into Google Cloud by typing the following:

gcloud auth login

Copy and paste the command that you are given into a terminal application on your desktop and run it.  When prompted, type y to proceed.  If you are signed into multiple Google accounts, you will then be presented with a browser window where you can choose your account.  Select your NYU account.  Google will then present a message saying that “Google Cloud SDK wants to access your Google Account”.  Click Allow.

Copy the URL that you are given in the terminal window, and paste it into your Greene session on the line where gcloud asks for it.

Type gcloud auth list to verify that you are logged in:

[jp6546@hlog-1 ~]$ gcloud auth list
       Credentialed Accounts
ACTIVE             ACCOUNT
*                  jp6546@nyu.edu

To set the active account, run:
    $ gcloud config set account `ACCOUNT`

Now that you are logged in, use the instructions under the Small Transfers or Large Transfers sections below to upload your data.

Small Transfers

Run the following commands on Greene to ensure that gcloud knows that you are using it with the Dataproc project (rather than a different Google Cloud project):

gcloud config set project hpc-dataproc-19b8

gcloud config set compute/zone us-central1-f

Run the following command on Greene to upload your file to your filesystem home directory on Dataproc:

gcloud compute scp MYFILE nyu-dataproc-m:~

If you are prompted for a passphrase while generating an SSH key, hit Enter twice.  The above commands are the command-line equivalent of the upload dialogue at https://dataproc.hpc.nyu.edu/ssh mentioned earlier.
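
To copy an entire directory rather than a single file, gcloud compute scp also accepts a --recurse flag (MYDIR below is a placeholder for your directory name):

gcloud compute scp --recurse MYDIR nyu-dataproc-m:~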

Your file should now be available within your filesystem home directory on Dataproc.  You can then run the following command to get it into HDFS:

hdfs dfs -put /home/<your_netid>_nyu_edu/<path_to_file> <hdfs_path>

Large Transfers

On Greene, run the following to upload a single file to the staging bucket:

gsutil cp FILE  gs://nyu-dataproc-hdfs-ingest

Or run the following to copy a directory:

gsutil rsync -r DIRECTORY gs://nyu-dataproc-hdfs-ingest

The above commands are the command line equivalents to the data ingest website described earlier.

As with the earlier example, run the following from within Dataproc to ingest the dataset into your HDFS home directory:

hadoop distcp gs://nyu-dataproc-hdfs-ingest/<file_or_folder_name> /user/<your_net_id>_nyu_edu

Computation on Dataproc

MapReduce

What Is MapReduce?

MapReduce is a programming model and an associated implementation for processing and generating large datasets with a parallel, distributed algorithm on a cluster.

A MapReduce job splits a large dataset into independent chunks and organizes them into key-value pairs for parallel processing. The mapping and reducing functions receive not just values, but (key, value) pairs. 

Every MapReduce job consists of at least two parts:

Mapping Phase: Takes input as <key,value> pairs, processes them, and produces another set of intermediate <key,value> pairs as output.

Reducing Phase: Reducing lets you aggregate values together. A reducer function receives a key along with an iterator over all of the intermediate values emitted for that key. It then combines these values, returning a single output value for that key (see the sketch after this list for a minimal illustration of both phases).
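
The following is a small, self-contained Python sketch that simulates both phases locally for a word count. It is an illustration of the programming model only (the sample records and function names are made up); a real job runs through Hadoop, as in the next subsection.

from itertools import groupby
from operator import itemgetter

def mapper(line):
    # Map phase: emit an intermediate (word, 1) pair for every word in the record.
    for word in line.split():
        yield (word, 1)

def reducer(word, counts):
    # Reduce phase: combine all intermediate counts for one key into a single value.
    return (word, sum(counts))

if __name__ == "__main__":
    records = ["to be or not to be", "to see or not to see"]

    # Map: apply the mapper to every input record.
    intermediate = [pair for line in records for pair in mapper(line)]

    # Shuffle/sort: group the intermediate pairs by key (Hadoop does this between the phases).
    intermediate.sort(key=itemgetter(0))

    # Reduce: call the reducer once per key with an iterator over that key's values.
    for word, group in groupby(intermediate, key=itemgetter(0)):
        print(reducer(word, (count for _, count in group)))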

MapReduce Word Count Example


Ingest a text file into HDFS.  In the example below, the hdfs dfs -put command is combined with another Unix command (curl) to send a copy of a Sherlock Holmes book (located at the URL https://www.gutenberg.org/files/1661/1661-0.txt) directly into HDFS:


curl https://www.gutenberg.org/files/1661/1661-0.txt | hdfs dfs -put -  /user/<netid>_nyu_edu/input.txt


The following command will run an example Word Count job (described in more detail here) with the Sherlock Holmes book as its input.


hadoop jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar wordcount -D mapreduce.job.maps=6 -D mapreduce.job.reduces=2 /user/<netid>_nyu_edu/input.txt /user/<netid>_nyu_edu/output
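
Once the job finishes, its results are written to the output directory given on the command line. One way to inspect the first few lines of the results (the part-r-* file names below assume MapReduce's default output naming):

hdfs dfs -ls /user/<netid>_nyu_edu/output
hdfs dfs -cat /user/<netid>_nyu_edu/output/part-r-* | head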

Spark

What is Spark?

Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. (source: Wikipedia)

Launching an Interactive Spark Shell

Spark provides an interactive shell that you can use to learn the Spark API and analyze datasets interactively. To connect to Spark Shell from the command line, execute the following command:

spark-shell --deploy-mode client --num-executors=1 --driver-memory=1G --executor-memory=1G

Note: NYU Dataproc deploys Spark applications in cluster mode by default.  The following error indicates that you are trying to deploy an interactive shell, which must use client mode:

Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode is not applicable to Spark shells.

To resolve this error, either use the command line flag indicated above (--deploy-mode client) or set the spark.submit.deployMode property in your Spark configuration to client.  More details about the difference between cluster and client mode can be found here.
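
Spark also ships with a Python shell, pyspark, which accepts the same resource flags (pyspark is typically available on Dataproc, but its presence here is an assumption). A minimal interactive word count sketch, reusing the input.txt file ingested in the MapReduce example above:

pyspark --deploy-mode client --num-executors=1 --driver-memory=1G --executor-memory=1G

# Inside the shell, the SparkContext is predefined as sc.
text = sc.textFile("/user/<netid>_nyu_edu/input.txt")
counts = (text.flatMap(lambda line: line.split())
              .map(lambda word: (word, 1))
              .reduceByKey(lambda a, b: a + b))
counts.take(10)   # show the first ten (word, count) pairs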

YARN Scheduler

YARN is the resource manager and job scheduler in the Hadoop cluster. YARN allows you to use various data processing engines for batch, interactive, and real-time stream processing of data stored in HDFS.

Application status and logs

You can list the currently running applications using the yarn script.  Running the yarn script without any arguments prints the description for all commands.

$ yarn application -list 

To kill a currently running application (for example, because it has started malfunctioning or, in the worst case, is stuck in an infinite loop), get its application ID and then kill it as shown below:

$ yarn application -kill <application_ID>

To download application logs for examination on the command line:

$ yarn logs -applicationId <application_ID>
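
Two optional variations that can be handy: filtering the application list by state, and saving an application's logs to a local file for easier searching (the output file name below is arbitrary):

$ yarn application -list -appStates RUNNING
$ yarn logs -applicationId <application_ID> > application_logs.txt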

Using Hive


Apache Hive is a data warehouse software package that facilitates querying and managing large datasets residing in distributed storage (i.e., HDFS). Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called Hive Query Language (HiveQL or HQL). 


You can access Hive with the following command:


beeline -u jdbc:hive2://localhost:10000

Type "!quit" to exit properly from a Beeline session.
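
Once connected, you can run HiveQL statements at the Beeline prompt. A minimal sketch (the table name and columns are hypothetical; your personal database normally follows the <netid>_nyu_edu naming used elsewhere in this document):

SHOW DATABASES;
USE <netid>_nyu_edu;
CREATE TABLE IF NOT EXISTS test_table (id INT, name STRING);
SHOW TABLES;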

If you are planning on using SerDe to query/work with JSON files, you will need to run the following code at the Beeline prompt first in order to ensure that the JsonSerDe class is loaded:

ADD JAR /usr/lib/hive/lib/hive-hcatalog-core.jar

See here for more information.

Access to Hive databases on NYU Dataproc is derived from HDFS permissions because the cluster uses Storage-Based Authorization.  To grant read-only access to one of your Hive databases to someone other than yourself, you can run the following command:

hdfs dfs -setfacl -R -m user:<OTHER_PERSON_NETID>_nyu_edu:r-x /user/hive/warehouse/<YOUR_NETID>_nyu_edu.db

Outside of NYU, other Hadoop installations may use a different mechanism to share databases with colleagues; it is common for Hadoop installations to use a SQL-style grant/revoke mechanism (SQL Standards Based Authorization).  This mechanism is not used at NYU, so bear in mind that external documentation referring to grant/revoke statements is not applicable to NYU Dataproc.

Using Presto


Presto is an open source distributed SQL query engine (similar to Apache Impala) created by Facebook.  To access Presto, you can type the following command:


presto


Once you are inside, you can reference multiple data sources through catalogs (see here).  For instance, you may want to query Hive using Presto.  You can select a database to use with a fully-qualified catalog and database (schema) name, as shown in the session below:


presto> show catalogs;
       Catalog        
----------------------
 bigquery             
 bigquery_public_data 
 hive                 
 memory               
 system               
 tpcds                
 tpch                 
(7 rows)

Query 20220826_155243_00000_m7z3s, FINISHED, 3 nodes
Splits: 53 total, 53 done (100.00%)
1.73 [0 rows, 0B] [0 rows/s, 0B/s]

presto> use hive.<netid>_nyu_edu;
USE


You can also specify a catalog / data source that you want to use on the command line when you start Presto:


presto --catalog=hive
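
The Presto CLI can also run a single statement non-interactively using the --schema and --execute options (a sketch; the schema name follows the same <netid>_nyu_edu pattern as the Hive database above):

presto --catalog=hive --schema=<netid>_nyu_edu --execute "SHOW TABLES;"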


Using Conda


NYU Dataproc comes with miniconda3 by default.  This can be used to manage Python packages within your filesystem home directory.  The main difference between using conda on NYU Dataproc and using conda on Peel is that conda is enabled on NYU Dataproc by default, so you do not need to use the module command to load it.  See here or here for more information on the conda command.
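
For example, a minimal sketch of creating and activating a project-specific environment (the environment name and package list here are arbitrary):

conda create --name bigdata-env python=3.9 numpy pandas
conda activate bigdata-env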

Using Jupyter Notebooks

To use a Jupyter notebook, first start a notebook server from your Dataproc terminal session (for example, by running jupyter notebook --no-browser; Jupyter can be installed with conda if it is not already available in your environment).  The server's startup output includes the port it is listening on and an access token, similar to the following:


    To access the notebook, open this file in a browser:
        file:///home/jp6546_nyu_edu/.local/share/jupyter/runtime/nbserver-7866-open.html
    Or copy and paste one of these URLs:
        http://localhost:8888/?token=90d9c6297ba2c963cdb998ae374041384bac71c781b18ed1
     or http://127.0.0.1:8888/?token=90d9c6297ba2c963cdb998ae374041384bac71c781b18ed1

On your own computer (not on the cluster), create an SSH tunnel to the notebook's port by running the following command, replacing PORT with the port shown in the notebook output:

gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L PORT:localhost:PORT


In our example, using the port from the notebook output above (8888), this command would be as follows:


 gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L 8888:localhost:8888

Once the tunnel is running, open one of the URLs from the notebook output (including the token) in your local browser to reach the notebook.


Using Zeppelin Notebooks

Start a Zeppelin notebook server by running zeppelin start in your Dataproc terminal session.  The output includes the username, password, and port for your Zeppelin session:

jp6546_nyu_edu@nyu-dataproc-m:~$ zeppelin start
Zeppelin is starting with the following configuration:
-------------------------------------------------------
Username: jp6546_nyu_edu
Password: REDACTED
Port: 64739

Zeppelin start                                             [  OK  ]

As with Jupyter, create an SSH tunnel from your own computer to the Zeppelin port, replacing PORT with the port shown in the zeppelin start output:

gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L PORT:localhost:PORT


In our example, using the port from the zeppelin start output above (64739), this command would be as follows:


 gcloud compute ssh nyu-dataproc-m --project hpc-dataproc-19b8 --zone us-central1-f -- -N -L 64739:localhost:64739

Once the tunnel is running, open http://localhost:<port> (http://localhost:64739 in this example) in your local browser and log in with the username and password from the zeppelin start output.

If you need to look up the port or password for a running Zeppelin session again, you can retrieve them with the following commands:

zeppelin get-port

zeppelin get-pass