It does this by means of a TupleFactory, a Pig class for creating Tuple instances. The newTuple() method creates a new tuple with the required number of fields, which is just the number of Range classes, and the fields are populated using substrings of the line, which are determined by the Range objects.

We need to think about what to do when the line is shorter than the range asked for. One option is to throw an exception and stop further processing. This is appropriate if your application cannot tolerate incomplete or corrupt records. In many cases, it is better to return a tuple with null fields and let the Pig script handle the incomplete data as it sees fit. This is the approach we take here; by exiting the for loop if the range end is past the end of the line, we leave the current field and any subsequent fields in the tuple with their default values of null.
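To make that concrete, here is a condensed sketch of the tuple-construction logic just described. It is not a drop-in copy of CutLoadFunc; the Range helper shown here (and its getEnd() and getSubstring() accessors) is a stand-in for the one used in the earlier example.

import java.io.IOException;
import java.util.List;

import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class TupleBuilderSketch {
  private static final TupleFactory TUPLE_FACTORY = TupleFactory.getInstance();

  // Build one tuple from a fixed-width line, one field per Range.
  public static Tuple makeTuple(String line, List<Range> ranges) throws IOException {
    Tuple tuple = TUPLE_FACTORY.newTuple(ranges.size()); // fields default to null
    for (int i = 0; i < ranges.size(); i++) {
      Range range = ranges.get(i);
      if (range.getEnd() > line.length()) {
        // Line is shorter than this range: exit the loop and leave this field
        // and all subsequent fields as null rather than throwing an exception.
        break;
      }
      tuple.set(i, new DataByteArray(range.getSubstring(line)));
    }
    return tuple;
  }

  // Assumed stand-in for the Range helper used by CutLoadFunc
  // (1-based, inclusive column positions).
  public static class Range {
    private final int start;
    private final int end;
    public Range(int start, int end) { this.start = start; this.end = end; }
    public int getEnd() { return end; }
    public String getSubstring(String line) { return line.substring(start - 1, end); }
  }
}

For example, with ranges 1-3 and 5-9, a six-character line yields a tuple whose first field holds the first three characters and whose second field is null; throwing an exception at the break point instead would give the stricter behavior described first.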
Using a schema

Let's now consider the types of the fields being loaded. If the user has specified a schema, then the fields need to be converted to the relevant types. However, this is performed lazily by Pig, so the loader should always construct tuples of type bytearray, using the DataByteArray type. The load function still has the opportunity to do the conversion, however, by overriding getLoadCaster() to return a custom implementation of the LoadCaster interface, which provides a collection of conversion methods for this purpose.

CutLoadFunc doesn't override getLoadCaster() because the default implementation returns Utf8StorageConverter, which provides standard conversions between UTF-8–encoded data and Pig data types.
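If a loader did need nonstandard conversions, one plausible shape for the override (a sketch, not CutLoadFunc's code; TrimmedIntCaster is a made-up name) is to extend Utf8StorageConverter and adjust only the conversions that differ:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.pig.builtin.Utf8StorageConverter;

public class TrimmedIntCaster extends Utf8StorageConverter {
  // Example tweak: tolerate padding spaces in fixed-width numeric columns.
  @Override
  public Integer bytesToInteger(byte[] b) throws IOException {
    if (b == null) {
      return null;
    }
    String trimmed = new String(b, StandardCharsets.UTF_8).trim();
    return super.bytesToInteger(trimmed.getBytes(StandardCharsets.UTF_8));
  }
}

The load function's getLoadCaster() override would then simply return new TrimmedIntCaster().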
In some cases, the load function itself can determine the schema. For example, if we were loading self-describing data such as XML or JSON, we could create a schema for Pig by looking at the data. Alternatively, the load function may determine the schema in another way, such as from an external file, or by being passed information in its constructor. To support such cases, the load function should implement the LoadMetadata interface (in addition to the LoadFunc interface) so it can supply a schema to the Pig runtime.
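As a rough illustration (this is not one of the book's examples; the class name and schema string are invented), a loader whose fields are fixed in advance could supply its schema along these lines. In practice, these methods would live on the load function class itself:

import java.io.IOException;

import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.Expression;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.impl.util.Utils;

public class StationRecordMetadata implements LoadMetadata {
  @Override
  public ResourceSchema getSchema(String location, Job job) throws IOException {
    // Declare the fields this loader always produces (illustrative schema).
    return new ResourceSchema(
        Utils.getSchemaFromString("usaf:chararray, wban:chararray, year:int"));
  }

  @Override
  public ResourceStatistics getStatistics(String location, Job job) {
    return null; // no statistics available
  }

  @Override
  public String[] getPartitionKeys(String location, Job job) {
    return null; // data is not partitioned
  }

  @Override
  public void setPartitionFilter(Expression partitionFilter) {
    // not applicable when there are no partition keys
  }
}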
Note, however, that if a user supplies a schema in the AS clause of LOAD, then it takes precedence over the schema specified through the LoadMetadata interface.

A load function may additionally implement the LoadPushDown interface as a means for finding out which columns the query is asking for. This can be a useful optimization for column-oriented storage, so that the loader loads only the columns that are needed by the query. There is no obvious way for CutLoadFunc to load only a subset of columns, because it reads the whole line for each tuple, so we don't use this optimization.
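For reference, the interface looks roughly like this (an illustrative sketch, not something CutLoadFunc implements; the class name and the field bookkeeping are invented). A column-oriented loader would record the requested columns in pushProjection() and later materialize only those:

import java.util.Collections;
import java.util.List;

import org.apache.pig.LoadPushDown;
import org.apache.pig.impl.logicalLayer.FrontendException;

public class ColumnarLoaderSketch implements LoadPushDown {
  private List<RequiredField> requiredFields; // columns the query actually needs

  @Override
  public List<OperatorSet> getFeatures() {
    // Advertise that this loader understands column projection.
    return Collections.singletonList(OperatorSet.PROJECTION);
  }

  @Override
  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldList)
      throws FrontendException {
    // Remember which columns to materialize; getNext() would then populate
    // only these fields and skip the rest.
    requiredFields = requiredFieldList.getFields();
    return new RequiredFieldResponse(true); // projection request honored
  }
}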
Data Processing Operators

Loading and Storing Data

Throughout this chapter, we have seen how to load data from external storage for processing in Pig. Storing the results is straightforward, too. Here's an example of using PigStorage to store tuples as plain-text values separated by a colon character:

grunt> STORE A INTO 'out' USING PigStorage(':');
grunt> cat out
Joe:cherry:2
Ali:apple:3
Joe:banana:2
Eve:apple:7

Other built-in storage functions were described in Table 16-7.

Filtering Data

Once you have some data loaded into a relation, often the next step is to filter it to remove the data that you are not interested in. By filtering early in the processing pipeline, you minimize the amount of data flowing through the system, which can improve efficiency.

FOREACH...GENERATE
We have already seen how to remove rows from a relation using the FILTER operator with simple expressions and a UDF. The FOREACH...GENERATE operator is used to act on every row in a relation. It can be used to remove fields or to generate new ones. In this example, we do both:

grunt> DUMP A;
(Joe,cherry,2)
(Ali,apple,3)
(Joe,banana,2)
(Eve,apple,7)
grunt> B = FOREACH A GENERATE $0, $2+1, 'Constant';
grunt> DUMP B;
(Joe,3,Constant)
(Ali,4,Constant)
(Joe,3,Constant)
(Eve,8,Constant)
Here we have created a new relation, B, with three fields. Its first field is a projection of the first field ($0) of A. B's second field is the third field of A ($2) with 1 added to it. B's third field is a constant field (every row in B has the same third field) with the chararray value Constant.

The FOREACH...GENERATE operator has a nested form to support more complex processing. In the following example, we compute various statistics for the weather dataset:

-- year_stats.pig
REGISTER pig-examples.jar;
DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
records = LOAD 'input/ncdc/all/19{1,2,3,4,5}0*'
  USING com.hadoopbook.pig.CutLoadFunc('5-10,11-15,16-19,88-92,93-93')
  AS (usaf:chararray, wban:chararray, year:int, temperature:int, quality:int);
grouped_records = GROUP records BY year PARALLEL 30;
year_stats = FOREACH grouped_records {
  uniq_stations = DISTINCT records.usaf;
  good_records = FILTER records BY isGood(quality);
  GENERATE FLATTEN(group), COUNT(uniq_stations) AS station_count,
    COUNT(good_records) AS good_record_count, COUNT(records) AS record_count;
}
DUMP year_stats;
Using the cut UDF we developed earlier, we load various fields from the input dataset into the records relation. Next, we group records by year. Notice the PARALLEL keyword for setting the number of reducers to use; this is vital when running on a cluster. Then we process each group using a nested FOREACH...GENERATE operator. The first nested statement creates a relation for the distinct USAF identifiers for stations using the DISTINCT operator. The second nested statement creates a relation for the records with "good" readings using the FILTER operator and a UDF. The final nested statement is a GENERATE statement (a nested FOREACH...GENERATE must always have a GENERATE statement as the last nested statement) that generates the summary fields of interest using the grouped records, as well as the relations created in the nested block.

Running it on a few years' worth of data, we get the following:

(1920,8L,8595L,8595L)
(1950,1988L,8635452L,8641353L)
(1930,121L,89245L,89262L)
(1910,7L,7650L,7650L)
(1940,732L,1052333L,1052976L)

The fields are year, number of unique stations, total number of good readings, and total number of readings. We can see how the number of weather stations and readings grew over time.

STREAM
The STREAM operator allows you to transform data in a relation using an external program or script. It is named by analogy with Hadoop Streaming, which provides a similar capability for MapReduce (see "Hadoop Streaming" on page 37).

STREAM can use built-in commands with arguments. Here is an example that uses the Unix cut command to extract the second field of each tuple in A. Note that the command and its arguments are enclosed in backticks:

grunt> C = STREAM A THROUGH `cut -f 2`;
grunt> DUMP C;
(cherry)
(apple)
(banana)
(apple)

The STREAM operator uses PigStorage to serialize and deserialize relations to and from the program's standard input and output streams. Tuples in A are converted to tab-delimited lines that are passed to the script. The output of the script is read one line at a time and split on tabs to create new tuples for the output relation C. You can provide a custom serializer and deserializer by subclassing PigStreamingBase (in the org.apache.pig package), then using the DEFINE operator.
Pig streaming is most powerful when you write custom processing scripts. The following Python script filters out bad weather records:

#!/usr/bin/env python
import re
import sys

for line in sys.stdin:
  (year, temp, q) = line.strip().split()
  if (temp != "9999" and re.match("[01459]", q)):
    print "%s\t%s" % (year, temp)

To use the script, you need to ship it to the cluster. This is achieved via a DEFINE clause, which also creates an alias for the STREAM command.