https://www.codementor.io/sheena/tutorials/extending-hadoop-apache-pig-with-python-udfs-du107mj6t
https://github.com/Netflix/lipstick
http://habrahabr.ru/post/223217/
http://mortar-public-site-content.s3-website-us-east-1.amazonaws.com/Mortar-Pig-Cheat-Sheet.pdf
http://habrahabr.ru/company/selectel/blog/215307/
http://www.ledem.net/hack/map-red-js-console.html
http://pig.apache.org/docs/r0.8.0/udf.html
http://pig.apache.org/docs/r0.8.0/cookbook.html
http://squarecog.wordpress.com/2010/12/19/new-features-in-apache-pig-0-8/
http://squarecog.wordpress.com/2010/05/11/group-operator-in-apache-pig/
http://squarecog.wordpress.com/2009/01/17/building-an-inverted-index-with-hadoop-and-pig/
http://developer.yahoo.com/blogs/hadoop/posts/2010/01/comparing_pig_latin_and_sql_fo/
http://www.cloudera.com/blog/2009/06/analyzing-apache-logs-with-pig/
http://agiletesting.blogspot.com/2011/07/processing-mail-logs-with-elastic.html
http://www.dataspora.com/2011/04/pigs-bees-and-elephants-a-comparison-of-eight-mapreduce-languages/
http://agiletesting.blogspot.com/2012/02/set-operations-in-apache-pig.html
complex data types: bags, tuples, and maps
A tuple is an ordered set of fields.
A bag is a collection of tuples; the tuples in one bag can have differing numbers of fields.
A Pig relation is a bag of tuples. A Pig relation is similar to a table in a relational database, where the tuples in the bag correspond to the rows in a table. Unlike a relational table, however, Pig relations don't require that every tuple contain the same number of fields or that the fields in the same position (column) have the same type.
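As a rough mental model only (Pig's real types are Java classes), a relation can be pictured as a Python list of tuples with varying arity:

```python
# Illustrative Python model of a Pig relation: a bag of tuples.
# Unlike rows in a relational table, the tuples may have
# different numbers of fields.
relation = [
    ("john", 18, 4.0),
    ("mary", 19),
    ("bill", 20, 3.9, "cs"),
]
arities = [len(t) for t in relation]
print(arities)  # [3, 2, 4]
```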
How do I prevent failure if some records don't have the needed number of columns?
A = LOAD 'foo' USING PigStorage('\t');
B = FILTER A BY ARITY(*) < 5;
How to do SELECT COUNT(*); note the difference between COUNT and COUNT_STAR:
a = LOAD 'mytestfile.txt';
b = GROUP a ALL;
c = FOREACH b GENERATE COUNT(a);      -- ignores tuples whose first field is null
c = FOREACH b GENERATE COUNT_STAR(a); -- counts all tuples, including those with a null first field
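The COUNT / COUNT_STAR difference can be mimicked in plain Python (a sketch of the semantics, not Pig's implementation):

```python
# Sketch of Pig's COUNT vs COUNT_STAR semantics on a bag of tuples.
bag = [(1, 'a'), (None, 'b'), (3, 'c'), (None, 'd')]

def count(bag):       # COUNT: ignores tuples whose first field is null
    return sum(1 for t in bag if t[0] is not None)

def count_star(bag):  # COUNT_STAR: counts all tuples, nulls included
    return len(bag)

print(count(bag), count_star(bag))  # 2 4
```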
Another way to select count(*):
NUM = FOREACH A GENERATE 1 as whatever;
G = GROUP NUM BY whatever;
CNT = FOREACH G GENERATE COUNT(NUM);  -- GENERATE is required; count the bag, not the group key
GROUP BY
The keyword group refers to the field(s) that were used for grouping:
B = GROUP A BY somefield;
C = FOREACH B GENERATE group, COUNT_STAR(A);
Test the existence of values in a map using the null construct:
m#'key' is not null
Difference between COGROUP and JOIN.
test.txt
1 2
1 45
3 4
test1.txt
1 23
6 1
Join
A = load 'test.txt' using PigStorage() as (a:int, b:int);
B = load 'test1.txt' using PigStorage() as (a:int, b:int);
C = join A by a left outer, B by a parallel 2;
describe C;
Result:
(1,2,1,23)
(1,45,1,23)
(3,4,,)
The schema of C is of the form
C: {A::a: int,A::b: int,B::a: int,B::b: int}
As for COGROUP,
C = cogroup A by a inner, B by a outer parallel 2;
the output will be:
(1,{(1,2),(1,45)},{(1,23)})
(3,{(3,4)},{})
The first bag is the tuples from the first relation with the matching key field.
The second bag is the tuples from the second relation with the matching key field.
If no tuples match the key field, the bag is empty.
Here the schema of C is of the form
C: {group: int,A: {a: int,b: int},B: {a: int,b: int}}
Join is the flattened structure: a set of tuples. Cogroup created a nested set of tuples.
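The two shapes can be simulated in Python on the sample data above (a sketch; values follow test.txt and test1.txt):

```python
# Simulate Pig's LEFT OUTER JOIN vs COGROUP on the sample data.
A = [(1, 2), (1, 45), (3, 4)]   # test.txt
B = [(1, 23), (6, 1)]           # test1.txt

# LEFT OUTER JOIN by the first field: a flat set of tuples
join = []
for a in A:
    matches = [b for b in B if b[0] == a[0]]
    if matches:
        join.extend(a + b for b in matches)
    else:
        join.append(a + (None, None))   # no match: null-padded

# COGROUP (inner on A) by the first field: one tuple per key, nested bags
keys = sorted({a[0] for a in A})
cogroup = [(k,
            [a for a in A if a[0] == k],
            [b for b in B if b[0] == k]) for k in keys]

print(join)     # [(1, 2, 1, 23), (1, 45, 1, 23), (3, 4, None, None)]
print(cogroup)  # [(1, [(1, 2), (1, 45)], [(1, 23)]), (3, [(3, 4)], [])]
```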
SELECT TOP
data format is: query \t url \t frequency.
Goal: select top 10 urls of each query.
----
A = LOAD 'quf' AS (query, url, frequency:int);
B = GROUP A BY query;
C = FOREACH B generate group, TOP(10, 2, A);
this will not give the sorted data for your urls,
but just the top 10 in some partial order (heap order).
------
A = LOAD 'quf' USING PigStorage('\t') AS (query, url, frequency:int);
B = GROUP A BY query;
C = FOREACH B { X = ORDER A BY frequency DESC; Y = LIMIT X 10; GENERATE group,Y;};
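What the nested ORDER/LIMIT (and the TOP UDF) compute can be sketched in Python with heapq.nlargest; the sample rows here are made up:

```python
import heapq
from collections import defaultdict

# Sketch of "top N urls per query by frequency" (sample data is hypothetical).
rows = [("q1", "a.com", 5), ("q1", "b.com", 9), ("q1", "c.com", 7),
        ("q2", "d.com", 3), ("q2", "e.com", 1)]

groups = defaultdict(list)        # GROUP A BY query
for query, url, freq in rows:
    groups[query].append((url, freq))

# Like TOP(2, 2, A): keep the 2 tuples with the largest frequency per group
top2 = {q: heapq.nlargest(2, urls, key=lambda t: t[1])
        for q, urls in groups.items()}
print(top2["q1"])  # [('b.com', 9), ('c.com', 7)]
```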
QUERY
Dataset A contains campaigns and options signed up by the campaigns. Dataset B contains all the possible options that a campaign can sign up for.
dataset A
campaign options
------------------------
ABC 1
ABC 2
DEF 2
dataset B
options description
----------------------------
1 option1
2 option2
3 option3
We want to generate the following output:
campaigns options signed
-------------------------------------
ABC 1 Y
ABC 2 Y
ABC 3 N
DEF 1 N
DEF 2 Y
DEF 3 N
---------------------------------------------
A = load 'campaign' using PigStorage(',') as (cmpgn:chararray,opt:int);
B = load 'options' using PigStorage(',') as (opt:int,descr:chararray);
-- Generate all combinations
C = cross A, B;
D = foreach C generate A::cmpgn as cmpgn, A::opt as aopt, B::opt as bopt;
-- Mark matches
E = foreach D generate cmpgn, bopt, (aopt == bopt ? 1 : 0) as match;
F = group E by (cmpgn, bopt);
-- Count matches
G = foreach F generate flatten(group), (SUM(E.match) == 0 ? 'N' : 'Y');
dump G;
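The CROSS / mark / aggregate pipeline can be checked in Python on the two sample datasets:

```python
# Reproduce the Pig pipeline: CROSS, mark matches, aggregate to Y/N.
A = [("ABC", 1), ("ABC", 2), ("DEF", 2)]                 # campaign, option
B = [(1, "option1"), (2, "option2"), (3, "option3")]     # option, description

# CROSS A, B and sum (aopt == bopt ? 1 : 0) per (campaign, option)
marks = {}
for cmpgn, aopt in A:
    for bopt, _descr in B:
        key = (cmpgn, bopt)
        marks[key] = marks.get(key, 0) + (1 if aopt == bopt else 0)

# SUM == 0 means the campaign never signed up for that option
result = sorted((c, o, 'N' if n == 0 else 'Y') for (c, o), n in marks.items())
for row in result:
    print(row)
```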
WordCount in Pig Latin
lines = LOAD 'in-dir' USING TextLoader();  -- 'input' is a Pig reserved word, use a different alias
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(*));
grouped = GROUP words BY $0;
counts = FOREACH grouped GENERATE group, COUNT(words);
STORE counts INTO 'out-dir';
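For comparison, the same word count in plain Python (TOKENIZE corresponds roughly to splitting on whitespace here; the input lines are made up):

```python
from collections import Counter

# Plain-Python equivalent of the Pig word count.
lines = ["mary had a little lamb", "little lamb little lamb"]
counts = Counter(word for line in lines for word in line.split())
print(counts["little"])  # 3
```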
Working with Jython
File: mapkeys.data
[name#John,phone#5551212]
File mapkeys.pig
register 'mapkeys.py' using jython as mapkeys;
A = load 'mapkeys.data' using PigStorage() as ( aMap: map[] );
B = foreach A generate aMap#'name', aMap#'phone';
C = foreach A generate FLATTEN(mapkeys.keys(aMap));
dump C;
File: mapkeys.py
@outputSchema("keys:bag{t:tuple(key:chararray)}")
def keys(map):
    print "mapkeys.py:keys:map:", map
    outBag = []
    for key in map.iterkeys():
        # t = (key)  ## not a tuple, just a parenthesized value; causes Pig to crash
        t = (key,)   ## the trailing comma makes this a one-element tuple
        outBag.append(t)
    print "mapkeys.py:keys:outBag:", outBag
    return outBag
Example
pig -useversion 0.8 $HINT -Ddfs.block.size=$SIZE -Dmapred.min.split.size=$SIZE -Dmapred.job.queue.name=audience_fetl -Dmapred.reduce.tasks.speculative.execution=true -Dmapred.map.tasks.speculative.execution=true -p input=$SEQ group.pig
-------group.pig----------
SET default_parallel 100;
REGISTER /homes/lubinsky/fetl_sequence_projector/lib/jar/SequenceProjector.jar;
A = LOAD '$input' USING com.yahoo.ccdi.fetl.sequence.pig.Projector('bcookie,src_pty,type');
describe A;
D = GROUP A BY type;
E = FOREACH D GENERATE group, COUNT(A);
DUMP E;
-------------------------------
(a,47)
(c,242688)
(l,960)
(p,2672360)
(B,25063)
$ pig --help
pig script usage: pig
-l, --latest
use the latest, untested, unsupported version of pig.jar instead of the released, tested, supported version.
--useversion
use a specific version of pig.jar.
-c, --cluster _clustername_
run on cluster _clustername_ instead of default kryptonite.
-A account
the account name
pig.jar help:
USAGE: Pig [options] [-] : Run interactively in grunt shell.
Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).
Pig [options] [-f[ile]] file : Run cmds found in file.
options include:
-4, -log4jconf - Log4j configuration file, overrides log conf
-b, -brief - Brief logging (no timestamps)
-c, -check - Syntax check
-d, -debug - Debug level, INFO is default
-e, -execute - Commands to execute (within quotes)
-f, -file - Path to the script to execute
-h, -help - Display this message. You can specify topic to get help for that topic.
properties is the only topic currently supported: -h properties.
-i, -version - Display version information
-l, -logfile - Path to client side log file; default is current working directory.
-m, -param_file - Path to the parameter file
-p, -param - Key value pair of the form param=val
-r, -dryrun - Produces script with substituted parameters. Script is not executed.
-t, -optimizer_off - Turn optimizations off. The following values are supported:
SplitFilter - Split filter conditions
MergeFilter - Merge filter conditions
PushUpFilter - Filter as early as possible
PushDownForeachFlatten - Join or explode as late as possible
ColumnMapKeyPrune - Remove unused data
LimitOptimizer - Limit as early as possible
AddForEach - Add ForEach to remove unneeded columns
MergeForEach - Merge adjacent ForEach
LogicalExpressionSimplifier - Combine multiple expressions
All - Disable all optimizations
All optimizations are enabled by default. Optimization values are case insensitive.
-v, -verbose - Print all error messages to screen
-w, -warning - Turn warning logging on; also turns warning aggregation off
-x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.
-F, -stop_on_failure - Aborts execution on the first failed job; default is off
-M, -no_multiquery - Turn multiquery optimization off; default is on
-P, -propertyFile - Path to property file
Using external jars
REGISTER /homes/lubinsky/fetl_sequence_projector/lib/jar/SequenceProjector.jar;
A = LOAD '$input' USING com.yahoo.ccdi.fetl.sequence.pig.Projector('misc,src_spaceid,clickinfo');
Find the latest of all the records sharing the same (A, B, C) key, according to the timestamp, for records of the form (timestamp, A, B, C).
In MapReduce, the approach is: emit (A, B, C) as the key and the timestamp as the value in the mappers, and pick the latest timestamp in the reducer. In Pig:
AbcGroup = group Data by ( A, B, C );
MaxTimestamp = foreach AbcGroup generate FLATTEN(group), MAX(Data.timestamp);
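The same "latest per (A, B, C)" logic, sketched in Python with made-up records (note the Pig above yields only the max timestamp per key; to keep the whole record you would use a nested ORDER ... LIMIT 1 instead of MAX):

```python
# Pick the latest record per (A, B, C) key by timestamp.
records = [(10, 'a', 'b', 'c'), (30, 'a', 'b', 'c'), (20, 'x', 'y', 'z')]

latest = {}
for rec in records:
    ts, key = rec[0], rec[1:]          # (A, B, C) is the key, timestamp the value
    if key not in latest or ts > latest[key][0]:
        latest[key] = rec

print(sorted(latest.values()))  # [(20, 'x', 'y', 'z'), (30, 'a', 'b', 'c')]
```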
STREAMING http://wiki.apache.org/hadoop/HadoopStreaming
http://devblog.factual.com/practical-hadoop-streaming-dealing-with-brittle-code
Configuration:
http://wiki.apache.org/hadoop/HowManyMapsAndReduces
if you want to reduce the number of mappers:
dfs.block.size=<256M or 512M etc>
mapred.min.split.size=<256M or 512M etc>
mapred.min.split.size only takes effect when set larger than the HDFS block size.
The parameter that can increase the number of mappers is mapred.max.split.size; it must be less than the block size to take effect.
- mapred.min.split.size controls the minimum size of a split.
- mapreduce.map.tasks: the suggested number of map tasks
- dfs.block.size: the file system block size in bytes of the input file
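FileInputFormat computes the effective split size as max(minSplitSize, min(maxSplitSize, blockSize)); a quick sanity check of the two tuning directions above (byte values are illustrative):

```python
# Hadoop FileInputFormat split-size rule: max(minSize, min(maxSize, blockSize)).
def split_size(min_size, max_size, block_size):
    return max(min_size, min(max_size, block_size))

MB = 1024 * 1024

# Raising mapred.min.split.size above the block size -> fewer, larger splits
assert split_size(256 * MB, 2**63 - 1, 128 * MB) == 256 * MB

# Lowering mapred.max.split.size below the block size -> more, smaller splits
assert split_size(1, 64 * MB, 128 * MB) == 64 * MB
```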
http://www.hadoop-blog.com/2010/11/hadoop-administrator-interview.html
http://www.phacai.com/specify-the-number-of-mappers-and-make-every-mapper-process-the-same-data-set-on-mapreduce