Here are the contents of the properties file:

nameNode=hdfs://localhost:8020
resourceManager=localhost:8032
oozie.wf.application.path=${nameNode}/user/${user.name}/max-temp-workflow

To get information about the status of the workflow job, we use the -info option, specifying the job ID that was printed by the run command earlier (type oozie job to get a list of all jobs):

% oozie job -info 0000001-140911033236814-oozie-oozi-W

The output shows the status: RUNNING, KILLED, or SUCCEEDED. You can also find all this information via Oozie's web UI (http://localhost:11000/oozie).

When the job has succeeded, we can inspect the results in the usual way:

% hadoop fs -cat output/part-*
1949  111
1950  22

This example only scratched the surface of writing Oozie workflows.
The documentation on Oozie's website has information about creating more complex workflows, as well as writing and running coordinator jobs.

CHAPTER 7
How MapReduce Works

In this chapter, we look at how MapReduce in Hadoop works in detail. This knowledge provides a good foundation for writing more advanced MapReduce programs, which we will cover in the following two chapters.

Anatomy of a MapReduce Job Run

You can run a MapReduce job with a single method call: submit() on a Job object (you can also call waitForCompletion(), which submits the job if it hasn't been submitted already, then waits for it to finish).[1] This method call conceals a great deal of processing behind the scenes.
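For orientation, here is a minimal driver sketch showing that call from the client's side. It is illustrative only: the class name is invented for this sketch, and it relies on Hadoop's default (identity) mapper and reducer rather than the job classes developed in earlier chapters.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "minimal job");
    job.setJarByClass(MinimalDriver.class);
    // No mapper or reducer is set, so the identity defaults are used;
    // a real job would configure them as in the earlier chapters.
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // waitForCompletion() submits the job if it hasn't been submitted
    // already, polls its progress, and returns true on success.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}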
This section uncovers the steps Hadoop takes to run a job.

The whole process is illustrated in Figure 7-1. At the highest level, there are five independent entities:[2]

• The client, which submits the MapReduce job.
• The YARN resource manager, which coordinates the allocation of compute resources on the cluster.
• The YARN node managers, which launch and monitor the compute containers on machines in the cluster.
• The MapReduce application master, which coordinates the tasks running the MapReduce job. The application master and the MapReduce tasks run in containers that are scheduled by the resource manager and managed by the node managers.
• The distributed filesystem (normally HDFS, covered in Chapter 3), which is used for sharing job files between the other entities.

[1] In the old MapReduce API, you can call JobClient.submitJob(conf) or JobClient.runJob(conf).
[2] Not discussed in this section are the job history server daemon (for retaining job history data) and the shuffle handler auxiliary service (for serving map outputs to reduce tasks).

Figure 7-1. How Hadoop runs a MapReduce job

Job Submission

The submit() method on Job creates an internal JobSubmitter instance and calls submitJobInternal() on it (step 1 in Figure 7-1). Having submitted the job, waitForCompletion() polls the job's progress once per second and reports the progress to the console if it has changed since the last report. When the job completes successfully, the job counters are displayed.
Otherwise, the error that caused the job to fail is logged to the console.

The job submission process implemented by JobSubmitter does the following:

• Asks the resource manager for a new application ID, used for the MapReduce job ID (step 2).
• Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program (the sketch after this list shows one way a client can guard against the existing-directory case).
• Computes the input splits for the job. If the splits cannot be computed (because the input paths don't exist, for example), the job is not submitted and an error is thrown to the MapReduce program.
• Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the shared filesystem in a directory named after the job ID (step 3). The job JAR is copied with a high replication factor (controlled by the mapreduce.client.submit.file.replication property, which defaults to 10) so that there are lots of copies across the cluster for the node managers to access when they run tasks for the job.
• Submits the job by calling submitApplication() on the resource manager (step 4).
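These checks happen in the client, before anything reaches the cluster. The following sketch shows one way a driver might guard against the existing-output-directory failure and tune the submission-file replication for a job; deleting old output is this example's choice, not something Hadoop does for you, and is appropriate only when the output can safely be regenerated.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SubmissionPrep {
  // Prepare a job's configuration and output directory before calling submit().
  public static void prepare(Configuration conf, Path output) throws IOException {
    // On a small cluster, a lower replication factor for the job JAR and
    // other submission files may be more appropriate than the default of 10.
    conf.setInt("mapreduce.client.submit.file.replication", 3);

    // Remove stale output so the output-specification check passes.
    // Only do this if regenerating the output is acceptable!
    FileSystem fs = output.getFileSystem(conf);
    if (fs.exists(output)) {
      fs.delete(output, true); // recursive delete
    }
  }
}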
Job Initialization

When the resource manager receives a call to its submitApplication() method, it hands off the request to the YARN scheduler. The scheduler allocates a container, and the resource manager then launches the application master's process there, under the node manager's management (steps 5a and 5b).

The application master for MapReduce jobs is a Java application whose main class is MRAppMaster. It initializes the job by creating a number of bookkeeping objects to keep track of the job's progress, as it will receive progress and completion reports from the tasks (step 6). Next, it retrieves the input splits computed in the client from the shared filesystem (step 7). It then creates a map task object for each split, as well as a number of reduce task objects determined by the mapreduce.job.reduces property (set by the setNumReduceTasks() method on Job). Tasks are given IDs at this point.

The application master must decide how to run the tasks that make up the MapReduce job. If the job is small, the application master may choose to run the tasks in the same JVM as itself. This happens when it judges that the overhead of allocating and running tasks in new containers outweighs the gain to be had in running them in parallel, compared to running them sequentially on one node. Such a job is said to be uberized, or run as an uber task.

What qualifies as a small job? By default, a small job is one that has less than 10 mappers, only one reducer, and an input size that is less than the size of one HDFS block. (Note that these values may be changed for a job by setting mapreduce.job.ubertask.maxmaps, mapreduce.job.ubertask.maxreduces, and mapreduce.job.ubertask.maxbytes.) Uber tasks must be enabled explicitly (for an individual job, or across the cluster) by setting mapreduce.job.ubertask.enable to true.
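To make the property names concrete, here is a sketch of enabling uber tasks for an individual job and adjusting the thresholds; the particular values are arbitrary illustrations, and the defaults described above are often what you want.

import org.apache.hadoop.conf.Configuration;

public class UberTaskConfig {
  // Returns a Configuration that lets a small job run as an uber task,
  // i.e. in the same JVM as the MapReduce application master.
  public static Configuration uberEnabled() {
    Configuration conf = new Configuration();
    conf.setBoolean("mapreduce.job.ubertask.enable", true);
    // Illustrative thresholds only; leave them unset to keep the defaults
    // (less than 10 maps, one reduce, less than one HDFS block of input).
    conf.setInt("mapreduce.job.ubertask.maxmaps", 5);
    conf.setInt("mapreduce.job.ubertask.maxreduces", 1);
    conf.setLong("mapreduce.job.ubertask.maxbytes", 64L * 1024 * 1024);
    return conf;
  }
}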
Finally, before any tasks can be run, the application master calls the setupJob() method on the OutputCommitter. For FileOutputCommitter, which is the default, it will create the final output directory for the job and the temporary working space for the task output. The commit protocol is described in more detail in “Output Committers” on page 206.

Task Assignment

If the job does not qualify for running as an uber task, then the application master requests containers for all the map and reduce tasks in the job from the resource manager (step 8). Requests for map tasks are made first and with a higher priority than those for reduce tasks, since all the map tasks must complete before the sort phase of the reduce can start (see “Shuffle and Sort” on page 197).
Requests for reduce tasks are not made until 5% of map tasks have completed (see “Reduce slow start” on page 308).

Reduce tasks can run anywhere in the cluster, but requests for map tasks have data locality constraints that the scheduler tries to honor (see “Resource Requests” on page 81). In the optimal case, the task is data local, that is, running on the same node that the split resides on.
Alternatively, the task may be rack local: on the same rack, but not the same node, as the split. Some tasks are neither data local nor rack local and retrieve their data from a different rack than the one they are running on. For a particular job run, you can determine the number of tasks that ran at each locality level by looking at the job's counters (see Table 9-6).

Requests also specify memory requirements and CPUs for tasks. By default, each map and reduce task is allocated 1,024 MB of memory and one virtual core.
The values are configurable on a per-job basis (subject to minimum and maximum values described in “Memory settings in YARN and MapReduce” on page 301) via the following properties: mapreduce.map.memory.mb, mapreduce.reduce.memory.mb, mapreduce.map.cpu.vcores, and mapreduce.reduce.cpu.vcores.
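For example, a job with memory-hungry reducers might override these properties on its configuration before submission. The values below are arbitrary illustrations; they must fall within the scheduler's minimum and maximum allocation sizes, or the requests will be adjusted or rejected.

import org.apache.hadoop.conf.Configuration;

public class TaskResourceConfig {
  // Request larger containers for this job's map and reduce tasks.
  public static void setTaskResources(Configuration conf) {
    conf.setInt("mapreduce.map.memory.mb", 2048);     // default is 1024
    conf.setInt("mapreduce.reduce.memory.mb", 4096);  // default is 1024
    conf.setInt("mapreduce.map.cpu.vcores", 1);       // default is 1
    conf.setInt("mapreduce.reduce.cpu.vcores", 2);    // default is 1
  }
}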
Task Execution

Once a task has been assigned resources for a container on a particular node by the resource manager's scheduler, the application master starts the container by contacting the node manager (steps 9a and 9b). The task is executed by a Java application whose main class is YarnChild. Before it can run the task, it localizes the resources that the task needs, including the job configuration and JAR file, and any files from the distributed cache (step 10; see “Distributed Cache” on page 274). Finally, it runs the map or reduce task (step 11).

YarnChild runs in a dedicated JVM, so that any bugs in the user-defined map and reduce functions (or even in YarnChild) don't affect the node manager (by causing it to crash or hang, for example).

Each task can perform setup and commit actions, which are run in the same JVM as the task itself and are determined by the OutputCommitter for the job (see “Output Committers” on page 206). For file-based jobs, the commit action moves the task output from a temporary location to its final location.
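The shape of that per-task contract is visible in the OutputCommitter API. The do-nothing committer below is purely illustrative, showing which methods are called at setup and commit time; file-based jobs normally rely on FileOutputCommitter rather than anything like this.

import java.io.IOException;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class NoOpCommitter extends OutputCommitter {
  @Override
  public void setupJob(JobContext context) throws IOException {
    // Called once by the application master before any tasks run.
  }
  @Override
  public void setupTask(TaskAttemptContext context) throws IOException {
    // Per-task setup, run in the task's own JVM.
  }
  @Override
  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
    return false; // this committer has no output to move into place
  }
  @Override
  public void commitTask(TaskAttemptContext context) throws IOException {
    // For file-based jobs, this is where temporary output is promoted.
  }
  @Override
  public void abortTask(TaskAttemptContext context) throws IOException {
    // Discard any partial output for a failed or aborted attempt.
  }
}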
The commit protocol ensures that when speculative execution is enabled (see “Speculative Execution” on page 204), only one of the duplicate tasks is committed and the other is aborted.

Streaming

Streaming runs special map and reduce tasks for the purpose of launching the user-supplied executable and communicating with it (Figure 7-2). The Streaming task communicates with the process (which may be written in any language) using standard input and output streams.