(This, incidentally, is why function names are case-sensitive: because Java classnames are.) When searching for classes, Pig uses a classloader that includes the JAR files that have been registered. When running in distributed mode, Pig will ensure that your JAR files get shipped to the cluster.

For the UDF in this example, Pig looks for a class with the name com.hadoopbook.pig.IsGoodQuality, which it finds in the JAR file we registered.
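As a reminder, the registration step and the fully qualified call from earlier in the chapter look roughly like this (the JAR name is illustrative; use whatever archive your compiled UDF classes are packaged in):

grunt> REGISTER pig-examples.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     com.hadoopbook.pig.IsGoodQuality(quality);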
Resolution of built-in functions proceeds in the same way, except for one difference: Pig has a set of built-in package names that it searches, so the function call does not have to be a fully qualified name. For example, the function MAX is actually implemented by a class MAX in the package org.apache.pig.builtin. This is one of the packages that Pig looks in, so we can write MAX rather than org.apache.pig.builtin.MAX in our Pig programs.

We can add our package name to the search path by invoking Grunt with this command-line argument: -Dudf.import.list=com.hadoopbook.pig.
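In practice that looks something like the following sketch. The JAR containing the class still has to be registered (again, the JAR name here is illustrative), but the package prefix can then be dropped from the call:

% pig -Dudf.import.list=com.hadoopbook.pig
grunt> REGISTER pig-examples.jar;
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     IsGoodQuality(quality);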
Alternatively, we can shorten the function name by defining an alias, using the DEFINE operator:

grunt> DEFINE isGood com.hadoopbook.pig.IsGoodQuality();
grunt> filtered_records = FILTER records BY temperature != 9999 AND
>>     isGood(quality);

Defining an alias is a good idea if you want to use the function several times in the same script. It's also necessary if you want to pass arguments to the constructor of the UDF's implementation class.
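To illustrate that pattern, here is a sketch of a hypothetical filter UDF (it is not part of the book's example code) whose behavior is configured through its constructor. Pig instantiates the class with the string arguments given in the DEFINE clause:

package com.hadoopbook.pig;

import java.io.IOException;

import org.apache.pig.FilterFunc;
import org.apache.pig.data.Tuple;

// Hypothetical example: a filter that keeps records whose quality field
// matches a code supplied at DEFINE time. Constructor arguments from a
// DEFINE clause always arrive as strings.
public class HasQualityCode extends FilterFunc {

  private final int code;

  public HasQualityCode(String code) {
    this.code = Integer.parseInt(code);
  }

  @Override
  public Boolean exec(Tuple tuple) throws IOException {
    if (tuple == null || tuple.size() == 0 || tuple.get(0) == null) {
      return false;
    }
    // Assumes the field is an int, as in the IsGoodQuality example.
    int quality = (Integer) tuple.get(0);
    return quality == code;
  }
}

It would then be aliased and parameterized in one step:

grunt> DEFINE hasCodeOne com.hadoopbook.pig.HasQualityCode('1');
grunt> filtered_records = FILTER records BY hasCodeOne(quality);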
If you add the lines to register JAR files and define function aliases to the .pigbootup file in your home directory, they will be run whenever you start Pig.

Leveraging types

The filter works when the quality field is declared to be of type int, but if the type information is absent, the UDF fails! This happens because the field is the default type, bytearray, represented by the DataByteArray class. Because DataByteArray is not an Integer, the cast fails.

The obvious way to fix this is to convert the field to an integer in the exec() method. However, there is a better way, which is to tell Pig the types of the fields that the function expects. The getArgToFuncMapping() method on EvalFunc is provided for precisely this reason. We can override it to tell Pig that the first field should be an integer:

@Override
public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
  List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>();
  funcSpecs.add(new FuncSpec(this.getClass().getName(),
      new Schema(new Schema.FieldSchema(null, DataType.INTEGER))));
  return funcSpecs;
}

This method returns a FuncSpec object corresponding to each of the fields of the tuple that are passed to the exec() method. Here there is a single field, and we construct an anonymous FieldSchema (the name is passed as null, since Pig ignores the name when doing type conversion). The type is specified using the INTEGER constant on Pig's DataType class.
With the amended function, Pig will attempt to convert the argument passed to the function to an integer. If the field cannot be converted, then a null is passed for the field. The exec() method always returns false when the field is null. For this application, this behavior is appropriate, as we want to filter out records whose quality field is unintelligible.
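Concretely, the null handling in the filter's exec() method follows the pattern of the IsGoodQuality UDF shown earlier in the chapter, along these lines (a sketch; the set of acceptable quality codes is the one the book's weather examples use):

@Override
public Boolean exec(Tuple tuple) throws IOException {
  if (tuple == null || tuple.size() == 0) {
    return false;
  }
  Object object = tuple.get(0);
  if (object == null) {
    // Pig could not convert the field to an integer, so the quality value
    // is treated as unintelligible and the record is filtered out.
    return false;
  }
  int i = (Integer) object;
  return i == 0 || i == 1 || i == 4 || i == 5 || i == 9;
}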
An Eval UDF

Writing an eval function is a small step up from writing a filter function. Consider the UDF in Example 16-2, which trims the leading and trailing whitespace from chararray values using the trim() method on java.lang.String.6

6. Pig actually comes with an equivalent built-in function called TRIM.

Example 16-2. An EvalFunc UDF to trim leading and trailing whitespace from chararray values

public class Trim extends PrimitiveEvalFunc<String, String> {
  @Override
  public String exec(String input) {
    return input.trim();
  }
}

In this case, we have taken advantage of PrimitiveEvalFunc, which is a specialization of EvalFunc for when the input is a single primitive (atomic) type. For the Trim UDF, the input and output types are both of type String.7

7. Although not relevant for this example, eval functions that operate on a bag may additionally implement Pig's Algebraic or Accumulator interfaces for more efficient processing of the bag in chunks.

In general, when you write an eval function, you need to consider what the output's schema looks like. In the following statement, the schema of B is determined by the function udf:

B = FOREACH A GENERATE udf($0);

If udf creates tuples with scalar fields, then Pig can determine B's schema through reflection. For complex types such as bags, tuples, or maps, Pig needs more help, and you should implement the outputSchema() method to give Pig the information about the output schema.
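For example, a hypothetical eval function that returned a bag of single-field tuples could declare its output schema along these lines (the field name "token" and bag name "tokens" are made up for illustration):

@Override
public Schema outputSchema(Schema input) {
  try {
    // Each tuple in the output bag has a single chararray field.
    Schema tupleSchema = new Schema(
        new Schema.FieldSchema("token", DataType.CHARARRAY));
    // The function's result is a bag of those tuples.
    return new Schema(
        new Schema.FieldSchema("tokens", tupleSchema, DataType.BAG));
  } catch (FrontendException e) {
    throw new RuntimeException(e);
  }
}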
The Trim UDF returns a string, which Pig translates as a chararray, as can be seen from the following session:

grunt> DUMP A;
( pomegranate)
(banana )
(apple)
( lychee )
grunt> DESCRIBE A;
A: {fruit: chararray}
grunt> B = FOREACH A GENERATE com.hadoopbook.pig.Trim(fruit);
grunt> DUMP B;
(pomegranate)
(banana)
(apple)
(lychee)
grunt> DESCRIBE B;
B: {chararray}

A has chararray fields that have leading and trailing spaces. We create B from A by applying the Trim function to the first field in A (named fruit). B's fields are correctly inferred to be of type chararray.
Dynamic invokers

Sometimes you want to use a function that is provided by a Java library, but without going to the effort of writing a UDF. Dynamic invokers allow you to do this by calling Java methods directly from a Pig script. The trade-off is that method calls are made via reflection, which can impose significant overhead when calls are made for every record in a large dataset. So for scripts that are run repeatedly, a dedicated UDF is normally preferred.

The following snippet shows how we could define and use a trim UDF that uses the Apache Commons Lang StringUtils class:

grunt> DEFINE trim InvokeForString('org.apache.commons.lang.StringUtils.trim',
>>     'String');
grunt> B = FOREACH A GENERATE trim(fruit);
grunt> DUMP B;
(pomegranate)
(banana)
(apple)
(lychee)
The InvokeForString invoker is used because the return type of the method is a String. (There are also InvokeForInt, InvokeForLong, InvokeForDouble, and InvokeForFloat invokers.) The first argument to the invoker constructor is the fully qualified method to be invoked. The second is a space-separated list of the method argument classes.
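For a method that takes more than one argument, the argument classes are simply listed separated by spaces. For instance, java.lang.Math.pow could be wrapped like this (assuming a relation D with two numeric fields, x and y, which is not one of the relations used above):

grunt> DEFINE pow InvokeForDouble('java.lang.Math.pow', 'double double');
grunt> E = FOREACH D GENERATE pow(x, y);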
A Load UDF

We'll demonstrate a custom load function that can read plain-text column ranges as fields, very much like the Unix cut command.8 It is used as follows:

grunt> records = LOAD 'input/ncdc/micro/sample.txt'
>>     USING com.hadoopbook.pig.CutLoadFunc('16-19,88-92,93-93')
>>     AS (year:int, temperature:int, quality:int);
grunt> DUMP records;
(1950,0,1)
(1950,22,1)
(1950,-11,1)
(1949,111,1)
(1949,78,1)

8. There is a more fully featured UDF for doing the same thing in the Piggy Bank called FixedWidthLoader.

The string passed to CutLoadFunc is the column specification; each comma-separated range defines a field, which is assigned a name and type in the AS clause. Let's examine the implementation of CutLoadFunc, shown in Example 16-3.

Example 16-3. A LoadFunc UDF to load tuple fields as column ranges

public class CutLoadFunc extends LoadFunc {

  private static final Log LOG = LogFactory.getLog(CutLoadFunc.class);

  private final List<Range> ranges;
  private final TupleFactory tupleFactory = TupleFactory.getInstance();
  private RecordReader reader;

  public CutLoadFunc(String cutPattern) {
    ranges = Range.parse(cutPattern);
  }

  @Override
  public void setLocation(String location, Job job)
      throws IOException {
    FileInputFormat.setInputPaths(job, location);
  }

  @Override
  public InputFormat getInputFormat() {
    return new TextInputFormat();
  }

  @Override
  public void prepareToRead(RecordReader reader, PigSplit split) {
    this.reader = reader;
  }

  @Override
  public Tuple getNext() throws IOException {
    try {
      if (!reader.nextKeyValue()) {
        return null;
      }
      Text value = (Text) reader.getCurrentValue();
      String line = value.toString();
      Tuple tuple = tupleFactory.newTuple(ranges.size());
      for (int i = 0; i < ranges.size(); i++) {
        Range range = ranges.get(i);
        if (range.getEnd() > line.length()) {
          LOG.warn(String.format(
              "Range end (%s) is longer than line length (%s)",
              range.getEnd(), line.length()));
          continue;
        }
        tuple.set(i, new DataByteArray(range.getSubstring(line)));
      }
      return tuple;
    } catch (InterruptedException e) {
      throw new ExecException(e);
    }
  }
}
In Pig, like in Hadoop, data loading takes place before the mapper runs, so it is important that the input can be split into portions that are handled independently by each mapper (see "Input Splits and Records" on page 220 for background). A LoadFunc will typically use an existing underlying Hadoop InputFormat to create records, with the LoadFunc providing the logic for turning the records into Pig tuples.

CutLoadFunc is constructed with a string that specifies the column ranges to use for each field. The logic for parsing this string and creating a list of internal Range objects that encapsulates these ranges is contained in the Range class, and is not shown here (it is available in the example code that accompanies this book).
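The Range class itself is not reproduced here, but a minimal hypothetical sketch that is consistent with how CutLoadFunc uses it (Range.parse(), getEnd(), and getSubstring(), with cut-style one-based, inclusive column numbers) might look like the following; the real implementation in the book's example code may differ:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the Range helper used by CutLoadFunc.
public class Range {

  private final int start; // one-based, inclusive, as in Unix cut
  private final int end;   // one-based, inclusive

  private Range(int start, int end) {
    this.start = start;
    this.end = end;
  }

  public int getEnd() {
    return end;
  }

  public String getSubstring(String line) {
    // Convert the one-based, inclusive range to String.substring() indexes.
    return line.substring(start - 1, end);
  }

  public static List<Range> parse(String spec) {
    List<Range> ranges = new ArrayList<Range>();
    for (String field : spec.split(",")) {
      String[] bounds = field.split("-");
      ranges.add(new Range(Integer.parseInt(bounds[0]),
          Integer.parseInt(bounds[1])));
    }
    return ranges;
  }
}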
Pig calls setLocation() on a LoadFunc to pass the input location to the loader. Since CutLoadFunc uses a TextInputFormat to break the input into lines, we just pass the location to set the input path using a static method on FileInputFormat.

Pig uses the new MapReduce API, so we use the input and output formats and associated classes from the org.apache.hadoop.mapreduce package.

Next, Pig calls the getInputFormat() method to create a RecordReader for each split, just like in MapReduce. Pig passes each RecordReader to the prepareToRead() method of CutLoadFunc, which we store a reference to, so we can use it in the getNext() method for iterating through the records.

The Pig runtime calls getNext() repeatedly, and the load function reads tuples from the reader until the reader reaches the last record in its split. At this point, it returns null to signal that there are no more tuples to be read.

It is the responsibility of the getNext() implementation to turn lines of the input file into Tuple objects.