This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete (step 7). The namenode already knows which blocks the file is made up of (because DataStreamer asks for block allocations), so it only has to wait for blocks to be minimally replicated before returning successfully.

Replica Placement

How does the namenode choose which datanodes to store replicas on? There's a trade-off between reliability and write bandwidth and read bandwidth here.
For example, placing all replicas on a single node incurs the lowest write bandwidth penalty (since the replication pipeline runs on a single node), but this offers no real redundancy (if the node fails, the data for that block is lost). Also, the read bandwidth is high for off-rack reads. At the other extreme, placing replicas in different data centers may maximize redundancy, but at the cost of bandwidth.
Even in the same data center (which is what all Hadoop clusters to date have run in), there are a variety of possible placement strategies.

Hadoop's default strategy is to place the first replica on the same node as the client (for clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy). The second replica is placed on a different rack from the first (off-rack), chosen at random.
The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes in the cluster, although the system tries to avoid placing too many replicas on the same rack.

Once the replica locations have been chosen, a pipeline is built, taking network topology into account. For a replication factor of 3, the pipeline might look like Figure 3-5.

Figure 3-5. A typical replica pipeline

Overall, this strategy gives a good balance among reliability (blocks are stored on two racks), write bandwidth (writes only have to traverse a single network switch), read performance (there's a choice of two racks to read from), and block distribution across the cluster (clients only write a single block on the local rack).
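The placement policy is applied for however many replicas a file asks for. As a minimal sketch of controlling this from a client (assuming fs and conf are an initialized FileSystem and its Configuration, as in the examples that follow; the path is hypothetical), you can pass an explicit replication factor when creating a file, or change it later with setReplication():

Path p = new Path("/tmp/two-replicas");       // hypothetical path
FSDataOutputStream out = fs.create(p, true,
    conf.getInt("io.file.buffer.size", 4096), // buffer size
    (short) 2,                                // replication factor for this file
    fs.getDefaultBlockSize(p));               // block size
out.write("content".getBytes("UTF-8"));
out.close();
fs.setReplication(p, (short) 3); // later, ask the namenode to add a third replica per block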
Coherency Model

A coherency model for a filesystem describes the data visibility of reads and writes for a file. HDFS trades off some POSIX requirements for performance, so some operations may behave differently than you expect them to.

After creating a file, it is visible in the filesystem namespace, as expected:

Path p = new Path("p");
fs.create(p);
assertThat(fs.exists(p), is(true));

However, any content written to the file is not guaranteed to be visible, even if the stream is flushed. So, the file appears to have a length of zero:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(), is(0L));

Once more than a block's worth of data has been written, the first block will be visible to new readers. This is true of subsequent blocks, too: it is always the current block being written that is not visible to other readers.

HDFS provides a way to force all buffers to be flushed to the datanodes via the hflush() method on FSDataOutputStream.
After a successful return from hflush(), HDFS guarantees that the data written up to that point in the file has reached all the datanodes in the write pipeline and is visible to all new readers:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hflush();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

Note that hflush() does not guarantee that the datanodes have written the data to disk, only that it's in the datanodes' memory (so in the event of a data center power outage, for example, data could be lost). For this stronger guarantee, use hsync() instead. (In Hadoop 1.x, hflush() was called sync(), and hsync() did not exist.)

The behavior of hsync() is similar to that of the fsync() system call in POSIX that commits buffered data for a file descriptor. For example, using the standard Java API to write a local file, we are guaranteed to see the content after flushing the stream and synchronizing:

FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync(); // sync to disk
assertThat(localFile.length(), is(((long) "content".length())));

Closing a file in HDFS performs an implicit hflush(), too:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));
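Similarly, for the stronger on-disk guarantee in HDFS itself, you can call hsync() on the output stream. This is a minimal sketch, not one of the book's examples, reusing fs from above:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.hsync(); // like hflush(), but also asks each datanode to sync the data to its disks
out.close();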
Consequences for application design

This coherency model has implications for the way you design applications. With no calls to hflush() or hsync(), you should be prepared to lose up to a block of data in the event of client or system failure. For many applications, this is unacceptable, so you should call hflush() at suitable points, such as after writing a certain number of records or number of bytes. Though the hflush() operation is designed not to unduly tax HDFS, it does have some overhead (and hsync() has more), so there is a trade-off between data robustness and throughput. What constitutes an acceptable trade-off is application dependent, and suitable values can be selected after measuring your application's performance with different hflush() (or hsync()) frequencies.
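For example, a writer might flush every N records. This is a sketch of the pattern (the record source and the interval are hypothetical, and fs is an initialized FileSystem as in the earlier examples):

int flushInterval = 1000; // hypothetical: records to buffer between flushes
List<String> records = Arrays.asList("a", "b", "c"); // hypothetical record source
FSDataOutputStream out = fs.create(new Path("records"));
long written = 0;
for (String record : records) {
  out.write((record + "\n").getBytes("UTF-8"));
  if (++written % flushInterval == 0) {
    out.hflush(); // bound the window of data that a crash could lose
  }
}
out.close(); // performs an implicit hflush()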
Parallel Copying with distcp

The HDFS access patterns that we have seen so far focus on single-threaded access. It's possible to act on a collection of files (by specifying file globs, for example), but for efficient parallel processing of these files, you would have to write a program yourself. Hadoop comes with a useful program called distcp for copying data to and from Hadoop filesystems in parallel.

One use for distcp is as an efficient replacement for hadoop fs -cp.
For example, you can copy one file to another with:

% hadoop distcp file1 file2

(Even for a single file copy, the distcp variant is preferred for large files, since hadoop fs -cp copies the file via the client running the command.)

You can also copy directories:

% hadoop distcp dir1 dir2

If dir2 does not exist, it will be created, and the contents of the dir1 directory will be copied there. You can specify multiple source paths, and all will be copied to the destination.

If dir2 already exists, then dir1 will be copied under it, creating the directory structure dir2/dir1. If this isn't what you want, you can supply the -overwrite option to keep the same directory structure and force files to be overwritten. You can also update only the files that have changed using the -update option.
This is best shown with an example. If we changed a file in the dir1 subtree, we could synchronize the change with dir2 by running:

% hadoop distcp -update dir1 dir2

If you are unsure of the effect of a distcp operation, it is a good idea to try it out on a small test directory tree first.

distcp is implemented as a MapReduce job where the work of copying is done by the maps that run in parallel across the cluster. There are no reducers. Each file is copied by a single map, and distcp tries to give each map approximately the same amount of data by bucketing files into roughly equal allocations. By default, up to 20 maps are used, but this can be changed by specifying the -m argument to distcp.
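For example, to spread a copy over 50 maps instead of the default, you could run (directory names as in the examples above):

% hadoop distcp -m 50 dir1 dir2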
A very common use case for distcp is for transferring data between two HDFS clusters. For example, the following creates a backup of the first cluster's /foo directory on the second:

% hadoop distcp -update -delete -p hdfs://namenode1/foo hdfs://namenode2/foo

The -delete flag causes distcp to delete any files or directories from the destination that are not present in the source, and -p means that file status attributes like permissions, block size, and replication are preserved.
You can run distcp with no arguments to see precise usage instructions.

If the two clusters are running incompatible versions of HDFS, then you can use the webhdfs protocol to distcp between them:

% hadoop distcp webhdfs://namenode1:50070/foo webhdfs://namenode2:50070/foo

Another variant is to use an HttpFS proxy as the distcp source or destination (again using the webhdfs protocol), which has the advantage of being able to set firewall and bandwidth controls (see "HTTP" on page 54).
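For example, assuming an HttpFS proxy listening on its default port of 14000 (the proxy hostname here is hypothetical), the source URI simply points at the proxy instead of the namenode:

% hadoop distcp webhdfs://httpfs-proxy:14000/foo hdfs://namenode2/foo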
Keeping an HDFS Cluster Balanced

When copying data into HDFS, it's important to consider cluster balance. HDFS works best when the file blocks are evenly spread across the cluster, so you want to ensure that distcp doesn't disrupt this. For example, if you specified -m 1, a single map would do the copy, which, apart from being slow and not using the cluster resources efficiently, would mean that the first replica of each block would reside on the node running the map (until the disk filled up). The second and third replicas would be spread across the cluster, but this one node would be unbalanced. This problem is avoided by having more maps than nodes in the cluster. For this reason, it's best to start by running distcp with the default of 20 maps.

However, it's not always possible to prevent a cluster from becoming unbalanced. Perhaps you want to limit the number of maps so that some of the nodes can be used by other jobs. In this case, you can use the balancer tool (see "Balancer" on page 329) to subsequently even out the block distribution across the cluster.

CHAPTER 4
YARN

Apache YARN (Yet Another Resource Negotiator) is Hadoop's cluster resource management system.