HADOOP ECOSYSTEM

Hive and Pig provide alternative interfaces to MapReduce.

Hive and Pig are much easier to use than writing raw MapReduce (MR) code.

HIVE - Sits on top of MapReduce and looks much like a SQL database. We can connect through a shell, ODBC, etc., and run SQL queries on the data stored in the Hadoop cluster. If you are familiar with SQL, Hive is a good API to start with.

- Uses a SQL-like query language called HiveQL.

- Can be installed on each individual client or as a centralised service (HiveServer2). Developed at Facebook and easy to learn.

- Requires a metastore (usually MySQL) to provide the Hive "table" to HDFS "data" mapping.

PIG - A high-level scripting language that sits on top of MapReduce. If you don't want to write Java or Python MR code and would rather use a scripting language that feels somewhat like SQL, Pig is the option.

- Uses its own data flow language called Pig Latin.

- Pig scripts are typically 10-15% slower than well-written Java MapReduce.

- Can be installed on the client or run as a web service. Developed at Yahoo. Its data flow language is particularly well suited to Extract, Transform and Load (ETL) work.

- Requires nothing except the Pig interpreter.
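To give a feel for Pig Latin, here is a minimal sketch (the input path and field layout are hypothetical): it loads a tab-delimited web log, keeps only server errors, and counts them per URL.

-- Minimal Pig Latin sketch; paths and field names are made up for illustration.
logs   = LOAD '/logs/access.log' USING PigStorage('\t') AS (url:chararray, status:int);
errors = FILTER logs BY status >= 500;          -- keep only 5xx responses
by_url = GROUP errors BY url;
counts = FOREACH by_url GENERATE group AS url, COUNT(errors) AS hits;
STORE counts INTO '/output/error_counts';       -- written back to HDFS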

AMBARI: Sits on top of everything in the Hadoop ecosystem and helps you visualise what is running on your cluster and which systems are using how much resource. It provides views to run Hive queries, import databases into Hive, or execute Pig queries, and gives you the state of the systems running under its hood. There are many competing technologies; Ambari is what Hortonworks uses.

MESOS: Mesos is a resource negotiator. It is not Hadoop proper, but it can be used as an alternative to YARN or configured to run alongside YARN. Spark can connect to Mesos or YARN to run its jobs.

SPARK: Sits at the same level as MapReduce. If you want to process data in a Hadoop cluster very fast, Spark is a good choice. Spark code can be written in Python, Java or Scala. It is extremely fast and has a very big user community. It is also very versatile: it can handle SQL queries, do machine learning across an entire cluster of information, and handle streaming data in real time.

TEZ: Similar to Spark, and uses some of the same techniques, notably the directed acyclic graph (DAG). This gives it a leg up on MapReduce because it can produce more optimal plans for executing your queries. Tez is usually used in conjunction with Hive to accelerate it: Hive on Tez can be much faster than Hive on MapReduce.

HBASE: HBase is a NoSQL database and a columnar datastore. It can be used to expose your data to transactional platforms. HBase can expose the data stored on your cluster (once the data has been transformed in some way by Spark or MR), which helps in exposing it to other systems.

STORM: A way of processing streaming data, e.g. from sensors or web logs. Storm is built to process streaming data quickly, in real time. Use cases include updating your machine learning model or transforming your data as it comes in.

OOZIE: Oozie is a way of scheduling jobs on your cluster. If you have a task that involves many steps, Oozie is the way to schedule all of those things as a workflow. Say you have a complicated operation that loads data into Hive, integrates it with Pig, queries it with Spark and transforms the results with HBase; Oozie can run all of those steps on a consistent basis.

ZOOKEEPER: A coordination service. It is the technology that keeps track of which nodes are up and which are down, and it maintains shared state across your cluster that different applications can use.

For DATA INGESTION into your cluster we can use:

Sqoop, Flume or Kafka

SQOOP: Sqoop is a way of tying Hadoop to a relational database; anything that can talk ODBC or JDBC can be moved into (or out of) the Hadoop file system.

FLUME: A way to move your web logs into the Hadoop file system. Say you have a fleet of web servers; Flume can listen to the web logs and publish them into your cluster in real time, where Storm or Spark can pick them up for real-time streaming.

KAFKA: A general-purpose distributed messaging system that collects data from different systems and sends it to consumers or downstream systems. It solves the data ingestion problem.

External DATA STORAGE

We can use MySQL, or NoSQL stores such as Cassandra (columnar) and MongoDB (document-based), to store data extracted, say, from Spark. This can then be used to expose the data to web applications.

QUERY ENGINES:

DRILL - Allows you to write SQL across NoSQL databases. It can query MongoDB, Cassandra and other NoSQL databases and tie the results together, letting you write queries across all these disparate datastores.

HUE - For interactively creating and executing queries, much like the Ambari views. It is what Cloudera uses.

PHOENIX - Similar to Drill, but takes it a step further: it makes a non-relational datastore look like a relational one.

PRESTO - Another way to execute queries across your cluster.

ZEPPELIN - Takes a notebook-style approach to interacting with your cluster.

IMPALA - A completely separate set of daemons that run on the cluster:

- impalad (runs on each server node)

- statestored (runs on a single machine - one daemon per cluster)

- Its interface is a subset of HiveQL and does not allow user-defined functions

- Developed by Cloudera.

- Meant for low-latency interactive queries.

Hive and Impala are good for analytics. Pig is good for ETL and processing.

WHEN TO USE HIVE, PIG, IMPALA

Use Hive when your data fits well into row/column format and the problem looks like SQL - a no-brainer.

Use Pig to perform more complex data flows (Pig gives more control than Hive).

Use Impala when your data fits well into row/column format, you need real-time interactive query capability, and its weaker fault tolerance is acceptable.

Hive, Pig and Impala can all be used together, based on the type of query.

You can also tie Hive and Pig scripts together with the Oozie workflow tool.

HIVE BASICS

Hive can be installed on each client machine individually (CLI access).

Or it can be installed as a service on a centralised, separate server - a Hive Server (ODBC, JDBC, Beeline CLI) or HUE (Beeswax web interface) - which can be used by many clients.

A metastore is required for each client or server. The metastore provides the Hive "table" to HDFS "data" mapping, so when we run a query in Hive we are looking at the data underneath in HDFS; no data is stored in the tables themselves.

Example:

CREATE EXTERNAL TABLE page (userid BIGINT, url STRING, ip STRING)
COMMENT 'This is a test hive query'
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'    -- the file is tab-delimited
LOCATION 'path/in/HDFS/';    -- location of the data in HDFS

A Hive query could then be:

SELECT * FROM page WHERE url LIKE '%hive.com%';

Hive can use "partitions" to speed up queries (see the sketch after this list)

  • Partitions exist as directories below the main table

    • Think of directories like /page_view/<year>-01/ and /page_view/<year>-02/

      • Only the relevant partition(s) are consulted for a specific month's query - querying "slices" of the data is now much faster.

  • Hive "buckets" allow for efficient sampling of data

    • By creating buckets, we break a column up into "buckets" - hash-partitioning on that column.

    • Each bucket will then hold a random sample of the data for that field

    • When trying out a query, we can run it against a single bucket to sample only a portion of the data.
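A minimal HiveQL sketch of both ideas (table, column and partition names are hypothetical, following the page_view example above):

-- Partitioned and bucketed version of a hypothetical page_view table.
CREATE EXTERNAL TABLE page_view (userid BIGINT, url STRING, ip STRING)
PARTITIONED BY (view_month STRING)         -- stored as .../view_month=<value>/ subdirectories
CLUSTERED BY (userid) INTO 16 BUCKETS      -- hash-partitions the userid column into 16 buckets
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION 'path/in/HDFS/page_view/';

-- Query a single "slice" of the data (one partition):
SELECT * FROM page_view WHERE view_month = '2016-01';

-- Sample roughly 1/16th of the data using the buckets:
SELECT * FROM page_view TABLESAMPLE(BUCKET 1 OUT OF 16 ON userid);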

DATA IMPORT AND EXPORT

Real Time Ingestion and Analysis Tools

  • Flume, Storm, Kafka and Kinesis - roughly do the same thing

  • Involves multiple distributed agents that push/pull data to/from one another

  • Typically one agent per machine

  • Agents can do "value added processing" like de-duping, filtering, etc.

    • Flume is comparatively good at this kind of 'value added processing'.

    • Engines like Storm, Kafka and Kinesis are richer at analysing the data they pull in.

    • These systems are platform-agnostic

      • They do not care if the eventual sink is Hadoop, NoSQL, or a flat file

Database Import and Export

  • Sqoop - SQL to Hadoop

  • A single process that imports/exports data into and out of Hadoop

  • Does no analysis or filtering itself

  • Frequently used to move data to/from Hadoop on a periodic basis, so Hadoop can do its batch processing

    • A Sqoop job is often part of an Oozie workflow:

      1. Say at 2:00am we pull the orders table delta from MySQL (the online transaction system).

      2. After the data is pulled, run the recommendation engine - i.e. a MapReduce job.

      3. After the MapReduce job completes, we use Sqoop to export the data from the recommendation engine back to MySQL and update the recommendations there.

      • The above three steps would be a single workflow in Oozie.

FLUME vs SQOOP

    • Flume can pull in data in real time from any system

      • Initially created to process log files (thus the name)

      • Flume is one-directional and is only used for ingestion (it does not export data from HDFS), whereas Sqoop can export data from HDFS

      • Flume agents run in a JVM (custom components must be written in Java) and are easy to configure - a config file of roughly 15 lines suffices, with no need to write any Java code

    • Sqoop can import/export data from an RDBMS

      • Can do "deltas" (i.e. pull only newly added rows)

      • Uses a map-only job to select/insert data from the RDBMS

Both can act as "dumping agents" into Hadoop, as opposed to "analyse on ingest" systems like Storm, Kafka and Kinesis.

FLUME vs STORM

Storm

  • Built for complex real-time streaming analytics (analysing data in flight), i.e. analysing data as it is being generated.

  • Developed by Twitter

    • Worker processes can be written in any language (uses a Thrift interface, JSON over stdin/stdout)

Flume

    • Built for moving massive amounts of data (log data), typically from a source to a destination (typically Hadoop or a NoSQL DB)

      • Extremely easy to configure and use

    • Developed by Cloudera

    • Capable of analysis, but not as easily as Storm (which is built for analysis purposes).

Both are extremely scalable - horizontally and vertically: you can add tiers to an individual data flow or add additional agents.

FLUME vs KINESIS, KAFKA, STORM

Flume - pushes most of the analysis to batch processing systems like Hadoop. Ideal for log analysis.

Kinesis - Kafka - Storm

    • Perform data analysis on unterminated data (i.e. the flow of data never stops)

    • Can also persist data for later analysis(especially Kafka)

    • Ideal for projects where fast analysis is required.

BEST PRACTICES

Use a workflow engine like Oozie, which can chain pull/analyse/push operations together into repeatable 'chunks'. That way we can run many steps - pulling data from an RDBMS, running a series of Pig, Sqoop or MapReduce jobs, and exporting the data back into the RDBMS - on a repeatable basis.

These import/export systems do have some concerns we need to take into account.

HDFS Concerns

- HDFS exposes files (as blocks) as they are written.

- Unless a directory structure is in place, users might end up processing data that is not completely written. To avoid this, use a directory structure like this in HDFS:

/incoming               # data as it is being written; upstream processes push data here

/ready-for-processing   # once a full file is written, it is moved here

/processed              # all batch processing complete

- Systems like Flume, Kinesis and Kafka keep writing data forever, so setting up this three-tier directory structure becomes very important.

- Systems like Flume can create a "small files" problem in HDFS, where lots of small files cause random reads, burn up memory, and become an overhead on the NameNode.

To overcome this, use the agent process (like Flume) to limit what gets into HDFS. We can de-dupe or filter out certain records to limit what gets into HDFS.

- Extend the collection period to produce larger files. For example, instead of Flume writing to HDFS in 5-second increments, use a larger period so that we push larger files to HDFS.

- Combine and compact the small files and store them in HDFS as a single large file (see the sketch below).
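A minimal shell sketch of the staged-directory flow, including one simple way to compact small files with getmerge (all paths are hypothetical):

# Move a completed file from the landing area into the processing area.
hdfs dfs -mv /incoming/weblogs-2016-01-01.log /ready-for-processing/

# Compact many small Flume output files: getmerge concatenates them onto local
# disk, then we push the single large file back up to HDFS.
hdfs dfs -getmerge /incoming/flume/2016-01-01/ weblogs-2016-01-01.log
hdfs dfs -put weblogs-2016-01-01.log /ready-for-processing/weblogs-2016-01-01.log

# Once the batch jobs finish, archive the input.
hdfs dfs -mv /ready-for-processing/weblogs-2016-01-01.log /processed/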

FLUME BASICS

    • Moves large amounts of data from where it is generated to where it needs to be processed. It is very different from 'analyse on ingest' systems like Storm, Kafka and Kinesis.

    • Developed by Cloudera for use with Hadoop (sink to HDFS). It has since been extended to work with many NoSQL engines.

    • A series of agents that live on Linux machines

    • Each agent has a

      • Source - where the data comes from; could be a remote port on a remote machine, or a local tail of a web server's log file.

      • Sink (drains events from a channel) - could be a remote port/machine or HDFS.

      • Channel - the buffer between the source and the sink.

    • Transactional - when passing data to a sink, the passing agent waits for an 'ack' from downstream. If there is no 'ack', the upstream agent re-transmits.

      • Agents can be made highly available and horizontally scaled.

    • Extending Flume is very simple - you can write your own source, sink or channel, although most existing 'standard' systems are already supported.

      • Data can be processed between source and sink

        • natively (e.g. the regex_filter interceptor)

        • or with custom code (extend the existing code for custom operations), which must be written in Java.

    • Maintenance - there could be many systems running agents, so you will need a config management tool like Puppet or Chef to push out config changes.

      • Config changes are picked up on the fly from the properties file - no agent restart is required when a new properties file comes in.

    • Usage

      • Install Flume agents on each node in the flow

      • Set up the source, sink and channel config files (only ~20 lines of config; see the minimal sketch after this list)

      • Start the agents

      • Set up MR jobs (for periodic processing) as needed. You are done - it takes only minutes to set up.

      • And it is easily automated with config management tools.
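A minimal Flume config sketch (agent name, host names and paths are hypothetical): one agent tails a web server log, buffers events in a memory channel and sinks them to HDFS.

# Agent "a1": exec source (tail of a log) -> memory channel -> HDFS sink.
a1.sources  = r1
a1.channels = c1
a1.sinks    = k1

a1.sources.r1.type     = exec
a1.sources.r1.command  = tail -F /var/log/httpd/access_log
a1.sources.r1.channels = c1

a1.channels.c1.type     = memory
a1.channels.c1.capacity = 10000

a1.sinks.k1.type                   = hdfs
a1.sinks.k1.channel                = c1
a1.sinks.k1.hdfs.path              = hdfs://namenode:8020/incoming/weblogs/%Y-%m-%d
a1.sinks.k1.hdfs.fileType          = DataStream
a1.sinks.k1.hdfs.useLocalTimeStamp = true

The agent would then be started with something like: flume-ng agent --conf-file weblog.conf --name a1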

HDFS INTERACTION TOOLS

WebHDFS, HttpFS, FuseDFS

WebHDFS and HttpFS

  • Both allow programmatic, REST-ful access to HDFS (even for legacy systems).

    • By calling URLs, the HDFS file system can be accessed via the NameNode (WebHDFS) or an HttpFS server.

    • This can be useful for legacy systems where a Hadoop client cannot be installed.

    • We can perform operations like put, get, list, etc.

    • Use 'curl' or 'wget' to interact with WebHDFS or HttpFS.

    • Both use exactly the same API commands to interact with HDFS

      • HttpFS calls use the WebHDFS URL format

    • Both can use either strong Kerberos authentication or the weaker HDFS-based Hadoop authentication.

WebHDFS - the simplest to deploy: one config change on the NameNode and a restart of the NameNode and DataNodes.

      • Requires direct client access to every DataNode in the cluster

HttpFS - (formerly known as Hoop)

      • Requires setting up an HttpFS server.

      • Only the HttpFS server needs access to the DataNodes. Users submit requests to the HttpFS server, so clients don't require direct access to the DataNode servers.

http://<host>:<port>/webhdfs/v1/<path>?op=
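For example, using curl (host names and paths are hypothetical; 50070 is the default NameNode HTTP port on older Hadoop releases, and 14000 is the default HttpFS port):

# List a directory via WebHDFS on the NameNode:
curl -i "http://namenode:50070/webhdfs/v1/user/hadoop/incoming?op=LISTSTATUS"

# Read a file; WebHDFS redirects the read to a DataNode, so -L follows the redirect:
curl -i -L "http://namenode:50070/webhdfs/v1/user/hadoop/incoming/weblog.txt?op=OPEN"

# The same call against an HttpFS server - only the host/port changes:
curl -i "http://httpfs-server:14000/webhdfs/v1/user/hadoop/incoming?op=LISTSTATUS&user.name=hadoop"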

FuseDFS - Filesystem in Userspace (FUSE)

    • Lets users mount HDFS as though it were local, just like a C: or D: drive.

    • Many FUSE implementations exist for other virtual filesystems, e.g. Cassandra, box.net, etc.

    • Extremely useful for novice users

      • Managers can run Excel macros right on HDFS data

      • Data entry folks can write directly to HDFS without making any WebHDFS calls.

      • Education and sandboxing are necessary.

      • User quotas are a must.

SQOOP - RDBMS Import/Export Tool

    • Transfer data between Hadoop and RDBMS

    • Uses a map-only job to select/insert data from the RDBMS

      • Four mappers by default, which is configurable.

      • Can throttle bandwidth

      • Many custom connectors are available, e.g. for Netezza and Teradata.

      • Developed by Cloudera.

    • With Sqoop (1) we essentially install a fat client on our local machine.

      • The Sqoop client then kicks off the map tasks in the cluster, which dump data into HDFS, HBase or Hive.

      • The data can come from an EDW, document-based systems or an RDBMS. We can pull data from any of these systems via a map task into HDFS, and take HDFS data and export it back to any of these systems (a basic example follows).
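A minimal Sqoop sketch (connection string, credentials and table names are hypothetical):

# Pull a MySQL table into HDFS with four parallel map tasks (the default):
sqoop import \
  --connect jdbc:mysql://dbserver/shop \
  --username sqoop_user -P \
  --table orders \
  --target-dir /incoming/orders \
  --num-mappers 4

# Export processed results from HDFS back into MySQL:
sqoop export \
  --connect jdbc:mysql://dbserver/shop \
  --username sqoop_user -P \
  --table recommendations \
  --export-dir /processed/recommendations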

SQOOP2

    • Has a lightweight client on the user's machine.

    • The client connects to the Sqoop2 server through a REST URL or through a UI that can be accessed via a browser.

    • The server also holds the connectors and a metadata repository (similar in role to the Hive metastore).

    • Users connect to the heavyweight Sqoop2 server, which runs the map tasks.

    • The rest is the same, except that Sqoop2 can run reduce tasks as well.

    • Since Sqoop2 has a centralised server, access control is possible.

    • Probably the preferred way to run updates.

Advanced Usage

  • Sqoop can support incremental updates (all deltas since the last auto-increment or timestamp value) - see the sketch after this list

  • Can limit the imported rows/columns

  • Can auto-generate Hive metastore tables and HBase records

    • Can import delimited text like CSV, or binary formats like Avro or SequenceFiles

    • Supports BLOBs and CLOBs

    • Provides commands for database inspection, like listing databases and tables.
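A sketch of some of these options (column names and the last-value are hypothetical):

# Incremental import: pull only rows added since the last run, limit the columns,
# and create/load a matching Hive table automatically.
sqoop import \
  --connect jdbc:mysql://dbserver/shop \
  --username sqoop_user -P \
  --table orders \
  --columns "order_id,userid,total" \
  --incremental append \
  --check-column order_id \
  --last-value 12345 \
  --hive-import

# Database inspection:
sqoop list-tables --connect jdbc:mysql://dbserver/shop --username sqoop_user -P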

OOZIE BASICS

  • A standalone Java web app that accepts requests to run workflows

    • A Tomcat Java servlet (requires its own server machine)

    • Workflows are written in XML

      • 'Actions' can be a combination of Sqoop, Pig, Hive, Java and more

    • A workflow can run

      • On demand

      • On a schedule

      • Directly from Java (Java client API)

      • When data appears in a directory (great for sporadic client uploads)

    • Supports Kerberos and Hadoop basic authentication

Control and Action

    • A workflow has control nodes and action nodes (a minimal workflow.xml sketch follows these lists)

    • Control

      • Start (where processing begins)

      • End (return success)

      • Kill (die with a message)

      • Decision (case/switch)

      • Fork/join (wait until all [fork] branches complete, then continue processing at [join])

    • Action

      • Email

      • Shell (any arbitrary shell script)

      • Hive, Pig and Sqoop action nodes, which allow us to run Hive, Pig and Sqoop commands

      • Sub-workflow - a workflow can contain whole other workflows

      • Java (runnable JAR)

      • MapReduce jobs

      • HDFS operations
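A minimal workflow.xml sketch tying a few of these nodes together (the action contents, property names and the Hive script are hypothetical):

<workflow-app name="daily-reco" xmlns="uri:oozie:workflow:0.4">
    <start to="import-orders"/>

    <!-- Sqoop action: pull the day's orders into HDFS -->
    <action name="import-orders">
        <sqoop xmlns="uri:oozie:sqoop-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <command>import --connect jdbc:mysql://dbserver/shop --table orders --target-dir /incoming/orders</command>
        </sqoop>
        <ok to="build-recos"/>
        <error to="fail"/>
    </action>

    <!-- Hive action: run the recommendation query -->
    <action name="build-recos">
        <hive xmlns="uri:oozie:hive-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <script>build_recos.q</script>
        </hive>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Workflow failed: ${wf:errorMessage(wf:lastErrorNode())}</message>
    </kill>
    <end name="end"/>
</workflow-app>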