In this case, agent1.sources.source1.type is set to spooldir, which is a spooling directory source that monitors a spooling directory for new files. The spooling directory source defines a spoolDir property, so for source1 the full key is agent1.sources.source1.spoolDir. The source's channels are set with agent1.sources.source1.channels.

The sink is a logger sink for logging events to the console. It too must be connected to the channel (with the agent1.sinks.sink1.channel property).[1] The channel is a file channel, which means that events in the channel are persisted to disk for durability. The system is illustrated in Figure 14-1.

[1] Note that a source has a channels property (plural) but a sink has a channel property (singular). This is because a source can feed more than one channel (see "Fan Out" on page 388), but a sink can only be fed by one channel. It's also possible for a channel to feed multiple sinks. This is covered in "Sink Groups" on page 395.

Figure 14-1. Flume agent with a spooling directory source and a logger sink connected by a file channel

Before running the example, we need to create the spooling directory on the local filesystem:

% mkdir /tmp/spooldir

Then we can start the Flume agent using the flume-ng command:

% flume-ng agent \
--conf-file spool-to-logger.properties \
--name agent1 \
--conf $FLUME_HOME/conf \
-Dflume.root.logger=INFO,console

The Flume properties file from Example 14-1 is specified with the --conf-file flag. The agent name must also be passed in with --name (since a Flume properties file can define several agents, we have to say which one to run). The --conf flag tells Flume where to find its general configuration, such as environment settings.
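Example 14-1 itself appears on an earlier page and is not reproduced here, but it can be assembled from the properties discussed above (using the same layout as Example 14-2 later in the chapter). A minimal sketch of spool-to-logger.properties along those lines:

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir

agent1.sinks.sink1.type = logger

agent1.channels.channel1.type = file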
In a new terminal, create a file in the spooling directory. The spooling directory source expects files to be immutable. To prevent partially written files from being read by the source, we write the full contents to a hidden file. Then, we do an atomic rename so the source can read it:[2]

% echo "Hello Flume" > /tmp/spooldir/.file1.txt
% mv /tmp/spooldir/.file1.txt /tmp/spooldir/file1.txt

Back in the agent's terminal, we see that Flume has detected and processed the file:

Preparing to move file /tmp/spooldir/file1.txt to
/tmp/spooldir/file1.txt.COMPLETED
Event: { headers:{} body: 48 65 6C 6C 6F 20 46 6C 75 6D 65    Hello Flume }

[2] For a logfile that is continually appended to, you would periodically roll the logfile and move the old file to the spooling directory for Flume to read it.
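That rolling step is not shown in the book's example; a rough sketch of what it might look like, assuming a hypothetical application logfile at /var/log/myapp/app.log on the same machine and an application that reopens its logfile after it is renamed:

% TS=$(date +%Y%m%d-%H%M%S)
% mv /var/log/myapp/app.log /var/log/myapp/app-$TS.log      # roll; the app is assumed to reopen app.log
% cp /var/log/myapp/app-$TS.log /tmp/spooldir/.app-$TS.log  # write the copy as a hidden file first
% mv /tmp/spooldir/.app-$TS.log /tmp/spooldir/app-$TS.log   # atomic rename makes it visible to the source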
The spooling directory source ingests the file by splitting it into lines and creating a Flume event for each line. Events have optional headers and a binary body, which is the UTF-8 representation of the line of text. The body is logged by the logger sink in both hexadecimal and string form. The file we placed in the spooling directory was only one line long, so only one event was logged in this case. We also see that the file was renamed to file1.txt.COMPLETED by the source, which indicates that Flume has completed processing it and won't process it again.
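As an aside (not part of the book's example), you can confirm that the logged bytes are simply the UTF-8 encoding of the line with a quick check on the command line:

% echo -n "Hello Flume" | od -An -tx1
 48 65 6c 6c 6f 20 46 6c 75 6d 65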
Transactions and Reliability

Flume uses separate transactions to guarantee delivery from the source to the channel and from the channel to the sink. In the example in the previous section, the spooling directory source creates an event for each line in the file. The source will only mark the file as completed once the transactions encapsulating the delivery of the events to the channel have been successfully committed.

Similarly, a transaction is used for the delivery of the events from the channel to the sink. If for some unlikely reason the events could not be logged, the transaction would be rolled back and the events would remain in the channel for later redelivery.

The channel we are using is a file channel, which has the property of being durable: once an event has been written to the channel, it will not be lost, even if the agent restarts. (Flume also provides a memory channel that does not have this property, since events are stored in memory. With this channel, events are lost if the agent restarts. Depending on the application, this might be acceptable. The trade-off is that the memory channel has higher throughput than the file channel.)
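If durability is not needed, the example's channel definition could be swapped for a memory channel; a minimal sketch (the capacity values are illustrative assumptions, not recommendations):

# In-memory channel: faster, but events are lost if the agent restarts.
agent1.channels.channel1.type = memory
# Maximum number of events held in the channel (illustrative value):
agent1.channels.channel1.capacity = 10000
# Maximum number of events per transaction (illustrative value):
agent1.channels.channel1.transactionCapacity = 1000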
The overall effect is that every event produced by the source will reach the sink. The major caveat here is that every event will reach the sink at least once; that is, duplicates are possible. Duplicates can be produced in sources or sinks: for example, after an agent restart, the spooling directory source will redeliver events for an uncompleted file, even if some or all of them had been committed to the channel before the restart. After a restart, the logger sink will re-log any event that was logged but not committed (which could happen if the agent was shut down between these two operations).

At-least-once semantics might seem like a limitation, but in practice it is an acceptable performance trade-off. The stronger semantics of exactly once require a two-phase commit protocol, which is expensive. This choice is what differentiates Flume (at-least-once semantics) as a high-volume parallel event ingest system from more traditional enterprise messaging systems (exactly-once semantics).
With at-least-once semantics, duplicate events can be removed further down the processing pipeline. Usually this takes the form of an application-specific deduplication job written in MapReduce or Hive.

Batching

For efficiency, Flume tries to process events in batches for each transaction, where possible, rather than one by one. Batching helps file channel performance in particular, since every transaction results in a local disk write and fsync call.

The batch size used is determined by the component in question, and is configurable in many cases. For example, the spooling directory source will read files in batches of 100 lines.
(This can be changed by setting the batchSize property.) Similarly, the Avro sink (discussed in "Distribution: Agent Tiers" on page 390) will try to read 100 events from the channel before sending them over RPC, although it won't block if fewer are available.
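For instance, raising the spooling directory source's batch size is a one-line change; a sketch (the value of 1,000 is an illustrative assumption, and the commented Avro sink line is hypothetical, since no Avro sink is defined in this example):

# Read up to 1,000 lines per transaction from the spooling directory source:
agent1.sources.source1.batchSize = 1000
# For an Avro sink, the equivalent setting would be, e.g.:
#   agentX.sinks.avroSink.batch-size = 1000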
The HDFS Sink

The point of Flume is to deliver large amounts of data into a Hadoop data store, so let's look at how to configure a Flume agent to deliver events to an HDFS sink. The configuration in Example 14-2 updates the previous example to use an HDFS sink. The only two settings that are required are the sink's type (hdfs) and hdfs.path, which specifies the directory where files will be placed (if, like here, the filesystem is not specified in the path, it's determined in the usual way from Hadoop's fs.defaultFS property). We've also specified a meaningful file prefix and suffix, and instructed Flume to write events to the files in text format.

Example 14-2. Flume configuration using a spooling directory source and an HDFS sink

agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

agent1.sources.source1.type = spooldir
agent1.sources.source1.spoolDir = /tmp/spooldir

agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = /tmp/flume
agent1.sinks.sink1.hdfs.filePrefix = events
agent1.sinks.sink1.hdfs.fileSuffix = .log
agent1.sinks.sink1.hdfs.inUsePrefix = _
agent1.sinks.sink1.hdfs.fileType = DataStream

agent1.channels.channel1.type = file

Restart the agent to use the spool-to-hdfs.properties configuration, and create a new file in the spooling directory:

% echo -e "Hello\nAgain" > /tmp/spooldir/.file2.txt
% mv /tmp/spooldir/.file2.txt /tmp/spooldir/file2.txt

Events will now be delivered to the HDFS sink and written to a file.
Files in the processof being written to have a .tmp in-use suffix added to their name to indicate that theyare not yet complete. In this example, we have also set hdfs.inUsePrefix to be _(underscore; by default it is empty), which causes files in the process of being writtento have that prefix added to their names. This is useful since MapReduce will ignorefiles that have a _ prefix. So, a typical temporary filename would be _events.1399295780136.log.tmp; the number is a timestamp generated by the HDFS sink.A file is kept open by the HDFS sink until it has either been open for a given time (default30 seconds, controlled by the hdfs.rollInterval property), has reached a given size(default 1,024 bytes, set by hdfs.rollSize), or has had a given number of events writtento it (default 10, set by hdfs.rollCount).
After 30 seconds, we can be sure that the file has been rolled and we can take a look at its contents:

% hadoop fs -cat /tmp/flume/events.1399295780136.log
Hello
Again

The HDFS sink writes files as the user who is running the Flume agent, unless the hdfs.proxyUser property is set, in which case files will be written as that user.
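As a sketch of that last point (the user name here is a hypothetical example, and impersonation also has to be permitted by the Hadoop cluster's proxy-user settings):

# Write files as the (hypothetical) user "flume" instead of the user running the agent:
agent1.sinks.sink1.hdfs.proxyUser = flume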
Partitioning and Interceptors

Large datasets are often organized into partitions, so that processing can be restricted to particular partitions if only a subset of the data is being queried. For Flume event data, it's very common to partition by time. A process can be run periodically that transforms completed partitions (to remove duplicate events, for example).

It's easy to change the example to store data in partitions by setting hdfs.path to include subdirectories that use time format escape sequences:

agent1.sinks.sink1.hdfs.path = /tmp/flume/year=%Y/month=%m/day=%d

Here we have chosen to have day-sized partitions, but other levels of granularity are possible, as are other directory layout schemes. (If you are using Hive, see "Partitions and Buckets" on page 491 for how Hive lays out partitions on disk.) The full list of format escape sequences is provided in the documentation for the HDFS sink in the Flume User Guide.

The partition that a Flume event is written to is determined by the timestamp header on the event.
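One common way to set that header (shown here as background, not as part of the book's example) is Flume's timestamp interceptor on the source, configured along these lines:

agent1.sources.source1.interceptors = interceptor1
agent1.sources.source1.interceptors.interceptor1.type = timestamp

The interceptor stamps each event with the time at which the source processed it, and the HDFS sink then uses that header to expand the %Y/%m/%d escapes in hdfs.path.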