Dec 19, 2010

Apache Avro serialization without code generation and IDL

It is not a secret that real programmers just love writing their own serialization frameworks. Everyone from JBoss to GOOG has one. So it's only natural for the Hadoop stack guys to have one of their own :) What makes Avro different from Protocol Buffers and Thrift, though, is that one does not need an IDL and code generation to send data streams between nodes of a distributed system. An Avro data stream always carries its JSON-formatted schema with it and so can be deserialized by a client without any prior knowledge of what could be inside.

While experimenting with this feature I could not find good examples that avoid IDL and code generation, so I came up with my own. What I have in this tiny test project is a couple of builders for the Avro Schema and GenericRecord classes and the corresponding unit tests. The tests demonstrate writing records to an output stream and reading them back.

In the most basic case, an entire stream/file represents a serialized sequence of records of the same schema. All one needs to write data to an output stream is a schema and data records created with that schema. The very same generic record type is used to read data back from an input stream. In the test below we simply assume a certain schema, but in production code one would need to check the schema type of each field (a sketch of such a check follows the generated schema below). Also, note the idiomatic Avro approach of reusing the same record instance to read the whole data set.

@Test
public void testUniformSchemaSerialization() throws Exception {
    // every record in this stream shares the same two-field schema
    final Schema schema = new AvroSchemaBuilder(2).string(COLUMN1).int32(COLUMN2).build("DATA");

    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final DataFileWriter<GenericRecord> writer =
            new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)).create(schema, out);

    final AvroRecordBuilder builder = new AvroRecordBuilder(schema);
    writer.append(builder.field(CITY1).field(POPULATION1).build());
    writer.append(builder.field(CITY2).field(POPULATION2).build());
    writer.close();

    final DataFileStream<GenericRecord> reader =
            new DataFileStream<GenericRecord>(new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>());
    GenericRecord deserialized = null;

    assertTrue(reader.hasNext());
    // the same record instance is reused for every call to next()
    deserialized = reader.next(deserialized);
    assertEquals(new Utf8(CITY1), deserialized.get(COLUMN1));
    assertEquals(POPULATION1, deserialized.get(COLUMN2));

    assertTrue(reader.hasNext());
    deserialized = reader.next(deserialized);
    assertEquals(new Utf8(CITY2), deserialized.get(COLUMN1));
    assertEquals(POPULATION2, deserialized.get(COLUMN2));

    assertFalse(reader.hasNext());
}

The generated schema looks like this:

{
  "type" : "record",
  "name" : "DATA",
  "fields" : [ {
    "name" : "CITY",
    "type" : "string"
  }, {
    "name" : "POPULATION",
    "type" : "int"
  } ]
}
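
Such a field-by-field check could look roughly like this. It is only a sketch, not part of the test project: it takes any GenericRecord read from the stream and assumes just string and int fields, as in the schema above.

static void dumpRecord(GenericRecord record) {
    // walk the schema that arrived with the stream instead of assuming field types
    for (Schema.Field field : record.getSchema().getFields()) {
        final Object value = record.get(field.name());
        switch (field.schema().getType()) {
            case STRING:
                // Avro hands strings back as org.apache.avro.util.Utf8, not java.lang.String
                System.out.println(field.name() + " = " + value);
                break;
            case INT:
                System.out.println(field.name() + " = " + (Integer) value);
                break;
            default:
                throw new IllegalStateException("Unexpected field type: " + field.schema().getType());
        }
    }
}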

In real life the need to support headers (e.g. with version-like fields) and/or footers (e.g. with checksum-like fields) is also likely to arise. In contrast to the actual data tuples, header and footer formats are likely to be the same in all streams. There is one obvious way to implement this with the Avro union type: we declare the output schema for our records to be a union of three possible record schemas, and the stream then carries a single header record, any number of data records, and a single footer record.

final String HEADER_FIELD1 = "VERSION";
final String FOOTER_FIELD1 = "CHKSUM";
final String HEADER_SCHEMA = "HEADER";
final String BODY_SCHEMA = "DATA";
final String FOOTER_SCHEMA = "FOOTER";
final int version = 2010;
final long total = 2;

// the output schema is a union of the header, body and footer record schemas
final Schema schema = Schema.createUnion(
        Lists.<Schema>newArrayList(
                new AvroSchemaBuilder(1).int32(HEADER_FIELD1).build(HEADER_SCHEMA),
                new AvroSchemaBuilder(2).string(COLUMN1).int32(COLUMN2).build(BODY_SCHEMA),
                new AvroSchemaBuilder(1).int64(FOOTER_FIELD1).build(FOOTER_SCHEMA)));

final ByteArrayOutputStream out = new ByteArrayOutputStream();
final DataFileWriter<GenericRecord> writer =
        new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema)).create(schema, out);

// getTypes() returns the union branches in declaration order: HEADER, DATA, FOOTER
final AvroRecordBuilder headerBuilder = new AvroRecordBuilder(schema.getTypes().get(0));
writer.append(headerBuilder.field(version).build());

final AvroRecordBuilder bodyBuilder = new AvroRecordBuilder(schema.getTypes().get(1));
writer.append(bodyBuilder.field(CITY1).field(POPULATION1).build());
writer.append(bodyBuilder.field(CITY2).field(POPULATION2).build());

final AvroRecordBuilder footerBuilder = new AvroRecordBuilder(schema.getTypes().get(2));
writer.append(footerBuilder.field(total).build());

writer.close();

To read it back we again assume we know what is in the input stream. In a real application one would need to compare schema names against a well-known/hard-coded list of alternatives (in this test, HEADER, DATA and FOOTER) to understand how to interpret each record; a sketch of such name-based dispatch follows the test below. It would also be a good idea for production code to check that the first record is always of type HEADER and the last of type FOOTER.

final DataFileStream<GenericRecord> reader =
        new DataFileStream<GenericRecord>(new ByteArrayInputStream(out.toByteArray()), new GenericDatumReader<GenericRecord>());
GenericRecord deserialized = null;

assertTrue(reader.hasNext());
deserialized = reader.next(deserialized);
assertEquals(HEADER_SCHEMA, deserialized.getSchema().getName());
assertEquals(version, deserialized.get(HEADER_FIELD1));

assertTrue(reader.hasNext());
deserialized = reader.next(deserialized);
assertEquals(BODY_SCHEMA, deserialized.getSchema().getName());
assertEquals(new Utf8(CITY1), deserialized.get(COLUMN1));
assertEquals(POPULATION1, deserialized.get(COLUMN2));

assertTrue(reader.hasNext());
deserialized = reader.next(deserialized);
assertEquals(BODY_SCHEMA, deserialized.getSchema().getName());
assertEquals(new Utf8(CITY2), deserialized.get(COLUMN1));
assertEquals(POPULATION2, deserialized.get(COLUMN2));

assertTrue(reader.hasNext());
deserialized = reader.next(deserialized);
assertEquals(FOOTER_SCHEMA, deserialized.getSchema().getName());
assertEquals(total, deserialized.get(FOOTER_FIELD1));

assertFalse(reader.hasNext());
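
If the record order were not guaranteed, the reading loop could instead dispatch on the schema name. This is only a rough sketch, reusing the reader, the record instance and the constants declared above, not code from the test project.

// interpret each record by the name of the schema it was written with,
// instead of relying on a fixed HEADER/DATA/FOOTER order
while (reader.hasNext()) {
    deserialized = reader.next(deserialized);
    final String schemaName = deserialized.getSchema().getName();
    if (HEADER_SCHEMA.equals(schemaName)) {
        final Integer version = (Integer) deserialized.get(HEADER_FIELD1);
        // e.g. reject the stream if the version is unsupported
    } else if (BODY_SCHEMA.equals(schemaName)) {
        // process one data tuple; field types can be checked as shown earlier
    } else if (FOOTER_SCHEMA.equals(schemaName)) {
        final Long checksum = (Long) deserialized.get(FOOTER_FIELD1);
        // e.g. verify the checksum and that no records follow
    } else {
        throw new IllegalStateException("Unknown record schema: " + schemaName);
    }
}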

In this case the generated schema looks slightly more interesting:

[ {
  "type" : "record",
  "name" : "HEADER",
  "fields" : [ {
    "name" : "VERSION",
    "type" : "int"
  } ]
}, {
  "type" : "record",
  "name" : "DATA",
  "fields" : [ {
    "name" : "CITY",
    "type" : "string"
  }, {
    "name" : "POPULATION",
    "type" : "int"
  } ]
}, {
  "type" : "record",
  "name" : "FOOTER",
  "fields" : [ {
    "name" : "CHKSUM",
    "type" : "long"
  } ]
} ]
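
And since the writer schema is embedded in the stream itself, a reader can recover this JSON without knowing anything in advance. A small sketch using the DataFileStream from the tests:

// the writer's schema travels in the container header; toString(true) pretty-prints it
final Schema embedded = reader.getSchema();
System.out.println(embedded.toString(true));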
