Hadoop User Guide


NOTICE: This page refers to the Peel Hadoop Cluster which was retired in 2023 and is in the process of being archived. Its contents should not be referenced for Dataproc Courses.

What is Hadoop?

Hadoop is an open-source software framework for storing and processing big data in a distributed/parallel fashion on large clusters of commodity hardware. Essentially, it accomplishes two tasks: massive data storage and faster processing. The core of Hadoop consists of HDFS (the Hadoop Distributed File System) and the Hadoop implementation of MapReduce.

What is MapReduce?

MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster.

A MapReduce job splits a large data set into independent chunks and organizes them into key-value pairs for parallel processing. The mapping and reducing functions receive not just values, but (key, value) pairs. 

Every MapReduce job consists of at least two parts:

Mapping Phase: Takes input as <key,value> pairs, processes them, and produces another set of intermediate <key,value> pairs as output.

Reducing Phase: Reducing lets you aggregate values together. A reducer function receives an iterator over the intermediate values associated with a given key, combines these values, and returns a single output value.
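
To make the data flow concrete, here is a minimal WordCount sketch in the Hadoop Streaming style, where the mapper emits (word, 1) pairs and the reducer sums the counts for each word. The script names and the streaming approach are illustrative assumptions only; the job submission example below uses a compiled jar instead.

# mapper.py (hypothetical): reads lines from stdin, emits one "word<TAB>1" pair per word
import sys

for line in sys.stdin:
    for word in line.split():
        print(f"{word}\t1")

# reducer.py (hypothetical): input arrives sorted by key, so counts for the same
# word are adjacent and can be summed in a single pass
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").split("\t", 1)
    if word == current_word:
        current_count += int(count)
    else:
        if current_word is not None:
            print(f"{current_word}\t{current_count}")
        current_word, current_count = word, int(count)
if current_word is not None:
    print(f"{current_word}\t{current_count}")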

MapReduce Data Flow for WordCount Problem

To Trigger the Job 

export HADOOP_LIPATH=/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/lib
hadoop jar <jarfilename>.jar <DriverClassName> <ip_file_in_HDFS> <op_dir_name>

where <jarfilename>.jar is the jar file built from your Mapper, Reducer, and Driver classes.
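
For example, assuming a hypothetical jar named WordCount.jar whose driver class is WordCount, with input and output paths under your HDFS home directory:

hadoop jar WordCount.jar WordCount /user/<net_id>/input.txt /user/<net_id>/wordcount_output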

Accessing the Peel Hadoop Cluster

All active HPC users have an account on Peel. If you need an account, review the HPC Getting and Renewing Accounts page for instructions on how to get an account. 

The Peel login nodes can be reached directly using the following command (NYU VPN required):

ssh <NetID>@peel.hpc.nyu.edu

For more details about logging into the Peel cluster read the Accessing HPC Systems page. 

YARN Scheduler

YARN is the resource manager and job scheduler in the Peel cluster. YARN allows you to use various data processing engines for batch, interactive, and real-time stream processing of data stored in HDFS.
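
As a quick sanity check, you can list the compute nodes YARN is currently managing (the exact output format depends on the Hadoop version):

$ yarn node -list -all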

Job Queues

The total memory available to users' YARN containers is 4.45 TB. A queue named 'q1' exists to accommodate users with large memory requirements; the majority of users are in the 'default' queue, which has guaranteed resources. If you want to be placed in 'q1', please contact us. To check which queue you are using, run a Spark application, then go to the All YARN Applications page and look at the 'Queue' column for your application.

Application status and logs

You can list the currently running applications with the 'yarn' script. Running the yarn script without any arguments prints a description of all its commands.

$ yarn application -list 

If a submitted application starts malfunctioning or, in the worst case, gets stuck in an infinite loop, you may need to kill it. Get the application ID from the list above, then kill it as shown below:

$ yarn application -kill <application_ID>

To download an application's logs for examination on the command line:

$ yarn logs -applicationId <application_ID>
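
To check the state and progress of a single application without downloading its logs:

$ yarn application -status <application_ID>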

What is Spark?

Spark is a framework for performing general data analytics on a distributed computing cluster such as Hadoop. It provides in-memory computation for faster data processing than MapReduce. It runs on top of an existing Hadoop cluster and accesses the Hadoop data store (HDFS); it can also process structured data in Hive and streaming data from sources such as HDFS, Flume, Kafka, and Twitter.

Running Spark Jobs

The default deploy mode for Spark on Peel is 'cluster'. When running Spark jobs, please keep Spark in 'cluster' deploy mode; this reduces the load on the login nodes, which are not meant for computing. To get an interactive session for debugging, run one of the following:

$ pyspark --deploy-mode client

  or

$ spark-shell --deploy-mode client

If you need Python 3 for Spark, use the following setup:

$ module purge
$ module load python/gcc/3.7.9
$ pyspark [......]

Add the option '--conf spark.yarn.submit.waitAppCompletion=false' to your Spark command if you want the terminal to return as soon as the application is submitted, rather than waiting for it to finish.
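
For example, a batch submission that returns control to your terminal as soon as the application is accepted ('my_app.py' is only a placeholder name):

$ spark-submit --deploy-mode cluster --conf spark.yarn.submit.waitAppCompletion=false my_app.py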

Running PySpark with Conda Env

This section is based on a post in the Cloudera Community; you may want to read that article before trying this out. We modified the recipe, tailored it for Peel, and tested it, and it works well. The example demonstrates using a Conda env to ship a Python environment along with the PySpark application that needs it. This sample application uses the NLTK package and additionally requires that the tokenizer and tagger resources be made available to the application, so it is a bit more involved than examples that use many other packages. Users are encouraged to build their own Conda env by following the steps demonstrated here. The sample application 'spark_nltk_sample.py':

import os
import sys
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

conf = SparkConf()
conf.setAppName("spark-ntlk-env")
sc = SparkContext(conf=conf)

data = sc.textFile('/user/tst867/1970-Nixon.txt')

def word_tokenize(x):
    import nltk
    return nltk.word_tokenize(x)

def pos_tag(x):
    import nltk
    return nltk.pos_tag([x])

words = data.flatMap(word_tokenize)
words.saveAsTextFile('./nixon_tokens')

pos_word = words.map(pos_tag)
pos_word.saveAsTextFile('./nixon_tokens_pos')

Below are the commands to run to put everything together.

[tst867@hlog-1 ~]$ module load miniconda3/2020.11
[tst867@hlog-1 ~]$ conda create -p /scratch/tst867/conda/envs/nltk_env --copy -y -q python=3.7 nltk numpy
[tst867@hlog-1 ~]$ cd /scratch/tst867/conda/envs/
[tst867@hlog-1 envs]$ zip -r nltk_env.zip nltk_env
[tst867@hlog-1 envs]$ conda info --envs
[tst867@hlog-1 envs]$ source activate /scratch/tst867/conda/envs/nltk_env
(nltk_env) [tst867@hlog-1 envs]$ python -m nltk.downloader -d nltk_data all
(nltk_env) [tst867@hlog-1 envs]$ conda deactivate
[tst867@hlog-1 envs]$ hdfs dfs -put nltk_data/corpora/state_union/1970-Nixon.txt /user/tst867/
[tst867@hlog-1 envs]$ cd nltk_data/tokenizers/
[tst867@hlog-1 tokenizers]$ zip -r ../../tokenizers.zip *
[tst867@hlog-1 tokenizers]$ cd ../../
[tst867@hlog-1 envs]$ cd nltk_data/taggers/
[tst867@hlog-1 taggers]$ zip -r ../../taggers.zip *
[tst867@hlog-1 taggers]$ cd ../../

# Using vi, emacs or nano editor, create the application script with content as shown previously

[tst867@hlog-1 envs]$ emacs -nw spark_nltk_sample.py

# At this point we should have generated these files and directories 

[tst867@hlog-1 envs]$ ls -ltr
total 541186
-rw-rw-r--  1 tst867 tst867       519 Apr  6 14:54 spark_nltk_sample.py
drwxrwsr-x 10 tst867 tst867      4096 Apr  6 15:42 nltk_env
-rw-rw-r--  1 tst867 tst867 483854238 Apr  6 15:43 nltk_env.zip
drwxrwsr-x 12 tst867 tst867      4096 Apr  6 15:47 nltk_data
-rw-rw-r--  1 tst867 tst867  27415412 Apr  6 15:49 tokenizers.zip
-rw-rw-r--  1 tst867 tst867  42663686 Apr  6 15:50 taggers.zip

# Run spark-submit

[tst867@hlog-1 envs]$ PYSPARK_PYTHON=./NLTK/nltk_env/bin/python spark-submit \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/nltk_env/bin/python \
    --conf spark.yarn.appMasterEnv.NLTK_DATA=./ \
    --conf spark.executorEnv.NLTK_DATA=./ \
    --archives nltk_env.zip#NLTK,tokenizers.zip#tokenizers,taggers.zip#taggers \
    spark_nltk_sample.py

# The application succeeded, now to check results
[tst867@hlog-1 envs]$ hdfs dfs -cat /user/tst867/nixon_tokens/* | head -n 20
[tst867@hlog-1 envs]$ hdfs dfs -cat /user/tst867/nixon_tokens_pos/* | head -n 20

Once the zip files with the Conda env folded in are built, we may reuse them to submit many applications that require the same packages. Additionally, we can put the zip files in an HDFS directory and tell Spark to fetch them from there by giving an option such as:

--archives hdfs:///user/tst867/archives/nltk_env.zip#NLTK,hdfs:///user/tst867/archives/tokenizers.zip#tokenizers,hdfs:///user/tst867/archives/taggers.zip#taggers
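
For instance, the archives could be uploaded once and then referenced from HDFS in every subsequent submission (the directory name below is only an example):

$ hdfs dfs -mkdir -p /user/<net_id>/archives
$ hdfs dfs -put nltk_env.zip tokenizers.zip taggers.zip /user/<net_id>/archives/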

Using Hive

Apache Hive is data warehouse software that facilitates querying and managing large datasets residing in distributed storage (for example, HDFS). Hive provides a mechanism to project structure onto this data and to query it using a SQL-like language called HiveQL (the Hive Query Language, or HQL); HiveQL queries are executed as MapReduce jobs over the structured data.

We configure Sentry so that, by default, a regular user can see only their own pre-created database. We can grant permissions on your project members' databases upon request.

[tst867@hlog-1 ~]$ beeline --silent
WARNING: Use "yarn jar" to launch YARN applications.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6626826/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
beeline> !connect jdbc:hive2://hm-1.hpc.nyu.edu:10000/
Enter username for jdbc:hive2://hm-1.hpc.nyu.edu:10000/: tst867
Enter password for jdbc:hive2://hm-1.hpc.nyu.edu:10000/: ****************
0: jdbc:hive2://hm-1.hpc.nyu.edu:10000/> use tst867;
0: jdbc:hive2://hm-1.hpc.nyu.edu:10000/> create table messages5 (users STRING, post STRING, time BIGINT, country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE;
0: jdbc:hive2://hm-1.hpc.nyu.edu:10000/> load data inpath 'hdfs://horton.hpc.nyu.edu:8020/user/tst867/user_posts.txt' overwrite into table messages5;
0: jdbc:hive2://hm-1.hpc.nyu.edu:10000/> insert overwrite directory '/user/tst867/hiveoutput' select * from foo;
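
After the 'insert overwrite directory' statement completes, the result files can be inspected from the command line, for example:

$ hdfs dfs -ls /user/tst867/hiveoutput
$ hdfs dfs -cat /user/tst867/hiveoutput/* | head -n 20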

Using the Impala Shell

Apache Impala is an open-source, parallel-processing SQL query engine running on all compute nodes (hc[01-18].nyu.cluster).

To connect to Impala, use a hostname where an Impala daemon is running. Impala daemons run on all Peel compute nodes, 'hc[01-18].nyu.cluster', so connecting to any compute node will work.

Here is the process to connect to Impala-shell.

[tst867@hlog-1 ~]$ impala-shell
Starting Impala Shell without Kerberos authentication
Error connecting: TTransportException, Could not connect to hlog-1.hpc.nyu.edu:21000
***********************************************************************************
Welcome to the Impala shell.
(Impala Shell v3.2.0-cdh6.3.4 (5fe4723) built on Wed Oct 21 08:51:34 PDT 2020)
The HISTORY command lists all shell commands in chronological order.
***********************************************************************************
[Not connected] > connect hc07.nyu.cluster;
Connection lost, reconnecting...
Error connecting: TTransportException, Could not connect to hlog-1.hpc.nyu.edu:21000
Opened TCP connection to hc07.nyu.cluster:21000
Connected to hc07.nyu.cluster:21000
Server version: impalad version 3.2.0-cdh6.3.4 RELEASE (build 5fe4723ad8fe1c3aaecbeb32c7533048be2420cf)
[hc07.nyu.cluster:21000] default>
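
Once connected, standard SQL statements can be issued at the prompt; the database and table names below are only placeholders:

[hc07.nyu.cluster:21000] default> show databases;
[hc07.nyu.cluster:21000] default> select count(*) from <your_db>.<your_table>;

You can also connect to a specific compute node when starting the shell, e.g. 'impala-shell -i hc07.nyu.cluster'.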

Zeppelin

Apache Zeppelin is a web-based interactive computational environment that can use Apache Spark as a backend. It is similar in spirit to the IPython Notebook. Zeppelin is installed on Peel.

Peel is a YARN cluster, not a standalone Spark cluster.  Here is the process to work with Zeppelin on Peel.

# Create personal directories and copy the configuration over
$ mkdir -p $HOME/zeppelin/{conf,logs,notebook,run,webapps}
$ cp /share/apps/peel/zeppelin/0.9.0/conf/* $HOME/zeppelin/conf/

See the [users] section in $HOME/zeppelin/conf/shiro.ini for the default user/password and the instructions on changing the password. The procedures to start and stop a Zeppelin server on the Peel login nodes are:

# Start a Zeppelin daemon
$ /share/apps/peel/zeppelin/0.9.0/bin/zeppelin-daemon.sh --config $HOME/zeppelin/conf start
Zeppelin start at port 9178                                [  OK  ]

# Please remember to clean up when you are done, so as not to leave stale processes hanging around

# Stop the daemon
$ /share/apps/peel/zeppelin/0.9.0/bin/zeppelin-daemon.sh --config $HOME/zeppelin/conf stop
Zeppelin stop                                              [  OK  ]

There are two Peel login nodes. Verify which login node your Zeppelin server was just started on by running the 'hostname' command. Suppose it is 'hlog-1.hpc.nyu.edu' and the NYU VPN is established. Open a new terminal (or PuTTY session) on your computer and enable SSH port forwarding by running the command shown below, or the equivalent in PuTTY. Note that 216.165.13.149 is the external IP address of the node 'hlog-1'; the external IP address of 'hlog-2' is 216.165.13.150. Also note that the Zeppelin port is randomized and may be different the next time you start a new server; the number 9178 comes from the startup message shown in the example above.
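
For example, from your own machine:

my_laptop$ ssh -L 4321:localhost:9178 <net_id>@216.165.13.149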

Now open a new web browser tab and enter the following address to reach the Zeppelin UI:

http://localhost:4321

File Permissions and Access Control Lists

Users can share files with others using ACLs.

An access control list (ACL) gives per-file, per-directory, and per-user control over who has permissions on the files. You can see the ACL for a file or directory with the getfacl command:

$ hdfs dfs -getfacl  /user/<net_id>/testdir
# To modify permissions for files or directories, use setfacl:
$ hdfs dfs -setfacl -R -m user:<net_id>:rwx /user/<net_id>/testdir
$ hdfs dfs -setfacl -R -m default:user:<net_id>:rwx /user/<net_id>/testdir 

To open a subdirectory to others, you also need to grant execute (navigation) permission on each higher-level directory:

$ hdfs dfs -setfacl -m user:<net_id>:--x /user/<net_id>
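
To revoke access again, an ACL entry can be removed with the -x flag, for example:

$ hdfs dfs -setfacl -R -x user:<net_id> /user/<net_id>/testdir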

Hadoop File System (HDFS)

HDFS stands for Hadoop Distributed File System. HDFS is a highly fault-tolerant file system and is designed to be deployed on low-cost hardware. HDFS provides high throughput access to application data and is suitable for applications that have large data sets.
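
A few basic commands for looking around your HDFS home directory (the subdirectory name is just an example):

$ hdfs dfs -ls /user/<net_id>
$ hdfs dfs -mkdir /user/<net_id>/testdir
$ hdfs dfs -du -h /user/<net_id>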

To Upload Data to HDFS

hadoop fs  -put   <filename_in_lfs>  <hdfs_name>

To Get Data from HDFS

hadoop fs  -get  <hdfs_name>  <filename_in_lfs>
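
To verify an upload, or to see what is already there, list the HDFS directory:

hadoop fs -ls /user/<net_id>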

Peel Account Access

All HPC users should have an account on Peel. If you are enrolled in a class using the clusters, you may already have an account; try logging in first to check. See the HPC 'Getting and Renewing Accounts' page for instructions on how to get an account (NYU VPN is required).

Your home directory on Peel is the same as that on the HPC Greene cluster. To see which cluster you are on, run 'echo $CLUSTER' at the command prompt.

Transferring Local Files

To replicate files from your computer to a Peel HDFS directory, first use 'scp' to copy from your computer to $HOME or $SCRATCH, then run 'hdfs dfs -put' to move them to the HDFS directory.

my_laptop$ scp inetutils-1.9.4.tar.gz <net_id>@peel.hpc.nyu.edu:~/
peel$ hdfs dfs -put inetutils-1.9.4.tar.gz /user/<net_id>

Transferring Big Data Files

Use Globus to transfer the data to /scratch first; the same /scratch file system is also available on the Peel login nodes. Then copy the data from /scratch to your HDFS directory on Peel. /home is likewise shared between the Greene and Peel clusters.
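
For example, after a Globus transfer lands in /scratch (the file name below is only a placeholder):

peel$ hdfs dfs -put /scratch/<net_id>/my_dataset.csv /user/<net_id>/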

See the Data Transfers page for instructions on transferring files to and from NYU HPC storage.