For the combiner, we reuse MaxTemperatureReducer (from Chapters 2 and 6) to pick the maximum temperature for any given group of map outputs on the map side. The reducer (MaxTemperatureReducerWithStationLookup) is different from the combiner, since in addition to finding the maximum temperature, it uses the cache file to look up the station name.

We use the reducer's setup() method to retrieve the cache file using its original name, relative to the working directory of the task.
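The listing for the lookup reducer is not reproduced in this excerpt, but a minimal sketch of the setup() approach might look like the following. The filename station-metadata.tsv and its tab-separated ID-to-name layout are assumptions for illustration, not the book's actual code.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A sketch only (not the book's listing): load the cached station file by its
// original name in setup(), then use it in reduce() to emit the station name
// alongside the maximum temperature.
public class MaxTemperatureReducerWithStationLookup
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final Map<String, String> stationNames = new HashMap<>();

  @Override
  protected void setup(Context context)
      throws IOException, InterruptedException {
    // The cached file is symlinked into the task's working directory, so it
    // can be opened with plain java.io under its original name.
    try (BufferedReader in =
        new BufferedReader(new FileReader("station-metadata.tsv"))) {
      String line;
      while ((line = in.readLine()) != null) {
        String[] fields = line.split("\t");
        if (fields.length >= 2) {
          stationNames.put(fields[0], fields[1]); // station ID -> station name
        }
      }
    }
  }

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    String name = stationNames.getOrDefault(key.toString(), key.toString());
    context.write(new Text(name), new IntWritable(maxValue));
  }
}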
You can use the distributed cache for copying files that do not fit in memory. Hadoop map files are very useful in this regard, since they serve as an on-disk lookup format (see "MapFile" on page 135). Because map files are collections of files with a defined directory structure, you should put them into an archive format (JAR, ZIP, tar, or gzipped tar) and add them to the cache using the -archives option.

Here's a snippet of the output, showing some maximum temperatures for a few weather stations:

PEATS RIDGE WARATAH   372
STRATHALBYN RACECOU   410
SHEOAKS AWS           399
WANGARATTA AERO       409
MOOGARA               334
MACKAY AERO           331

How it works

When you launch a job, Hadoop copies the files specified by the -files, -archives, and -libjars options to the distributed filesystem (normally HDFS). Then, before a task is run, the node manager copies the files from the distributed filesystem to a local disk (the cache) so the task can access the files.
The files are said to be localized at this point. From the task's point of view, the files are just there, symbolically linked from the task's working directory. In addition, files specified by -libjars are added to the task's classpath before it is launched.

The node manager also maintains a reference count for the number of tasks using each file in the cache. Before the task has run, the file's reference count is incremented by 1; then, after the task has run, the count is decreased by 1. Only when the file is not being used (when the count reaches zero) is it eligible for deletion. Files are deleted to make room for a new file when the node's cache exceeds a certain size (10 GB by default), using a least-recently used policy.
The cache size may be changed by setting the configuration property yarn.nodemanager.localizer.cache.target-size-mb.

Although this design doesn't guarantee that subsequent tasks from the same job running on the same node will find the file they need in the cache, it is very likely that they will: tasks from a job are usually scheduled to run at around the same time, so there isn't the opportunity for enough other jobs to run to cause the original task's file to be deleted from the cache.

The distributed cache API

Most applications don't need to use the distributed cache API, because they can use the cache via GenericOptionsParser, as we saw in Example 9-13. However, if GenericOptionsParser is not being used, then the API in Job can be used to put objects into the distributed cache.[6] Here are the pertinent methods in Job:

public void addCacheFile(URI uri)
public void addCacheArchive(URI uri)
public void setCacheFiles(URI[] files)
public void setCacheArchives(URI[] archives)
public void addFileToClassPath(Path file)
public void addArchiveToClassPath(Path archive)
[6] If you are using the old MapReduce API, the same methods can be found in org.apache.hadoop.filecache.DistributedCache.

Recall that there are two types of objects that can be placed in the cache: files and archives. Files are left intact on the task node, whereas archives are unarchived on the task node. For each type of object, there are three methods: an addCacheXXXX() method to add the file or archive to the distributed cache, a setCacheXXXXs() method to set the entire list of files or archives to be added to the cache in a single call (replacing those set in any previous calls), and an addXXXXToClassPath() method to add the file or archive to the MapReduce task's classpath.
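For illustration, a driver might call these methods as follows. This is a hedged sketch: the class name and HDFS paths are placeholders, not from the book, and the referenced files are assumed to already exist in the shared filesystem.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

// Hypothetical driver fragment using the Job API directly. Unlike the
// GenericOptionsParser route, these URIs must already be in the shared
// filesystem, since the API does not copy local files there for you.
public class CacheApiDriverSketch {
  public static Job createJob(Configuration conf) throws Exception {
    Job job = Job.getInstance(conf, "Max temperature with station lookup");
    job.addCacheFile(new URI("hdfs://namenode/metadata/station-metadata.tsv"));    // like -files
    job.addCacheArchive(new URI("hdfs://namenode/metadata/station-mapfiles.zip")); // like -archives
    job.addFileToClassPath(new Path("/libs/lookup-helpers.jar"));                  // like -libjars
    return job;
  }
}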
Table 9-7 compares these API methods to the GenericOptionsParser options described in Table 6-1.

Table 9-7. Distributed cache API

Job API method: addCacheFile(URI uri), setCacheFiles(URI[] files)
GenericOptionsParser equivalent: -files file1,file2,...
Description: Add files to the distributed cache to be copied to the task node.

Job API method: addCacheArchive(URI uri), setCacheArchives(URI[] archives)
GenericOptionsParser equivalent: -archives archive1,archive2,...
Description: Add archives to the distributed cache to be copied to the task node and unarchived there.

Job API method: addFileToClassPath(Path file)
GenericOptionsParser equivalent: -libjars jar1,jar2,...
Description: Add files to the distributed cache to be added to the MapReduce task's classpath. The files are not unarchived, so this is a useful way to add JAR files to the classpath.

Job API method: addArchiveToClassPath(Path archive)
GenericOptionsParser equivalent: None
Description: Add archives to the distributed cache to be unarchived and added to the MapReduce task's classpath. This can be useful when you want to add a directory of files to the classpath, since you can create an archive containing the files. Alternatively, you could create a JAR file and use addFileToClassPath(), which works equally well.

The URIs referenced in the add or set methods must be files in a shared filesystem that exist when the job is run. On the other hand, the filenames specified as a GenericOptionsParser option (e.g., -files) may refer to local files, in which case they get copied to the default shared filesystem (normally HDFS) on your behalf. This is the key difference between using the Java API directly and using GenericOptionsParser: the Java API does not copy the file specified in the add or set method to the shared filesystem, whereas the GenericOptionsParser does.
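To make the contrast concrete, here is a skeleton of the GenericOptionsParser route; the class and file names are hypothetical, and the mapper and reducer setup is elided. A driver run via ToolRunner picks up -files, -archives, and -libjars automatically, and a local file named on the command line is copied to the shared filesystem on your behalf.

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Skeleton driver for the GenericOptionsParser route. Invoked as, for example:
//   hadoop jar job.jar MaxTemperatureByStationDriver \
//       -files station-metadata.tsv input/ncdc/all output
// where station-metadata.tsv may be a local file that gets copied to HDFS.
public class MaxTemperatureByStationDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "Max temperature by station");
    // ... configure mapper, reducer, and input/output paths here ...
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new MaxTemperatureByStationDriver(), args));
  }
}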
Retrieving distributed cache files from the task works in the same way as before: you access the localized file directly by name, as we did in Example 9-13. This works because MapReduce will always create a symbolic link from the task's working directory to every file or archive added to the distributed cache.[7] Archives are unarchived so you can access the files in them using the nested path.

[7] In Hadoop 1, localized files were not always symlinked, so it was sometimes necessary to retrieve localized file paths using methods on JobContext. This limitation was removed in Hadoop 2.

MapReduce Library Classes

Hadoop comes with a library of mappers and reducers for commonly used functions. They are listed with brief descriptions in Table 9-8. For further information on how to use them, consult their Java documentation.

Table 9-8. MapReduce library classes

Classes: ChainMapper, ChainReducer
Description: Run a chain of mappers in a single mapper and a reducer followed by a chain of mappers in a single reducer, respectively. (Symbolically, M+RM*, where M is a mapper and R is a reducer.) This can substantially reduce the amount of disk I/O incurred compared to running multiple MapReduce jobs.

Classes: FieldSelectionMapReduce (old API): FieldSelectionMapper and FieldSelectionReducer (new API)
Description: A mapper and reducer that can select fields (like the Unix cut command) from the input keys and values and emit them as output keys and values.

Classes: IntSumReducer, LongSumReducer
Description: Reducers that sum integer values to produce a total for every key.

Classes: InverseMapper
Description: A mapper that swaps keys and values.

Classes: MultithreadedMapRunner (old API), MultithreadedMapper (new API)
Description: A mapper (or map runner in the old API) that runs mappers concurrently in separate threads. Useful for mappers that are not CPU-bound.

Classes: TokenCounterMapper
Description: A mapper that tokenizes the input value into words (using Java's StringTokenizer) and emits each word along with a count of 1.

Classes: RegexMapper
Description: A mapper that finds matches of a regular expression in the input value and emits the matches along with a count of 1.
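As a brief illustration of how a couple of these library classes can be wired together, TokenCounterMapper and IntSumReducer are enough to build a complete word count job. The class name and argument handling below are illustrative, not from the book.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.TokenCounterMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;

// Word count built entirely from library classes: TokenCounterMapper emits
// (word, 1) pairs and IntSumReducer totals the counts for every word.
public class LibraryWordCount {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "Word count from library classes");
    job.setJarByClass(LibraryWordCount.class);
    job.setMapperClass(TokenCounterMapper.class);
    job.setCombinerClass(IntSumReducer.class); // also usable as a combiner
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}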
Useful for mappers that are notthreadedMapper (new API)CPU-bound.TokenCounterMapperA mapper that tokenizes the input value into words (using Java’sStringTokenizer) and emits each word along with a count of1.RegexMapperA mapper that finds matches of a regular expression in the inputvalue and emits the matches along with a count of 1.7. In Hadoop 1, localized files were not always symlinked, so it was sometimes necessary to retrieve localizedfile paths using methods on JobContext. This limitation was removed in Hadoop 2.MapReduce Library Classes|279PART IIIHadoop OperationsCHAPTER 10Setting Up a Hadoop ClusterThis chapter explains how to set up Hadoop to run on a cluster of machines.
Running HDFS, MapReduce, and YARN on a single machine is great for learning about these systems, but to do useful work, they need to run on multiple nodes.

There are a few options when it comes to getting a Hadoop cluster, from building your own, to running on rented hardware or using an offering that provides Hadoop as a hosted service in the cloud. The number of hosted options is too large to list here, but even if you choose to build a Hadoop cluster yourself, there are still a number of installation options:

Apache tarballs
The Apache Hadoop project and related projects provide binary (and source) tarballs for each release.
Installation from binary tarballs gives you the most flexibility but entails the most work, since you need to decide where the installation files, configuration files, and logfiles are located on the filesystem, set their file permissions correctly, and so on.

Packages
RPM and Debian packages are available from the Apache Bigtop project, as well as from all the Hadoop vendors. Packages bring a number of advantages over tarballs: they provide a consistent filesystem layout, they are tested together as a stack (so you know that the versions of Hadoop and Hive, say, will work together), and they work well with configuration management tools like Puppet.

Hadoop cluster management tools
Cloudera Manager and Apache Ambari are examples of dedicated tools for installing and managing a Hadoop cluster over its whole lifecycle. They provide a simple web UI, and are the recommended way to set up a Hadoop cluster for most users and operators.