Write and read Avro records from a byte array

Avro serialization

There are several possible serialization formats when using Avro (binary or JSON encoding, with or without the embedded schema of the container format). The examples below cover the data (container) format and the raw binary encoding, using two libraries:

Avro4s

Serialization using the avro4s library, which can generate a schema and a record (GenericRecord) from a case class.

Add library: libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "1.8.3"

Example of Avro Data Serialization:

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

object SampleAvro4sData {
  case class User(name: String, favorite_number: Int, favorite_color: String)

  def main(args: Array[String]): Unit = {

    // Write records in Avro "data" format (schema embedded in the output)
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.data[User](outputStream)
    os.write(Seq(
      User("davide", 6, "red"),
      User("mark", 4, "white")))
    os.flush()
    os.close()

    val bytes = outputStream.toByteArray

    // Read the records back; the schema is derived from the User case class
    val is = AvroInputStream.data[User](bytes)
    val users = is.iterator.toSet
    is.close()

    println("len: " + bytes.length)
    println(users.mkString("\n"))
  }
}

To use Avro binary encoding, just change AvroOutputStream.data to AvroOutputStream.binary (and AvroInputStream.data to AvroInputStream.binary).
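For example, a binary round trip might look like this (a minimal sketch, assuming the same User case class as above and avro4s 1.8.x, where AvroInputStream.binary accepts a byte array):

import java.io.ByteArrayOutputStream

import com.sksamuel.avro4s.{AvroInputStream, AvroOutputStream}

object SampleAvro4sBinary {
  case class User(name: String, favorite_number: Int, favorite_color: String)

  def main(args: Array[String]): Unit = {
    // Binary encoding writes only the records, without a schema header
    val outputStream = new ByteArrayOutputStream()
    val os = AvroOutputStream.binary[User](outputStream)
    os.write(User("davide", 6, "red"))
    os.close()

    val bytes = outputStream.toByteArray

    // The reader must already know the schema (derived here from User)
    val is = AvroInputStream.binary[User](bytes)
    println(is.iterator.toList.mkString("\n"))
    is.close()
  }
}

The resulting byte array is smaller than the data format because no schema is embedded, but the reader must supply a compatible schema on its own.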

org.apache.avro

Serialization using the official Java library.

Add library: libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"

Example of Avro Data Serialization and Binary Encoding:

import org.apache.avro.io.{DecoderFactory, EncoderFactory}

import scala.collection.mutable

object SampleAvro {
  def main(args: Array[String]): Unit = {

    val schema =
    """
      |{"namespace": "example.avro",
      | "type": "record",
      | "name": "User",
      | "fields": [
      |     {"name": "name", "type": "string"},
      |     {"name": "favorite_number",  "type": "int"},
      |     {"name": "favorite_color", "type": "string"}
      | ]
      |}
    """.stripMargin

    import org.apache.avro.generic.GenericData

    val schemaObj = new org.apache.avro.Schema.Parser().parse(schema)

    val user1 = new GenericData.Record(schemaObj)
    user1.put("name", "Alyssa")
    user1.put("favorite_number", 256)
    user1.put("favorite_color", "blue")

    val user2 = new GenericData.Record(schemaObj)
    user2.put("name", "Ben")
    user2.put("favorite_number", 7)
    user2.put("favorite_color", "red")

    // Data serialization (data + schema)
    val bytes = write(List(user1, user2), schemaObj)
    val users = read(bytes, schemaObj)

    println("Data serialization")
    println(users.mkString("\n"))

    // Binary encoding only (only data without schema)
    val bytes2 = writeBinary(List(user1, user2), schemaObj)
    val users2 = readBinary(bytes2, schemaObj)

    println("Binary encoding")
    println(users2.mkString("\n"))
  }

  def write(records: Seq[org.apache.avro.generic.GenericData.Record],
            schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)

    for (record <- records)
      dataFileWriter.append(record)

    dataFileWriter.flush()
    dataFileWriter.close()

    outputStream.toByteArray
  }

  def read(bytes: Array[Byte],
           schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

    import scala.collection.JavaConverters._
    val list = dataFileReader.iterator().asScala.toList

    dataFileReader.close()

    list
  }

  def writeBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
            schema: org.apache.avro.Schema): Array[Byte] = {
    import java.io.ByteArrayOutputStream
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}

    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)

    for (record <- records)
      datumWriter.write(record, encoder)

    encoder.flush()

    outputStream.toByteArray
  }

  def readBinary(bytes: Array[Byte],
           schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    import org.apache.avro.file.SeekableByteArrayInput
    import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

    val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
      val item = datumReader.read(null, decoder)

      result += item
    }

    result.toList
  }
}

The same can also be done with a specific class instead of GenericRecord. One way is to use ReflectDatumWriter/ReflectDatumReader instead of GenericDatumWriter/GenericDatumReader.
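For example, a minimal round trip with ReflectDatumWriter/ReflectDatumReader might look like this (a sketch; the User class here is hypothetical, and the reflect API needs a no-arg constructor and mutable fields it can populate):

import java.io.ByteArrayOutputStream
import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}

// Hypothetical mutable class: the reflect API requires a no-arg
// constructor and fields it can set via reflection.
class User {
  var name: String = ""
  var favorite_number: Int = 0
  var favorite_color: String = ""
  override def toString = s"User($name, $favorite_number, $favorite_color)"
}

object SampleAvroReflect {
  def main(args: Array[String]): Unit = {
    // Derive the schema from the class itself
    val schema = ReflectData.get().getSchema(classOf[User])

    val user = new User
    user.name = "Alyssa"
    user.favorite_number = 256
    user.favorite_color = "blue"

    // Write a data file (schema + records) to a byte array
    val out = new ByteArrayOutputStream()
    val writer = new DataFileWriter[User](new ReflectDatumWriter[User](schema))
    writer.create(schema, out)
    writer.append(user)
    writer.close()

    // Read it back as User instances
    val reader = new DataFileReader[User](
      new SeekableByteArrayInput(out.toByteArray),
      new ReflectDatumReader[User](schema))
    while (reader.hasNext) println(reader.next())
    reader.close()
  }
}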

Schema resolution

Rules that must be followed to ensure correct schema evolution: https://avro.apache.org/docs/current/spec.html#Schema+Resolution

Note that when reading binary Avro you should always provide the original schema used to write it. It can be provided as a header (see data serialization above) or obtained from somewhere else. If you want to read data into a new schema (a new class), you should provide both the old and the new schema: the old schema is used to decode the binary data, and the new schema is used to map old fields to new fields (following the rules above).
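The helper object below collects these patterns in one place: the generic data and binary read/write shown earlier, reflect-based helpers for specific classes, and small file utilities used by the schema-evolution example that follows.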

import java.io.{ByteArrayOutputStream, FileOutputStream}
import java.nio.file.{Files, Paths}
import org.apache.avro.file.{DataFileReader, DataFileWriter, SeekableByteArrayInput}
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectData, ReflectDatumReader, ReflectDatumWriter}
import scala.collection.mutable
import scala.reflect.ClassTag
import scala.collection.JavaConverters._
object avroUtils {
  def writeGenericData(records: Seq[org.apache.avro.generic.GenericData.Record],
                       schema: org.apache.avro.Schema): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    dataFileWriter.create(schema, outputStream)
    for (record <- records)
      dataFileWriter.append(record)
    dataFileWriter.flush()
    dataFileWriter.close()
    outputStream.toByteArray
  }

  def readGenericData(bytes: Array[Byte],
                      schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)
    val list = dataFileReader.iterator().asScala.toList
    dataFileReader.close()
    list
  }

  def writeGenericBinary(records: Seq[org.apache.avro.generic.GenericData.Record],
                         schema: org.apache.avro.Schema): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    for (record <- records)
      datumWriter.write(record, encoder)
    encoder.flush()
    outputStream.toByteArray
  }

  def readGenericBinary(bytes: Array[Byte],
                        schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
    val datumReader = new GenericDatumReader[GenericRecord](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
    val result = new mutable.MutableList[org.apache.avro.generic.GenericRecord]
    while (!decoder.isEnd) {
      val item = datumReader.read(null, decoder)
      result += item
    }
    result.toList
  }

  def writeSpecificData[T](records: Seq[T])(implicit classTag: ClassTag[T]): Array[Byte] = {
    val outputStream = new ByteArrayOutputStream()
    val schema = ReflectData.get().getSchema(classTag.runtimeClass)
    val datumWriter = new ReflectDatumWriter[T](schema)
    val dataFileWriter = new DataFileWriter[T](datumWriter)
    dataFileWriter.create(schema, outputStream)
    for (record <- records)
      dataFileWriter.append(record)
    dataFileWriter.flush()
    dataFileWriter.close()
    outputStream.toByteArray
  }

  def readSpecificData[T](bytes: Array[Byte])(implicit classTag: ClassTag[T]): List[T] = {
    val schema = ReflectData.get().getSchema(classTag.runtimeClass)
    readSpecificData(bytes, schema)
  }

  def readSpecificData[T](bytes: Array[Byte], schema: org.apache.avro.Schema): List[T] = {
    val datumReader = new ReflectDatumReader[T](schema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val dataFileReader = new DataFileReader[T](inputStream, datumReader)
    val list = dataFileReader.iterator().asScala.toList
    dataFileReader.close()
    list
  }

  def generateSchema[T]()(implicit classTag: ClassTag[T]): String = {
    val schema = ReflectData.get().getSchema(classTag.runtimeClass)
    schema.toString(true)
  }

  def writeSpecificBinary[T](value: T, schema: org.apache.avro.Schema): Array[Byte] = {
    val datumWriter = new ReflectDatumWriter[T](schema)
    val outputStream = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(outputStream, null)
    datumWriter.write(value, encoder)
    encoder.flush()
    outputStream.toByteArray
  }

  def readSpecificBinary[T](
      bytes: Array[Byte],
      readerSchema: org.apache.avro.Schema, // destination schema
      writerSchema: org.apache.avro.Schema, // source schema
      reuse: T): T = {
    val datumReader = new ReflectDatumReader[T](writerSchema, readerSchema)
    val inputStream = new SeekableByteArrayInput(bytes)
    val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
    datumReader.read(reuse, decoder)
  }

  def writeFile(fileName: String, bytes: Array[Byte]): Unit = {
    val file = new FileOutputStream(fileName)
    try {
      file.write(bytes)
    } finally {
      file.close()
    }
  }

  def readFile(fileName: String): Array[Byte] = {
    Files.readAllBytes(Paths.get(fileName))
  }
}
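Using these helpers, schema evolution can be exercised end to end: writeV1 persists a record encoded with the old schema, and readV2fromV1 decodes those bytes with the old schema while mapping them onto the new one (test.avro.User here is assumed to be a class whose mutable fields match the V2 schema):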
import org.apache.avro.Schema
import test.avro.User
object SampleAvroEvolve {
  def main(args: Array[String]): Unit = {
    // writeV1()
    val result = readV2fromV1()
    println(result)
  }

  def readV1() = {
    val schemaV1 = new Schema.Parser().parse(
      """
        |{
        | "namespace": "test.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |     {"name": "name", "type": "string"},
        |     {"name": "favorite_number", "type": "int"},
        |     {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin
    )
    val bytesV1 = avroUtils.readFile("/tmp/userv1.bin")
    avroUtils.readSpecificBinary(bytesV1, schemaV1, schemaV1, new User)
  }

  def readV2fromV1() = {
    val schemaV1 = new Schema.Parser().parse(
      """
        |{
        | "namespace": "test.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |     {"name": "name", "type": "string"},
        |     {"name": "favorite_number", "type": "int"},
        |     {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin
    )
    val schemaV2 = new Schema.Parser().parse(
      """
        |{
        | "namespace": "test.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |     {"name": "name", "type": "string"},
        |     {"name": "favorite_number", "type": "int"},
        |     {"name": "favorite_animal", "type": "string", "default": "tiger"},
        |     {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin
    )
    val bytesV1 = avroUtils.readFile("/tmp/userv1.bin")
    avroUtils.readSpecificBinary(bytesV1, schemaV2, schemaV1, new User)
  }

  def writeV1(): Unit = {
    val schemaV1 = new Schema.Parser().parse(
      """
        |{
        | "namespace": "test.avro",
        | "type": "record",
        | "name": "User",
        | "fields": [
        |     {"name": "name", "type": "string"},
        |     {"name": "favorite_number", "type": "int"},
        |     {"name": "favorite_color", "type": "string"}
        | ]
        |}
      """.stripMargin
    )
    val userV1 = new User
    userV1.name = "Alyssa"
    userV1.favorite_number = 256
    userV1.favorite_color = "blue"
    val bytesV1 = avroUtils.writeSpecificBinary(userV1, schemaV1)
    avroUtils.writeFile("/tmp/userv1.bin", bytesV1)
  }
}
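Because favorite_animal declares a default value in the V2 schema, reading the V1 bytes through the V2 reader schema should fill that field with "tiger", as prescribed by the schema resolution rules linked above.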