The STREAM statement can then refer to the alias, as the following Pig script shows:

-- max_temp_filter_stream.pig
DEFINE is_good_quality `is_good_quality.py`
    SHIP ('ch16-pig/src/main/python/is_good_quality.py');

records = LOAD 'input/ncdc/micro-tab/sample.txt'
    AS (year:chararray, temperature:int, quality:int);
filtered_records = STREAM records THROUGH is_good_quality
    AS (year:chararray, temperature:int);
grouped_records = GROUP filtered_records BY year;
max_temp = FOREACH grouped_records GENERATE group,
    MAX(filtered_records.temperature);
DUMP max_temp;

Grouping and Joining Data

Joining datasets in MapReduce takes some work on the part of the programmer (see "Joins" on page 268), whereas Pig has very good built-in support for join operations, making it much more approachable.
Since the large datasets that are suitable for analysis by Pig (and MapReduce in general) are not normalized, however, joins are used less frequently in Pig than they are in SQL.

JOIN

Let's look at an example of an inner join. Consider the relations A and B:

grunt> DUMP A;
(2,Tie)
(4,Coat)
(3,Hat)
(1,Scarf)
grunt> DUMP B;
(Joe,2)
(Hank,4)
(Ali,0)
(Eve,3)
(Hank,2)

We can join the two relations on the numerical (identity) field in each:

grunt> C = JOIN A BY $0, B BY $1;
grunt> DUMP C;
(2,Tie,Hank,2)
(2,Tie,Joe,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

This is a classic inner join, where each match between the two relations corresponds to a row in the result. (It's actually an equijoin because the join predicate is equality.) The result's fields are made up of all the fields of all the input relations.
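When the inputs are loaded with declared schemas, the joined relation keeps every input field and disambiguates clashing field names with a relation prefix. As a minimal sketch, assuming hypothetical files A and B holding the same data, with the DESCRIBE output we'd expect:

grunt> A2 = LOAD 'A' AS (id:int, item:chararray);
grunt> B2 = LOAD 'B' AS (name:chararray, id:int);
grunt> C2 = JOIN A2 BY id, B2 BY id;
grunt> DESCRIBE C2;
C2: {A2::id: int,A2::item: chararray,B2::name: chararray,B2::id: int}

Downstream statements can then use A2::id or B2::id wherever the bare name id would be ambiguous.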
You should use the general join operator when all the relations being joined are too large to fit in memory. If one of the relations is small enough to fit in memory, you can use a special type of join called a fragment replicate join, which is implemented by distributing the small input to all the mappers and performing a map-side join using an in-memory lookup table against the (fragmented) larger relation. There is a special syntax for telling Pig to use a fragment replicate join:⁹

grunt> C = JOIN A BY $0, B BY $1 USING 'replicated';

The first relation must be the large one, followed by one or more small ones (all of which must fit in memory).

9. There are more keywords that may be used in the USING clause, including 'skewed' (for large datasets with a skewed keyspace), 'merge' (to effect a merge join for inputs that are already sorted on the join key), and 'merge-sparse' (where 1% or less of data is matched). See Pig's documentation for details on how to use these specialized joins.

Pig also supports outer joins using a syntax that is similar to SQL's (this is covered for Hive in "Outer joins" on page 506).
For example:

grunt> C = JOIN A BY $0 LEFT OUTER, B BY $1;
grunt> DUMP C;
(1,Scarf,,)
(2,Tie,Hank,2)
(2,Tie,Joe,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)
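RIGHT OUTER and FULL OUTER may be used in the same position; a full outer join keeps unmatched tuples from both inputs. As a quick sketch with the same relations (the output shown is what we'd expect: unmatched fields are null, which DUMP prints as empty, and row order may vary):

grunt> C = JOIN A BY $0 FULL OUTER, B BY $1;
grunt> DUMP C;
(,,Ali,0)
(1,Scarf,,)
(2,Tie,Hank,2)
(2,Tie,Joe,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)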
COGROUP

JOIN always gives a flat structure: a set of tuples. The COGROUP statement is similar to JOIN, but instead creates a nested set of output tuples. This can be useful if you want to exploit the structure in subsequent statements:

grunt> D = COGROUP A BY $0, B BY $1;
grunt> DUMP D;
(0,{},{(Ali,0)})
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Hank,2),(Joe,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

COGROUP generates a tuple for each unique grouping key. The first field of each tuple is the key, and the remaining fields are bags of tuples from the relations with a matching key. The first bag contains the matching tuples from relation A with the same key.
Similarly, the second bag contains the matching tuples from relation B with the same key. If for a particular key a relation has no matching key, the bag for that relation is empty. For example, since no one has bought a scarf (with ID 1), the second bag in the tuple for that row is empty. This is an example of an outer join, which is the default type for COGROUP. It can be made explicit using the OUTER keyword, making this COGROUP statement the same as the previous one:

D = COGROUP A BY $0 OUTER, B BY $1 OUTER;

You can suppress rows with empty bags by using the INNER keyword, which gives the COGROUP inner join semantics. The INNER keyword is applied per relation, so the following suppresses rows only when relation A has no match (dropping the unknown product 0 here):

grunt> E = COGROUP A BY $0 INNER, B BY $1;
grunt> DUMP E;
(1,{(1,Scarf)},{})
(2,{(2,Tie)},{(Hank,2),(Joe,2)})
(3,{(3,Hat)},{(Eve,3)})
(4,{(4,Coat)},{(Hank,4)})

We can flatten this structure to discover who bought each of the items in relation A:

grunt> F = FOREACH E GENERATE FLATTEN(A), B.$0;
grunt> DUMP F;
(1,Scarf,{})
(2,Tie,{(Hank),(Joe)})
(3,Hat,{(Eve)})
(4,Coat,{(Hank)})

Using a combination of COGROUP, INNER, and FLATTEN (which removes nesting), it's possible to simulate an (inner) JOIN:

grunt> G = COGROUP A BY $0 INNER, B BY $1 INNER;
grunt> H = FOREACH G GENERATE FLATTEN($1), FLATTEN($2);
grunt> DUMP H;
(2,Tie,Hank,2)
(2,Tie,Joe,2)
(3,Hat,Eve,3)
(4,Coat,Hank,4)

This gives the same result as JOIN A BY $0, B BY $1.

If the join key is composed of several fields, you can specify them all in the BY clauses of the JOIN or COGROUP statement; make sure that the number of fields in each BY clause is the same.
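To join on a compound key, parenthesize the fields in each BY clause. A minimal sketch, assuming hypothetical relations R and S that both carry usaf and wban fields:

grunt> T = JOIN R BY (usaf, wban), S BY (usaf, wban);

The next example does exactly this with real station data.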
Here's another example of a join in Pig, in a script for calculating the maximum temperature for every station over a time period controlled by the input:

-- max_temp_station_name.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();

stations = LOAD 'input/ncdc/metadata/stations-fixed-width.txt'
    USING com.hadoopbook.pig.CutLoadFunc('1-6,8-12,14-42')
    AS (usaf:chararray, wban:chararray, name:chararray);
trimmed_stations = FOREACH stations GENERATE usaf, wban, TRIM(name);

records = LOAD 'input/ncdc/all/191*'
    USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,88-92,93-93')
    AS (usaf:chararray, wban:chararray, temperature:int, quality:int);
filtered_records = FILTER records BY temperature != 9999 AND isGood(quality);
grouped_records = GROUP filtered_records BY (usaf, wban) PARALLEL 30;
max_temp = FOREACH grouped_records GENERATE FLATTEN(group),
    MAX(filtered_records.temperature);

max_temp_named = JOIN max_temp BY (usaf, wban), trimmed_stations BY (usaf, wban)
    PARALLEL 30;
max_temp_result = FOREACH max_temp_named GENERATE $0, $1, $5, $2;
STORE max_temp_result INTO 'max_temp_by_station';

We use the cut UDF we developed earlier to load one relation holding the station IDs (USAF and WBAN identifiers) and names, and one relation holding all the weather records, keyed by station ID.
We group the filtered weather records by station ID and aggregate by maximum temperature before joining with the stations. Finally, we project out the fields we want in the final result: USAF, WBAN, station name, and maximum temperature.

Here are a few results for the 1910s:

228020  99999   SORTAVALA       322
029110  99999   VAASA AIRPORT   300
040650  99999   GRIMSEY         378

This query could be made more efficient by using a fragment replicate join, as the station metadata is small.
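That optimization is a one-line change: add a USING clause to the join, keeping the large relation (max_temp) first. A sketch of the modified statement from the script above (PARALLEL is dropped because a replicated join runs map-side, so there is no reduce phase to parallelize):

max_temp_named = JOIN max_temp BY (usaf, wban), trimmed_stations BY (usaf, wban)
    USING 'replicated';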
CROSS

Pig Latin includes the cross-product operator (also known as the Cartesian product), CROSS, which joins every tuple in a relation with every tuple in a second relation (and with every tuple in further relations, if supplied). The size of the output is the product of the size of the inputs, potentially making the output very large:

grunt> I = CROSS A, B;
grunt> DUMP I;
(2,Tie,Joe,2)
(2,Tie,Hank,4)
(2,Tie,Ali,0)
(2,Tie,Eve,3)
(2,Tie,Hank,2)
(4,Coat,Joe,2)
(4,Coat,Hank,4)
(4,Coat,Ali,0)
(4,Coat,Eve,3)
(4,Coat,Hank,2)
(3,Hat,Joe,2)
(3,Hat,Hank,4)
(3,Hat,Ali,0)
(3,Hat,Eve,3)
(3,Hat,Hank,2)
(1,Scarf,Joe,2)
(1,Scarf,Hank,4)
(1,Scarf,Ali,0)
(1,Scarf,Eve,3)
(1,Scarf,Hank,2)

When dealing with large datasets, you should try to avoid operations that generate intermediate representations that are quadratic (or worse) in size. Computing the cross product of the whole input dataset is rarely needed, if ever.
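One case where CROSS is harmless is when one of its inputs is known to hold a single tuple, since the output is then the same size as the other input. This idiom is handy for attaching a global aggregate to every row. A minimal sketch using relation A above and the GROUP ... ALL statement described in the next section (the DUMP output is what we'd expect, though row order may vary):

grunt> G = GROUP A ALL;
grunt> total = FOREACH G GENERATE COUNT(A) AS n;
grunt> with_total = CROSS A, total; -- total has one tuple, so output size equals size of A
grunt> DUMP with_total;
(2,Tie,4)
(4,Coat,4)
(3,Hat,4)
(1,Scarf,4)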
For example, at first blush, one might expect that calculating pairwise document similarity in a corpus of documents would require every document pair to be generated before calculating their similarity. However, if we start with the insight that most document pairs have a similarity score of zero (i.e., they are unrelated), then we can find a way to a better algorithm.

In this case, the key idea is to focus on the entities that we are using to calculate similarity (terms in a document, for example) and make them the center of the algorithm. In practice, we also remove terms that don't help discriminate between documents (stopwords), and this reduces the problem space still further. Using this technique to analyze a set of roughly one million (10⁶) documents generates on the order of one billion (10⁹) intermediate pairs,¹⁰ rather than the one trillion (10¹²) produced by the naive approach (generating the cross product of the input) or the approach with no stopword removal.

GROUP

Where COGROUP groups the data in two or more relations, the GROUP statement groups the data in a single relation. GROUP supports grouping by more than equality of keys: you can use an expression or user-defined function as the group key.
For example, consider the following relation A:

grunt> DUMP A;
(Joe,cherry)
(Ali,apple)
(Joe,banana)
(Eve,apple)

Let's group by the number of characters in the second field:

grunt> B = GROUP A BY SIZE($1);
grunt> DUMP B;
(5,{(Eve,apple),(Ali,apple)})
(6,{(Joe,banana),(Joe,cherry)})

GROUP creates a relation whose first field is the grouping field, which is given the alias group. The second field is a bag containing the grouped fields with the same schema as the original relation (in this case, A).

There are also two special grouping operations: ALL and ANY. ALL groups all the tuples in a relation in a single group, as if the GROUP function were a constant:

grunt> C = GROUP A ALL;
grunt> DUMP C;
(all,{(Eve,apple),(Joe,banana),(Ali,apple),(Joe,cherry)})

Note that there is no BY in this form of the GROUP statement.
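For instance, counting the tuples in A now takes one more statement. A minimal sketch continuing the session above (A has four tuples, so we'd expect the output shown):

grunt> cnt = FOREACH C GENERATE COUNT(A);
grunt> DUMP cnt;
(4)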
The ALL grouping is commonly used for just this kind of counting, as shown in "Validation and nulls" on page 442.

The ANY keyword is used to group the tuples in a relation randomly, which can be useful for sampling.

10. Tamer Elsayed, Jimmy Lin, and Douglas W. Oard, "Pairwise Document Similarity in Large Collections with MapReduce," ACL 2008.