Events don't have this header by default, but it can be added using a Flume interceptor. Interceptors are components that can modify or drop events in the flow; they are attached to sources, and are run on events before the events have been placed in a channel. (Table 14-1 describes the interceptors that Flume provides.) The following extra configuration lines add a timestamp interceptor to source1, which adds a timestamp header to every event produced by the source:

    agent1.sources.source1.interceptors = interceptor1
    agent1.sources.source1.interceptors.interceptor1.type = timestamp

Using the timestamp interceptor ensures that the timestamps closely reflect the times at which the events were created. For some applications, using a timestamp for when the event was written to HDFS might be sufficient, although be aware that when there are multiple tiers of Flume agents there can be a significant difference between creation time and write time, especially in the event of agent downtime (see "Distribution: Agent Tiers" on page 390). For these cases, the HDFS sink has a setting, hdfs.useLocalTimeStamp, that will use a timestamp generated by the Flume agent running the HDFS sink.
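
As a minimal sketch (the partitioned path, and setting the flag to true, are illustrative assumptions; only hdfs.useLocalTimeStamp itself is named above), an HDFS sink that relies on the agent's own clock might be configured like this:

    agent1.sinks.sink1.type = hdfs
    # Path escape sequences are resolved with the agent's local clock,
    # so no timestamp header (and no timestamp interceptor) is required
    agent1.sinks.sink1.hdfs.path = /tmp/flume/year=%Y/month=%m/day=%d
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true

The trade-off is that the timestamps used for partitioning then reflect write time rather than event creation time.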

File Formats

It's normally a good idea to use a binary format for storing your data in, since the resulting files are smaller than they would be if you used text. For the HDFS sink, the file format used is controlled using hdfs.fileType and a combination of a few other properties.

If unspecified, hdfs.fileType defaults to SequenceFile, which will write events to a sequence file with LongWritable keys that contain the event timestamp (or the current time if the timestamp header is not present) and BytesWritable values that contain the event body. It's possible to use Text Writable values in the sequence file instead of BytesWritable by setting hdfs.writeFormat to Text.
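
Spelled out as configuration (a sketch only; SequenceFile is the default value of hdfs.fileType, so the second line is the one that changes behavior):

    agent1.sinks.sink1.hdfs.fileType = SequenceFile
    # Store each event body as a Text value instead of BytesWritable
    agent1.sinks.sink1.hdfs.writeFormat = Text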

The configuration is a little different for Avro files. The hdfs.fileType property is set to DataStream, just like for plain text. Additionally, serializer (note the lack of an hdfs. prefix) must be set to avro_event. To enable compression, set the serializer.compressionCodec property. Here is an example of an HDFS sink configured to write Snappy-compressed Avro files:

    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path = /tmp/flume
    agent1.sinks.sink1.hdfs.filePrefix = events
    agent1.sinks.sink1.hdfs.fileSuffix = .avro
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.serializer = avro_event
    agent1.sinks.sink1.serializer.compressionCodec = snappy

An event is represented as an Avro record with two fields: headers, an Avro map with string values, and body, an Avro bytes field.

If you want to use a custom Avro schema, there are a couple of options.
If you have Avro in-memory objects that you want to send to Flume, then the Log4jAppender is appropriate. It allows you to log an Avro Generic, Specific, or Reflect object using a log4j Logger and send it to an Avro source running in a Flume agent (see "Distribution: Agent Tiers" on page 390). In this case, the serializer property for the HDFS sink should be set to org.apache.flume.sink.hdfs.AvroEventSerializer$Builder, and the Avro schema set in the header (see the class documentation).
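
A sketch of the sink-side configuration for this case might look as follows; the schema itself is carried in an event header, and the class documentation describes exactly which header to use (for example, a schema literal or a schema URL):

    agent1.sinks.sink1.hdfs.fileType = DataStream
    # Write records using the schema supplied in each event's headers,
    # rather than the generic headers/body record produced by avro_event
    agent1.sinks.sink1.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder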

Alternatively, if the events are not originally derived from Avro objects, you can write a custom serializer to convert a Flume event into an Avro object with a custom schema. The helper class AbstractAvroEventSerializer in the org.apache.flume.serialization package is a good starting point.

Fan Out

Fan out is the term for delivering events from one source to multiple channels, so they reach multiple sinks.
For example, the configuration in Example 14-3 delivers events to both an HDFS sink (sink1a via channel1a) and a logger sink (sink1b via channel1b).

Example 14-3. Flume configuration using a spooling directory source, fanning out to an HDFS sink and a logger sink

    agent1.sources = source1
    agent1.sinks = sink1a sink1b
    agent1.channels = channel1a channel1b

    agent1.sources.source1.channels = channel1a channel1b
    agent1.sinks.sink1a.channel = channel1a
    agent1.sinks.sink1b.channel = channel1b

    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /tmp/spooldir

    agent1.sinks.sink1a.type = hdfs
    agent1.sinks.sink1a.hdfs.path = /tmp/flume
    agent1.sinks.sink1a.hdfs.filePrefix = events
    agent1.sinks.sink1a.hdfs.fileSuffix = .log
    agent1.sinks.sink1a.hdfs.fileType = DataStream

    agent1.sinks.sink1b.type = logger

    agent1.channels.channel1a.type = file
    agent1.channels.channel1b.type = memory

The key change here is that the source is configured to deliver to multiple channels by setting agent1.sources.source1.channels to a space-separated list of channel names, channel1a and channel1b.
This time, the channel feeding the logger sink (channel1b) is a memory channel, since we are logging events for debugging purposes and don't mind losing events on agent restart. Also, each channel is configured to feed one sink, just like in the previous examples. The flow is illustrated in Figure 14-2.

Figure 14-2. Flume agent with a spooling directory source and fanning out to an HDFS sink and a logger sink

Delivery Guarantees

Flume uses a separate transaction to deliver each batch of events from the spooling directory source to each channel. In this example, one transaction will be used to deliver to the channel feeding the HDFS sink, and then another transaction will be used to deliver the same batch of events to the channel for the logger sink. If either of these transactions fails (if a channel is full, for example), then the events will not be removed from the source, and will be retried later.

In this case, since we don't mind if some events are not delivered to the logger sink, we can designate its channel as an optional channel, so that if the transaction associated with it fails, this will not cause events to be left in the source and tried again later.
(Note that if the agent fails before both channel transactions have committed, then the affected events will be redelivered after the agent restarts; this is true even if the uncommitted channels are marked as optional.) To do this, we set the selector.optional property on the source, passing it a space-separated list of channels:

    agent1.sources.source1.selector.optional = channel1b

Near-Real-Time Indexing

Indexing events for search is a good example of where fan out is used in practice. A single source of events is sent to both an HDFS sink (this is the main repository of events, so a required channel is used) and a Solr (or Elasticsearch) sink, to build a search index (using an optional channel).

The MorphlineSolrSink extracts fields from Flume events and transforms them into a Solr document (using a Morphline configuration file), which is then loaded into a live Solr search server. The process is called near real time since ingested data appears in search results in a matter of seconds.
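
A sketch of the Solr side of such a fan out might look like the following; the sink name, channel name, and morphline file path are illustrative, while the sink type and the morphlineFile property are taken from the Flume User Guide:

    agent1.sinks.solrsink1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
    # Feed the Solr sink from the (optional) channel dedicated to indexing
    agent1.sinks.solrsink1.channel = channelsolr1
    # Morphline file defining how event fields map to Solr document fields
    agent1.sinks.solrsink1.morphlineFile = /etc/flume-ng/conf/morphline.conf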

Replicating and Multiplexing Selectors

In normal fan-out flow, events are replicated to all channels, but sometimes more selective behavior might be desirable, so that some events are sent to one channel and others to another. This can be achieved by setting a multiplexing selector on the source, and defining routing rules that map particular event header values to channels. See the Flume User Guide for configuration details.
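
As an illustration of what such routing rules might look like (the datacenter header and its values are invented for this sketch; the selector property names follow the Flume User Guide):

    agent1.sources.source1.selector.type = multiplexing
    # Route each event according to the value of its "datacenter" header
    agent1.sources.source1.selector.header = datacenter
    agent1.sources.source1.selector.mapping.eu = channel1a
    agent1.sources.source1.selector.mapping.us = channel1b
    # Events whose header value matches no mapping go to the default channel
    agent1.sources.source1.selector.default = channel1a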

Distribution: Agent Tiers

How do we scale a set of Flume agents? If there is one agent running on every node producing raw data, then with the setup described so far, at any particular time each file being written to HDFS will consist entirely of the events from one node. It would be better if we could aggregate the events from a group of nodes in a single file, since this would result in fewer, larger files (with the concomitant reduction in pressure on HDFS, and more efficient processing in MapReduce; see "Small files and CombineFileInputFormat" on page 226). Also, if needed, files can be rolled more often since they are being fed by a larger number of nodes, leading to a reduction in the time between when an event is created and when it's available for analysis.

Aggregating Flume events is achieved by having tiers of Flume agents. The first tier collects events from the original sources (such as web servers) and sends them to a smaller set of agents in the second tier, which aggregate events from the first tier before writing them to HDFS (see Figure 14-3).
Further tiers may be warranted for very large numbers of source nodes.

Figure 14-3. Using a second agent tier to aggregate Flume events from the first tier

Tiers are constructed by using a special sink that sends events over the network, and a corresponding source that receives events. The Avro sink sends events over Avro RPC to an Avro source running in another Flume agent.
There is also a Thrift sink that does the same thing using Thrift RPC, and is paired with a Thrift source.

Don't be confused by the naming: Avro sinks and sources do not provide the ability to write (or read) Avro files. They are used only to distribute events between agent tiers, and to do so they use Avro RPC to communicate (hence the name). If you need to write events to Avro files, use the HDFS sink, described in "File Formats" on page 387.
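
For example, a minimal sketch of the wiring between two tiers might look like this (the agent names, hostname, and port number are illustrative assumptions; avro sinks and sources are configured with hostname/port and bind/port, respectively):

    # First-tier agent: forward events to the second tier over Avro RPC
    agent1.sinks.sink2.type = avro
    agent1.sinks.sink2.hostname = collector1.example.com
    agent1.sinks.sink2.port = 12001

    # Second-tier agent: receive events sent by first-tier agents
    agent2.sources.source2.type = avro
    agent2.sources.source2.bind = collector1.example.com
    agent2.sources.source2.port = 12001

As with any sink or source, each of these would also need to be attached to a channel on its own agent.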