It uses the Avro Generic API for its in-memory data model.

Example 13-1. MapReduce program to convert text files to Parquet files using AvroParquetOutputFormat

public class TextToParquetWithAvro extends Configured implements Tool {

  private static final Schema SCHEMA = new Schema.Parser().parse(
      "{\n" +
      "  \"type\": \"record\",\n" +
      "  \"name\": \"Line\",\n" +
      "  \"fields\": [\n" +
      "    {\"name\": \"offset\", \"type\": \"long\"},\n" +
      "    {\"name\": \"line\", \"type\": \"string\"}\n" +
      "  ]\n" +
      "}");

  public static class TextToParquetMapper
      extends Mapper<LongWritable, Text, Void, GenericRecord> {

    private GenericRecord record = new GenericData.Record(SCHEMA);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      record.put("offset", key.get());
      record.put("line", value.toString());
      context.write(null, record);
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    Job job = new Job(getConf(), "Text to Parquet");
    job.setJarByClass(getClass());

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.setMapperClass(TextToParquetMapper.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    AvroParquetOutputFormat.setSchema(job, SCHEMA);
    job.setOutputKeyClass(Void.class);
    job.setOutputValueClass(GenericRecord.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new TextToParquetWithAvro(), args);
    System.exit(exitCode);
  }
}

The job’s output format is set to AvroParquetOutputFormat, and the output key and value types are set to Void and GenericRecord to match, since we are using Avro’s Generic API.
Void simply means that the key is always set to null.

Like AvroParquetWriter from the previous section, AvroParquetOutputFormat converts the Avro schema to a Parquet schema automatically. The Avro schema is set on the Job instance so that the MapReduce tasks can find the schema when writing the files.

The mapper is straightforward; it takes the file offset (key) and line (value) and builds an Avro GenericRecord object with them, which it writes out to the MapReduce context object as the value (the key is always null). AvroParquetOutputFormat takes care of the conversion of the Avro GenericRecord to the Parquet file format encoding.
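If you are curious about what the converted Parquet schema looks like, the parquet-avro library’s AvroSchemaConverter performs the same Avro-to-Parquet schema translation and can be called directly. The following standalone sketch is not part of the book’s example code, and it assumes a recent parquet-avro release (older releases use the parquet.* package prefix rather than org.apache.parquet.*):

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class PrintParquetSchema {
  public static void main(String[] args) {
    // The same Avro schema used in Example 13-1
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Line\", \"fields\": [" +
        "{\"name\": \"offset\", \"type\": \"long\"}, " +
        "{\"name\": \"line\", \"type\": \"string\"}]}");

    // Convert the Avro schema to the equivalent Parquet message type and print it
    MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
    System.out.println(parquetSchema);
  }
}

For the Line schema, this should print a Parquet message with a required int64 offset field and a required binary line field annotated as UTF8.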
Parquet is a columnar format, so it buffers rows in memory. Even though the mapper in this example just passes values through, it must have sufficient memory for the Parquet writer to buffer each block (row group), which is by default 128 MB. If you get job failures due to out of memory errors, you can adjust the Parquet file block size for the writer with parquet.block.size (see Table 13-3). You may also need to change the MapReduce task memory allocation (when reading or writing) using the settings discussed in “Memory settings in YARN and MapReduce” on page 301.
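For example, run() from Example 13-1 could be modified to lower the row group size when the job is set up. This is only a sketch of one way to do it, and the 64 MB value is an arbitrary illustration rather than a recommendation:

    Job job = new Job(getConf(), "Text to Parquet");
    job.setJarByClass(getClass());
    // Ask the Parquet writer to buffer smaller row groups (64 MB here, purely
    // illustrative) so that less memory is needed per open block.
    job.getConfiguration().setInt("parquet.block.size", 64 * 1024 * 1024);

Because the driver is run through ToolRunner, the same property can also be set from the command line with the generic -D option (for example, -Dparquet.block.size=67108864), without recompiling the job.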
The following command runs the program on the four-line text file quangle.txt:

% hadoop jar parquet-examples.jar TextToParquetWithAvro \
  input/docs/quangle.txt output

We can use the Parquet command-line tools to dump the output Parquet file for inspection:

% parquet-tools dump output/part-m-00000.parquet
INT64 offset
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:0 V:0
value 2: R:0 D:0 V:33
value 3: R:0 D:0 V:57
value 4: R:0 D:0 V:89

BINARY line
--------------------------------------------------------------------------------
*** row group 1 of 1, values 1 to 4 ***
value 1: R:0 D:0 V:On the top of the Crumpetty Tree
value 2: R:0 D:0 V:The Quangle Wangle sat,
value 3: R:0 D:0 V:But his face you could not see,
value 4: R:0 D:0 V:On account of his Beaver Hat.

Notice how the values within a row group are shown together. V indicates the value, R the repetition level, and D the definition level. For this schema, the latter two are zero since there is no nesting.

CHAPTER 14
Flume

Hadoop is built for processing very large datasets. Often it is assumed that the data is already in HDFS, or can be copied there in bulk. However, there are many systems that don’t meet this assumption. They produce streams of data that we would like to aggregate, store, and analyze using Hadoop—and these are the systems that Apache Flume is an ideal fit for.

Flume is designed for high-volume ingestion into Hadoop of event-based data.
The canonical example is using Flume to collect logfiles from a bank of web servers, then moving the log events from those files into new aggregated files in HDFS for processing. The usual destination (or sink in Flume parlance) is HDFS. However, Flume is flexible enough to write to other systems, like HBase or Solr.

To use Flume, we need to run a Flume agent, which is a long-lived Java process that runs sources and sinks, connected by channels.
A source in Flume produces events and delivers them to the channel, which stores the events until they are forwarded to the sink. You can think of the source-channel-sink combination as a basic Flume building block.

A Flume installation is made up of a collection of connected agents running in a distributed topology.
Agents on the edge of the system (co-located on web server machines, for example) collect data and forward it to agents that are responsible for aggregating and then storing the data in its final destination. Agents are configured to run a collection of particular sources and sinks, so using Flume is mainly a configuration exercise in wiring the pieces together. In this chapter, we’ll see how to build Flume topologies for data ingestion that you can use as a part of your own Hadoop pipeline.

Installing Flume

Download a stable release of the Flume binary distribution from the download page, and unpack the tarball in a suitable location:

% tar xzf apache-flume-x.y.z-bin.tar.gz

It’s useful to put the Flume binary on your path:

% export FLUME_HOME=~/sw/apache-flume-x.y.z-bin
% export PATH=$PATH:$FLUME_HOME/bin

A Flume agent can then be started with the flume-ng command, as we’ll see next.

An Example

To show how Flume works, let’s start with a setup that:
1. Watches a local directory for new text files
2. Sends each line of each file to the console as files are added

We’ll add the files by hand, but it’s easy to imagine a process like a web server creating new files that we want to continuously ingest with Flume. Also, in a real system, rather than just logging the file contents we would write the contents to HDFS for subsequent processing—we’ll see how to do that later in the chapter.

In this example, the Flume agent runs a single source-channel-sink, configured using a Java properties file. The configuration controls the types of sources, sinks, and channels that are used, as well as how they are connected together. For this example, we’ll use the configuration in Example 14-1.

Example 14-1. Flume configuration using a spooling directory source and a logger sink

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir

agent1.sinks.sink1.type = logger

agent1.channels.channel1.type = file

Property names form a hierarchy with the agent name at the top level.
In this example, we have a single agent, called agent1. The names for the different components in an agent are defined at the next level, so for example agent1.sources lists the names of the sources that should be run in agent1 (here it is a single source, source1). Similarly, agent1 has a sink (sink1) and a channel (channel1).

The properties for each component are defined at the next level of the hierarchy. The configuration properties that are available for a component depend on the type of the component.
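As a preview of how this configuration gets used, an agent for it might be started with the flume-ng command mentioned earlier. This is only a sketch: spool-to-logger.properties is an illustrative name for a file containing Example 14-1, and the final -D option just makes the logger sink’s output visible on the console:

% flume-ng agent \
  --conf-file spool-to-logger.properties \
  --name agent1 \
  --conf $FLUME_HOME/conf \
  -Dflume.root.logger=INFO,console

The --name argument must match the agent name defined in the properties file (agent1 here), since a single file can configure more than one agent.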