ReadSupport and WriteSupport are the integration points in Java, and implementations of these classes do the conversion between the objects used by the tool or component and the objects used to represent each Parquet type in the schema. To demonstrate, we'll use a simple in-memory model that comes bundled with Parquet in the parquet.example.data and parquet.example.data.simple packages. Then, in the next section, we'll use an Avro representation to do the same thing.

As the names suggest, the example classes that come with Parquet are an object model for demonstrating how to work with Parquet files; for production, one of the supported frameworks should be used (Avro, Protocol Buffers, or Thrift).

To write a Parquet file, we need to define a Parquet schema, represented by an instance of parquet.schema.MessageType:

    MessageType schema = MessageTypeParser.parseMessageType(
        "message Pair {\n" +
        "  required binary left (UTF8);\n" +
        "  required binary right (UTF8);\n" +
        "}");
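The listings here omit import statements, as the book's listings do. For reference, here is a sketch of the imports this example relies on, assuming the package layout of the Parquet 1.x releases current at the time of writing:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import parquet.column.ParquetProperties;
    import parquet.example.data.Group;
    import parquet.example.data.GroupFactory;
    import parquet.example.data.simple.SimpleGroupFactory;
    import parquet.hadoop.ParquetReader;
    import parquet.hadoop.ParquetWriter;
    import parquet.hadoop.example.GroupReadSupport;
    import parquet.hadoop.example.GroupWriteSupport;
    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;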
Next, we need to create an instance of a Parquet message for each record to be written to the file. For the parquet.example.data package, a message is represented by an instance of Group, constructed using a GroupFactory:

    GroupFactory groupFactory = new SimpleGroupFactory(schema);
    Group group = groupFactory.newGroup()
        .append("left", "L")
        .append("right", "R");

Notice that the values in the message are UTF8 logical types, and Group provides a natural conversion from a Java String for us.

The following snippet of code shows how to create a Parquet file and write a message to it. The write() method would normally be called in a loop to write multiple messages to the file, but this only writes one here:

    Configuration conf = new Configuration();
    Path path = new Path("data.parquet");
    GroupWriteSupport writeSupport = new GroupWriteSupport();
    GroupWriteSupport.setSchema(schema, conf);
    ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
        ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE, /* dictionary page size */
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetProperties.WriterVersion.PARQUET_1_0, conf);
    writer.write(group);
    writer.close();
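To write several records, the same writer is simply reused before closing. A minimal sketch, where pairs is a hypothetical list of two-element String arrays standing in for whatever the real input source is:

    // Hypothetical input: List<String[]> pairs, each holding a left and a right value.
    for (String[] pair : pairs) {
        writer.write(groupFactory.newGroup()
            .append("left", pair[0])
            .append("right", pair[1]));
    }
    writer.close();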
The ParquetWriter constructor needs to be provided with a WriteSupport instance, which defines how the message type is translated to Parquet's types. In this case, we are using the Group message type, so GroupWriteSupport is used. Notice that the Parquet schema is set on the Configuration object by calling the setSchema() static method on GroupWriteSupport, and then the Configuration object is passed to ParquetWriter. This example also illustrates the Parquet file properties that may be set, corresponding to the ones listed in Table 13-3.
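For instance, to trade the defaults for Snappy compression and a larger row group, the corresponding constructor arguments can be overridden. A sketch, assuming CompressionCodecName is parquet.hadoop.metadata.CompressionCodecName:

    // Same constructor as above, but with explicit non-default file properties.
    ParquetWriter<Group> snappyWriter = new ParquetWriter<Group>(path, writeSupport,
        CompressionCodecName.SNAPPY,     // page compression codec
        256 * 1024 * 1024,               // block (row group) size: 256 MB
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE, /* dictionary page size */
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetProperties.WriterVersion.PARQUET_1_0, conf);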
Reading a Parquet file is simpler than writing one, since the schema does not need to be specified, as it is stored in the Parquet file. (It is, however, possible to set a read schema to return a subset of the columns in the file, via projection.) Also, there are no file properties to be set, since they are set at write time:

    GroupReadSupport readSupport = new GroupReadSupport();
    ParquetReader<Group> reader = new ParquetReader<Group>(path, readSupport);

ParquetReader has a read() method to read the next message. It returns null when the end of the file is reached:

    Group result = reader.read();
    assertNotNull(result);
    assertThat(result.getString("left", 0), is("L"));
    assertThat(result.getString("right", 0), is("R"));
    assertNull(reader.read());

Note that the 0 parameter passed to the getString() method specifies the index of the value to retrieve within the field, since fields may have repeated values.
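To make the index concrete, here is a sketch (not from the book) of a schema with a repeated field, where the second argument to getString() selects among the field's values:

    // A repeated field accumulates one value per append() call.
    MessageType tagsSchema = MessageTypeParser.parseMessageType(
        "message Tags {\n" +
        "  repeated binary tag (UTF8);\n" +
        "}");
    Group tags = new SimpleGroupFactory(tagsSchema).newGroup()
        .append("tag", "first")
        .append("tag", "second");
    assertThat(tags.getString("tag", 0), is("first"));
    assertThat(tags.getString("tag", 1), is("second"));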
Avro, Protocol Buffers, and Thrift

Most applications will prefer to define models using a framework like Avro, Protocol Buffers, or Thrift, and Parquet caters to all of these cases. Instead of ParquetWriter and ParquetReader, use AvroParquetWriter, ProtoParquetWriter, or ThriftParquetWriter, and the respective reader classes. These classes take care of translating between Avro, Protocol Buffers, or Thrift schemas and Parquet schemas (as well as performing the equivalent mapping between the framework types and Parquet types), which means you don't need to deal with Parquet schemas directly.

Let's repeat the previous example but using the Avro Generic API, just like we did in "In-Memory Serialization and Deserialization" on page 349.
The Avro schema is:

    {
      "type": "record",
      "name": "StringPair",
      "doc": "A pair of strings.",
      "fields": [
        {"name": "left", "type": "string"},
        {"name": "right", "type": "string"}
      ]
    }

We create a schema instance and a generic record with:

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(
        getClass().getResourceAsStream("StringPair.avsc"));
    GenericRecord datum = new GenericData.Record(schema);
    datum.put("left", "L");
    datum.put("right", "R");

Then we can write a Parquet file:

    Path path = new Path("data.parquet");
    AvroParquetWriter<GenericRecord> writer =
        new AvroParquetWriter<GenericRecord>(path, schema);
    writer.write(datum);
    writer.close();

AvroParquetWriter converts the Avro schema into a Parquet schema, and also translates each Avro GenericRecord instance into the corresponding Parquet types to write to the Parquet file. The file is a regular Parquet file; it is identical to the one written in the previous section using ParquetWriter with GroupWriteSupport, except for an extra piece of metadata to store the Avro schema.
We can see this by inspecting the file's metadata using Parquet's command-line tools (the Parquet tools can be downloaded as a binary tarball from the Parquet Maven repository; search for "parquet-tools" on http://search.maven.org):

    % parquet-tools meta data.parquet
    ...
    extra:         avro.schema = {"type":"record","name":"StringPair", ...
    ...

Similarly, to see the Parquet schema that was generated from the Avro schema, we can use the following:

    % parquet-tools schema data.parquet
    message StringPair {
      required binary left (UTF8);
      required binary right (UTF8);
    }

To read the Parquet file back, we use an AvroParquetReader and get back Avro GenericRecord objects:

    AvroParquetReader<GenericRecord> reader =
        new AvroParquetReader<GenericRecord>(path);
    GenericRecord result = reader.read();
    assertNotNull(result);
    assertThat(result.get("left").toString(), is("L"));
    assertThat(result.get("right").toString(), is("R"));
    assertNull(reader.read());

Projection and read schemas

It's often the case that you only need to read a few columns in the file, and indeed this is the raison d'être of a columnar format like Parquet: to save time and I/O.
You can use a projection schema to select the columns to read. For example, the following schema will read only the right field of a StringPair:

    {
      "type": "record",
      "name": "StringPair",
      "doc": "The right field of a pair of strings.",
      "fields": [
        {"name": "right", "type": "string"}
      ]
    }

In order to use a projection schema, set it on the configuration using the setRequestedProjection() static convenience method on AvroReadSupport:

    Schema projectionSchema = parser.parse(
        getClass().getResourceAsStream("ProjectedStringPair.avsc"));
    Configuration conf = new Configuration();
    AvroReadSupport.setRequestedProjection(conf, projectionSchema);

Then pass the configuration into the constructor for AvroParquetReader:

    AvroParquetReader<GenericRecord> reader =
        new AvroParquetReader<GenericRecord>(conf, path);
    GenericRecord result = reader.read();
    assertNull(result.get("left"));
    assertThat(result.get("right").toString(), is("R"));

Both the Protocol Buffers and Thrift implementations support projection in a similar manner.
In addition, the Avro implementation allows you to specify a reader's schema by calling setAvroReadSchema() on AvroReadSupport. This schema is used to resolve Avro records according to the rules listed in Table 12-4.

The reason that Avro has both a projection schema and a reader's schema is that the projection must be a subset of the schema used to write the Parquet file, so it cannot be used to evolve a schema by adding new fields. The two schemas serve different purposes, and you can use both together. The projection schema is used to filter the columns to read from the Parquet file; although it is expressed as an Avro schema, it can be viewed simply as a list of Parquet columns to read back. The reader's schema, on the other hand, is used only to resolve Avro records. It is never translated to a Parquet schema, since it has no bearing on which columns are read from the Parquet file. For example, if we added a description field to our Avro schema (like in "Schema Resolution" on page 355) and used it as the Avro reader's schema, then the records would contain the default value of the field, even though the Parquet file has no such field.
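A sketch of that example; NewStringPair.avsc is an assumed schema file holding the writer's schema plus a description field with a default value of "", as in the "Schema Resolution" section:

    // Reader's schema resolution: description is not in the Parquet file,
    // so each record comes back with the field's default value.
    Schema readSchema = parser.parse(
        getClass().getResourceAsStream("NewStringPair.avsc")); // assumed file
    Configuration conf = new Configuration();
    AvroReadSupport.setAvroReadSchema(conf, readSchema);
    AvroParquetReader<GenericRecord> reader =
        new AvroParquetReader<GenericRecord>(conf, path);
    GenericRecord result = reader.read();
    assertThat(result.get("left").toString(), is("L"));
    assertThat(result.get("description").toString(), is("")); // assumed default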
Parquet MapReduce

Parquet comes with a selection of MapReduce input and output formats for reading and writing Parquet files from MapReduce jobs, including ones for working with Avro, Protocol Buffers, and Thrift schemas and data. The program in Example 13-1 is a map-only job that reads text files and writes Parquet files where each record is the line's offset in the file (represented by an int64, converted from a long in Avro) and the line itself (a string).
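A record schema along these lines fits that description (a sketch; the field names offset and line are assumptions, and the actual schema in Example 13-1 may differ):

    // Sketch of an Avro schema pairing a line's file offset with its text.
    static final Schema LINE_SCHEMA = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"Line\", \"fields\": [" +
        " {\"name\": \"offset\", \"type\": \"long\"}," +
        " {\"name\": \"line\", \"type\": \"string\"}]}");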