Hadoop Benchmarking and Performance Tuning
Abstract—In Hadoop based solutions the biggest challenge is to obtain the best performance from the underlying hardware stack. We take a lot of pride in associating ourselves with setting up production-ready Hadoop clusters on commodity hardware. In fact, the biggest selling point of Hadoop as a Big Data solution is that it lets us set up cost-effective data centers that can process huge amounts of data. This paper demonstrates Hadoop benchmarking and performance tuning optimization techniques.
With commodity hardware being the bottom line, we need to fully utilize the cluster's capability so that we obtain the best performance from the underlying infrastructure. Because Hadoop is horizontally scalable, we mostly tend to add more nodes/instances to an existing cluster to improve performance rather than focusing on tuning parameters. This approach results in a massive cluster to maintain that is not working at its optimal configuration, thereby increasing the cost of operations.
In this paper we explain in simple terms the complex nature of performance tuning of Hadoop and its related services (Hadoop benchmarking and performance tuning). As an example, we also show how an optimally configured Hadoop cluster of 20 nodes works better than a default-configured 30-node Hadoop cluster.
This paper is dedicated to expressing the complex nature of Hadoop performance tuning as a simpler and more approachable discussion.
Introduction
Performance tuning plays a very important part in benchmarking Hadoop clusters before they are handed over to customers as production-ready clusters.
Out-of-the-box Hadoop setup and configurations do not provide a true picture of the power of distributed processing and often give an incorrect impression of the efficiency of Hadoop. Once a Hadoop cluster is set up, it might not run as an optimized setup, delivering processing and storage throughput at a bare minimum level and leaving resources underutilized.
This paper tries to identify the challenges and thereby provide a strategy for optimizing Hadoop clusters and performing a benchmark that results in optimized cluster performance.
Overview
A. Identifying Services To Optimize
This paper focuses on optimizing the HDFS and YARN services of the Hadoop ecosystem. The focus is mostly on IO and processing throughput to ensure that data ingestion and data processing can be benchmarked.
The following services are discussed in this paper:
HDFS
YARN
B. HDFS Cluster Tuning
Tuning HDFS provides a huge opportunity to ensure that all IO operations involved in bringing data into and out of the Hadoop cluster are optimized.
HDFS provides the underlying filesystem for Hadoop, which acts as a data lake hosting all data for later processing. The biggest challenge in a Hadoop cluster setup is the optimization of HDFS. For any processing, the Hadoop framework requires data to be available on HDFS, and hence optimizing HDFS IO operations is the most important factor for a smoothly functioning Hadoop cluster.
Optimizing the HDFS configuration ensures that disk IO for reading and writing data into HDFS is faster and more efficient. This in turn ensures that all eventual processing done on YARN can handle data better and faster for quicker processing.
Following are the parameters we have implemented in our project for HDFS optimization:
compress.map.output = Map outputs are normally written to local storage on each data node and are transferred across the cluster to the reduce processes for their respective keys. This setting ensures that intermediate map output is always compressed before being saved to local storage, allowing faster and easier transfer of data across the nodes of the cluster during processing. With this setting we have typically seen text data compressed by over 40%, so that 1 TB of data was eventually worked on as roughly 600 GB worth of processing.
block.size = this setting ensures that the block size used to save data into HDFS has an optimal value. We have implemented multiple clusters, and from our experience we choose a block size depending on the size of the data. Normally for anything less than 4 TB we prefer a 256 MB block size, and for anything beyond 4 TB of raw data we prefer 512 MB or more.
sort.mb = this parameter ensures that we have an optimal number of disk spills. A larger sort buffer ensures that intermediate outputs are not spilled to disk very frequently; however, a very large value also cuts into the memory available to the container running each map process. In practice, for a smaller cluster working with up to 4 TB of data, 100 MB is a preferred size, but for larger volumes of raw data a size of 200 MB also works well.
sort.factor = this configuration parameter works together with the previous one and identifies the number of map output streams to be merged and saved back to disk for later access. For small clusters we prefer a value of 10, but for large clusters (>4 TB raw data) we recommend increasing the value up to 100.
namenode.handler.count = this configuration parameter specifies the number of handler threads that manage parallel RPC calls across all IO operations. Normally for a smaller cluster a value of 20 works fine, but we can increase it to 60 for clusters handling 4 TB or more of data.
datanode.handler.count = this configuration is the same as the previous one, except that it controls the handler threads on the datanodes specifically.
http.threads = this configuration parameter is analogous to the previous ones, but instead of RPC it controls the number of parallel HTTP threads.
local.dir = specifies the local directory where mapper output is saved. For large jobs that produce a huge set of intermediate output, ensure that sufficient disk space is available there. Normally we set this to /tmp on the root disk.
replication factor = defaults to 3, which works for most projects, but consider changing it based on project requirements. For typical log-analytics applications a replication factor of 2 works fine (and is faster), whereas for financial projects we prefer bumping it up to 4 or even 5.
These settings ensure that the HDFS IO properties are maintained and managed for the best cluster experience; a consolidated configuration sketch is shown below.
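As an illustration, a minimal sketch of how the above settings might be expressed in hdfs-site.xml and mapred-site.xml is shown below. The fully qualified property names follow the classic MR1-era naming (newer Hadoop releases use the mapreduce.* and dfs.blocksize equivalents), and the values are only the example choices discussed above, not universal recommendations.

  <!-- hdfs-site.xml (example values for a cluster holding less than 4 TB of raw data) -->
  <property><name>dfs.block.size</name><value>268435456</value></property>          <!-- 256 MB blocks; 512 MB+ beyond 4 TB raw data -->
  <property><name>dfs.replication</name><value>3</value></property>                 <!-- 2 for log analytics, 4-5 for financial projects -->
  <property><name>dfs.namenode.handler.count</name><value>20</value></property>     <!-- 60 for clusters with 4 TB+ of data -->
  <property><name>dfs.datanode.handler.count</name><value>10</value></property>     <!-- illustrative value -->

  <!-- mapred-site.xml -->
  <property><name>mapred.compress.map.output</name><value>true</value></property>   <!-- compress intermediate map output -->
  <property><name>io.sort.mb</name><value>100</value></property>                    <!-- 200 for larger raw data volumes -->
  <property><name>io.sort.factor</name><value>10</value></property>                 <!-- up to 100 for >4 TB raw data -->
  <property><name>tasktracker.http.threads</name><value>40</value></property>       <!-- illustrative value -->
  <property><name>mapred.local.dir</name><value>/tmp/mapred/local</value></property>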
C. YARN Cluster Tuning
YARN provides the processing power of the cluster. Since most use cases employ Hadoop for data analytics, providing a highly optimized cluster configuration ensures that resources are used optimally and the cost of management and ownership of the project is kept under control.
In this paper we discuss a few optimization techniques and parameters that we have used in our projects and that have delivered acceptable and dependable cluster configurations. Normally, with out-of-the-box parameter settings we utilize only around 40-50% of the cluster resources, which boils down to using only half of the infrastructure. Imagine a cluster of 100 machines in which effectively only 50 machines are doing work. Once we tune the cluster, we may get to use 95-98 of those 100 machines, which improves the cost of project ownership in terms of resource utilization, performance and execution time, and eventually delivers the monetary benefit of running the optimal number of nodes.
Following are the YARN configuration properties that we advise changing (a consolidated configuration sketch follows the list):
map.tasks.speculative.execution = this parameter controls whether redundant (speculative) attempts of each map task are launched. For a production cluster, ensure that this is normally turned off so that we do not run the same map task on multiple nodes.
reduce.tasks.speculative.execution = this parameter is the same as the one above; the only difference is that it controls redundant speculative attempts of reduce tasks.
tasktracker.map.tasks.maximum = this parameter defines the number of map tasks that can run in parallel on each worker node. Normally we use mixed values for this parameter: for memory-hungry processes set it to no more than 2 or 3, but for lighter text-mining algorithms increasing it to 4 or 5 is also acceptable.
tasktracker.reduce.tasks.maximum = same as the parameter above, this configuration sets the number of reduce tasks that can run in parallel on each worker node. For algorithms that require more processing capability set a smaller value of 1 or 2, but for algorithms that are not resource hungry set it to 3 or 4.
job.reuse.jvm.num.tasks = this parameter signifies whether we prefer to reuse existing JVMs for task execution. As JVM creation and teardown take time, it is best to enable reuse for large jobs that execute many map and reduce tasks.
reduce.parallel.copies = this parameter signifies the number of parallel threads used to copy intermediate map output to the reducers. The default value is 5, but it is generally good to increase it to 10 or even 20 when the tasks are running on a bigger cluster.
JVM tuning = Hadoop processes and daemons run on the JVM; tuning the Hadoop JVM settings and heap sizes is a great starting point for optimizing the Java processes of all Hadoop daemons.
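As an illustration, a minimal mapred-site.xml sketch reflecting the settings above might look as follows. Again, the fully qualified names follow the classic MR1-style naming (mapred.*), the values are only the example choices discussed above, and the daemon heap size in hadoop-env.sh is an assumed figure that has to be sized against the actual hardware.

  <!-- mapred-site.xml -->
  <property><name>mapred.map.tasks.speculative.execution</name><value>false</value></property>
  <property><name>mapred.reduce.tasks.speculative.execution</name><value>false</value></property>
  <property><name>mapred.tasktracker.map.tasks.maximum</name><value>4</value></property>      <!-- 2-3 for memory-hungry jobs -->
  <property><name>mapred.tasktracker.reduce.tasks.maximum</name><value>2</value></property>   <!-- 3-4 for lighter jobs -->
  <property><name>mapred.job.reuse.jvm.num.tasks</name><value>-1</value></property>           <!-- -1 = reuse JVMs without limit -->
  <property><name>mapred.reduce.parallel.copies</name><value>10</value></property>            <!-- 20 on bigger clusters -->

  # hadoop-env.sh (daemon JVM heap in MB, assumed example value)
  export HADOOP_HEAPSIZE=4096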
D. Process of Tuning a Cluster
Having a tuned cluster provides a huge advantage in terms of cost of operations and also maintainability.
Cluster tuning is a never-ending procedure and remains part of the exploratory process throughout the cluster lifecycle. The best way to tune a cluster is to first have an idea of the amount of data the client is going to ingest and process at any point in time. Once we have that knowledge, it is best to benchmark the cluster with the out-of-the-box setup and gain first-hand knowledge of cluster health. Once this baseline benchmark is done, we can identify the throughput expected by the client/customer and define an SLA for the defined set of data. With an SLA in place, we can then use our cluster benchmarking algorithms and tune the cluster with the tuning parameters defined above, ensuring that we can achieve the SLA on the benchmarking algorithms. If we can achieve the SLA during benchmarking, we can safely hand over the cluster to the client/customer and tag it as production ready.
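In practice this loop can be driven with the same benchmark jobs described in the Cluster Benchmarking section below. A minimal sketch of the procedure, assuming TeraSort is used as the SLA workload and using illustrative HDFS paths, is:

  # 1. Baseline run on the out-of-the-box configuration
  time hadoop jar $HADOOP_HOME/hadoop-*examples*.jar terasort /user/hduser/terasort-input /user/hduser/terasort-output-baseline

  # 2. Apply the HDFS and YARN tuning parameters from sections B and C and restart the affected daemons

  # 3. Repeat the same run on the tuned configuration and compare the elapsed time against the agreed SLA
  time hadoop jar $HADOOP_HOME/hadoop-*examples*.jar terasort /user/hduser/terasort-input /user/hduser/terasort-output-tuned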
E. Cluster Benchmarking
1) DFSIO
The DFSIO algorithm provides an HDFS IO throughput benchmark that can help identify and measure the read and write throughput of HDFS. This test is particularly useful for stress testing a Hadoop cluster by loading a huge set of data onto HDFS and measuring HDFS throughput in terms of the time taken to read and write the data. This process helps identify potential bottlenecks in the cluster and tune the network or hardware for better performance.
The DFSIO benchmark has two parts. The first focuses on writing data into HDFS and benchmarks HDFS write throughput. Normally the output folder for this benchmark is /benchmarks/TestDFSIO in HDFS.
To run the DFSIO write test and generate the data, use the following command:
hadoop jar $HADOOP_HOME/hadoop-*test*.jar TestDFSIO -write -nrFiles 4000 -fileSize 1000
This command generates 4000 files of 1 GB each, which results in 4 TB of data being generated and loaded into HDFS.
The second part of DFSIO ensures that HDFS reads can be properly initiated and that all reads from HDFS happen at the best possible throughput. This read test also gives an indication of how fast the processing algorithms will be able to read and process data.
To start the read throughput test we use the following command:
hadoop jar $HADOOP_HOME/hadoop-*test*.jar TestDFSIO -read -nrFiles 4000 -fileSize 1000
This command starts a job that reads the 4 TB worth of data generated in the previous step. Using this benchmark we can determine the cumulative read throughput of the entire cluster by identifying how long it takes to read 4 TB of data.
Apart from these two tools to generate and read data in HDFS, DFSIO provides a third step to clean up the HDFS space. Once we have generated terabytes of data for the read and write tests, we need to clean it up before handing over the cluster to clients/customers.
The following command cleans up the benchmark output directory:
hadoop jar hadoop-*test*.jar TestDFSIO -clean
Once we have executed both DFSIO tests we can check the logs to find the throughput of the read and write operations. The following is an extract showing how to evaluate the DFSIO throughput:
—– TestDFSIO —– : write
Date & time: Fri Apr 08 2011
Number of files: 1000
Total MBytes processed: 1000000
Throughput mb/sec: 4.989
Average IO rate mb/sec: 5.185
IO rate std deviation: 0.960
Test exec time sec: 1113.53
—– TestDFSIO —– : read
Date & time: Fri Apr 08 2011
Number of files: 1000
Total MBytes processed: 1000000
Throughput mb/sec: 11.349
Average IO rate mb/sec: 22.341
IO rate std deviation: 119.231
Test exec time sec: 544.842
In both of these log extracts, the Throughput mb/sec value gives us an idea of the throughput figures for HDFS read and write.
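As a rough worked example against the sample extract above (the figures come from the sample log, not from our cluster): the write test moved 1,000,000 MB in 1113.53 seconds, i.e. roughly 1,000,000 / 1113.53 ≈ 898 MB/sec of aggregate cluster write bandwidth, while the read test moved the same 1,000,000 MB in 544.842 seconds, i.e. roughly 1,835 MB/sec aggregate. The reported Throughput mb/sec line, by contrast, is computed per map task (total bytes divided by the sum of the individual task times), so the two views should be read together when comparing tuned and untuned runs.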
2) Terasort Benchmark
The TeraSort benchmark focuses on measuring the throughput of the CPU cycles spent on data processing. The idea of TeraSort is to sort a randomly generated set of data as fast as possible. The time taken to sort the data gives a clear picture of how well the cluster is tuned for CPU-intensive operations.
TeraSort benchmarking is a test of CPU- and RAM-bound processing operations.
To start the TeraSort exercise, we begin by generating a random set of data in HDFS, which will later be sorted. To generate 4 TB of data we use the following command:
hadoop jar hadoop-*examples*.jar teragen 40000000000 /user/hduser/terasort-input
The above command generates a random data set that will later be sorted using distributed processing, giving a better idea of CPU and RAM utilization. In this example we generate 4 TB worth of data into the HDFS folder /user/hduser/terasort-input.
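Note that the teragen argument counts rows rather than bytes: each generated row is 100 bytes, so 40,000,000,000 rows × 100 bytes = 4,000,000,000,000 bytes, i.e. approximately 4 TB of raw input.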
Once the data is generated, we need to process it by running the sorting algorithm against the generated data.
We run the following algorithm to process the data:
hadoop jar hadoop-*examples*.jar terasort /user/hduser/terasort-input /user/hduser/terasort-output
This process runs the terasort algorithm against the folder where the data was generated, processes the data on YARN, and then saves the sorted data back into the HDFS output folder.
Once data generation and data processing are done, we can also validate the data and the algorithm with the TeraSort validation step. To validate the data use the following:
hadoop jar hadoop-*examples*.jar teravalidate /user/hduser/terasort-output /user/hduser/terasort-validate
Once the validation is done we can check the logs, validate the entire TeraGen/TeraSort process, and benchmark the cluster's performance parameters:
Hadoop job: job_201102110201_0014
=====================================
Job tracker host name: master
job tracker start time: Fri Feb 11 2011
User: hadoop
JobName: TeraGen
JobConf: hdfs://master:54310/app/hadoop/tmp/mapred/system/job_201102110201_0014/job.xml
Submitted At: 11-Feb-2011
Launched At: 11-Feb-2011 13:58:14 (0sec)
Finished At: 11-Feb-2011 15:00:56 (1hrs, 2mins, 41sec)
Status: SUCCESS
=====================================
Task Summary
============================
Kind Total Successful Failed Killed StartTime FinishTime
Setup 1 1 0 0 11-Feb-2011 13:58:15 11-Feb-2011 13:58:16 (1sec)
Map 24 24 0 0 11-Feb-2011 13:58:18 11-Feb-2011 15:00:47 (1hrs, 2mins, 28sec)
Reduce 0 0 0 0
Cleanup 1 1 0 0 11-Feb-2011 15:00:50 11-Feb-2011 15:00:53 (2sec)
============================
Analysis
=========
Time taken by best performing map task task_201102110201_0014_m_000006: 59mins, 5sec
Average time taken by map tasks: 1hrs, 1mins, 24sec
Worse performing map tasks:
TaskId Timetaken
task_201102110201_0014_m_000004 1hrs, 2mins, 24sec
task_201102110201_0014_m_000020 1hrs, 2mins, 19sec
task_201102110201_0014_m_000013 1hrs, 2mins, 9sec
The above log provides details of the CPU and RAM time spent on each task. The report also specifies how much time was spent on the map and reduce tasks.
3) Namenode Benchmark
The idea of this algorithm is to benchmark the namenode hardware throughput by generating lots of small HDFS files and storing their metadata in memory.
To perform the namenode benchmark we run the following:
hadoop jar hadoop-*test*.jar nnbench -operation create_write \
-maps 12 -reduces 6 -blockSize 1 -bytesToWrite 0 -numberOfFiles 1000 \
-replicationFactorPerFile 3 -readFileAfterOpen true \
-baseDir /benchmarks/NNBench-`hostname -s`
The above command runs a NameNode benchmark that creates 1000 files using 12 maps and 6 reducers. It uses a custom output directory based on the machine's short hostname. This is a simple trick to ensure that one box does not accidentally write into the same output directory as another box running NNBench at the same time.
4) MapReduce Benchmark
The idea is to benchmark smaller jobs that run in parallel on the Hadoop cluster by scheduling multiple MR jobs through the YARN scheduler. To run the process, use the following command, which runs a small test job 50 times:
hadoop jar hadoop-*test*.jar mrbench -numRuns 50