Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package avro
import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic._
import org.apache.avro.io._
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
import org.apache.avro.{Schema, SchemaBuilder}
import org.c02e.jpgpj._
import java.io._
import scala.concurrent.duration.{Duration, NANOSECONDS}
/*
Code to encrypt and decrypt Avro messages, the kind that are usually sent and received over a Kafka topic.
It may be unnecessary overkill, but something like this adds an extra layer of security by ensuring
that only authorized producers and consumers can exchange messages.
It would be nice to try encrypting a protobuf message using this library.
@author
Alonso Isidoro Román @alonsoir
https://gpgtools.tenderapp.com/kb/gpg-keychain-faq/how-to-find-and-share-your-public-key
Identify your private key:
gpg --list-secret-keys --keyid-format=long alonsoir@gmail.com
...
gpg --export-secret-keys 564BC40362AC6282 > private.key
Copy the key file to the other machine using a secure transport (scp is your friend).
To import, run
gpg --import private.key
**/
object EncryptAndDecryptAvroMessages {
/** Plain value holder for the single `data` string carried by the "testRecord" Avro schema. */
case class testRecord(data: String)
//
/**
 * Passphrase that unlocks the private key used for decryption.
 *
 * Taken from the `PGP_PASSPHRASE` environment variable when it is set, so the real secret does not
 * have to be written into the source; otherwise falls back to the original placeholder value.
 */
val passphrase: String = sys.env.getOrElse("PGP_PASSPHRASE", "YOUR-PASSPHRASE")
/**
 * Creates a jpgpj `Encryptor` from the key file at src/main/resources/public.pgp.
 *
 * The encryptor is set up with AES-256 as the encryption algorithm, ZLIB as the compression
 * algorithm and `HashingAlgorithm.Unsigned` as the signing algorithm.
 *
 * @return the configured encryptor
 */
def instantiatePGPEncryptor: Encryptor = {
  val publicKey = new Key(new File("src/main/resources/public.pgp"))
  val pgpEncryptor = new Encryptor(publicKey)
  pgpEncryptor.setEncryptionAlgorithm(EncryptionAlgorithm.AES256)
  pgpEncryptor.setSigningAlgorithm(HashingAlgorithm.Unsigned)
  pgpEncryptor.setCompressionAlgorithm(CompressionAlgorithm.ZLIB)
  pgpEncryptor
}
/**
 * Program entry point: runs the Customer generic-record encrypt/decrypt demo.
 * The single-field variant, encryptAndCypherSimpleAVROmessage, can be called here instead.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = encryptAndCypherCustomerWithGenericRecord
/**
 * Round-trips a `com.example.Customer` generic record through Avro binary encoding and PGP.
 *
 * Steps: build the record (`automated_email` is not set, so the schema default applies), encode it
 * as Avro binary, encrypt the bytes with the public key into
 * src/main/resources/avro-customerWithDefault-encrypted.pgp, decrypt that file with the private key
 * and passphrase, decode the result and compare `first_name` with the value that was written.
 * Sizes and timings of each step are printed to stdout.
 */
private def encryptAndCypherCustomerWithGenericRecord: Unit = {
  // Single definition of the ciphertext location, so the log lines cannot drift from the real path.
  val encryptedPath = "src/main/resources/avro-customerWithDefault-encrypted.pgp"

  val schema = new Schema.Parser().parse(
    """{
      |  "type": "record",
      |  "namespace": "com.example",
      |  "name": "Customer",
      |  "fields": [
      |    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
      |    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
      |    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
      |    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
      |    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
      |    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
      |  ]
      |}""".stripMargin)

  // Build the customer; automated_email is intentionally left to its schema default.
  val customerBuilderWithDefault = new GenericRecordBuilder(schema)
  customerBuilderWithDefault.set("first_name", "Alonso")
  customerBuilderWithDefault.set("last_name", "Isidoro")
  customerBuilderWithDefault.set("age", 44)
  customerBuilderWithDefault.set("height", 175f)
  customerBuilderWithDefault.set("weight", 120.6f)
  val customerWithDefault = customerBuilderWithDefault.build
  System.out.println("customerWithDefault" + customerWithDefault)

  // Serialize the record to Avro binary held in memory.
  // In-memory byte-array streams hold no OS resources, so they need no explicit close().
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val avroBuffer = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(avroBuffer, null)
  writer.write(customerWithDefault, encoder)
  encoder.flush()
  println("The avro message size is: " + avroBuffer.size() + " bytes")
  val avroBytes: Array[Byte] = avroBuffer.toByteArray
  println("serializedcustomerWithDefaultAvroBytes.length: " + avroBytes.length + " bytes")
  println("serializedcustomerWithDefaultAvroBytes.size: " + avroBytes.size + " bytes")

  // Encrypt the Avro bytes with the public key and write the ciphertext to disk.
  val plainInput = new ByteArrayInputStream(avroBytes)
  println("byteArrayAvroInputStream.available: " + plainInput.available() + " bytes")
  val encryptor: Encryptor = instantiatePGPEncryptor
  val encryptedOutput = new FileOutputStream(encryptedPath)
  try {
    val encryptStart = System.nanoTime()
    encryptor.encrypt(plainInput, encryptedOutput)
    val encryptNanos = System.nanoTime() - encryptStart
    val durationEncrypt = Duration(encryptNanos, NANOSECONDS)
    println("Tiempo transcurrido en encryptar el AVRO: " + encryptNanos + " ns. " + durationEncrypt.toMillis + " ms. " + durationEncrypt.toSeconds + " s")
    println("avroMyCustomerEncryptedPGPFile.getChannel().position(): " + encryptedOutput.getChannel().position() + " bytes")
  } finally {
    // Release the file handle even when encryption fails.
    encryptedOutput.close()
  }

  // Re-open the encrypted file and decrypt it with the private key and passphrase.
  val encryptedInput = new FileInputStream(encryptedPath)
  try {
    println("This is the encrypted avro file: " + encryptedPath + ". " + encryptedInput.available() + " bytes")
    val decryptedBuffer = new ByteArrayOutputStream()
    val decryptor = new Decryptor(new Key(new File("src/main/resources/private.key"), passphrase))
    // The message was produced unsigned, so signature verification is switched off.
    decryptor.setVerificationRequired(false)
    val decryptStart = System.nanoTime()
    val decriptionResult: DecryptionResult = decryptor.decryptWithFullDetails(encryptedInput, decryptedBuffer)
    val decryptNanos = System.nanoTime() - decryptStart
    val durationDecrypt = Duration(decryptNanos, NANOSECONDS)
    println("Tiempo transcurrido en desencryptar el AVRO: " + decryptNanos + "ns. " + durationDecrypt.toMillis + "ms. " + durationDecrypt.toSeconds + "s")
    println("This is the decrypted avro file: " + encryptedPath + ". " + decryptedBuffer.size() + " bytes")
    println("decriptionResult: " + decriptionResult.toString)

    // Decode the decrypted bytes back into a generic record and check the round trip.
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(decryptedBuffer.toByteArray, null)
    val someData: GenericRecord = reader.read(null, decoder)
    val first_name = someData.get("first_name").toString
    println("The content should be Alonso. Data is " + first_name + " " + first_name.equals("Alonso"))
    println("Cleaning up. The conclusion is that, logically, adding a PGP encryption layer to an AVRO serialized message adds an extra size when sending it anywhere, between 25 times. Is it worth adding asymmetric encryption to an AVRO message? ")
  } finally {
    // Release the file handle even when decryption or decoding fails.
    encryptedInput.close()
  }
}
/**
 * Reads the first record of the Avro container file `customer-generic.avro` and prints the record
 * and its `first_name` field to stdout.
 *
 * An `IOException` raised while opening, reading or closing the file is reported with a stack
 * trace instead of being propagated. An empty file is reported with a message.
 */
private def readCustomerFromFile: Unit = {
  val file = new File("customer-generic.avro")
  val datumReader = new GenericDatumReader[GenericRecord]
  try {
    // Opening the file is inside the try: a missing or unreadable file fails here.
    val dataFileReader = new DataFileReader[GenericRecord](file, datumReader)
    try {
      if (dataFileReader.hasNext) {
        val customerRead = dataFileReader.next()
        System.out.println("Successfully read avro file")
        System.out.println(customerRead.toString)
        // get the data from the generic record
        System.out.println("First name: " + customerRead.get("first_name"))
        // read a non existent field
        //System.out.println("Non existent field: " + customerRead.get("not_here"))
      } else {
        // Guard against calling next() on a file that holds no records.
        System.out.println("No records found in " + file.getPath)
      }
    } finally dataFileReader.close()
  } catch {
    case e: IOException =>
      e.printStackTrace()
  }
}
/**
 * Writes one customer record to the Avro container file `customer-generic-<first_name>.avro`.
 *
 * @param schema     schema handed to the datum writer
 * @param myCustomer record to write; its own schema is used for the file header and its
 *                   `first_name` field becomes part of the file name
 *
 * An `IOException` or `NullPointerException` is reported on stdout with a stack trace instead of
 * being propagated.
 */
private def writeCustomerToFile(schema: Schema, myCustomer: GenericData.Record): Unit = {
  val datumWriter = new GenericDatumWriter[GenericRecord](schema)
  try {
    val path = "customer-generic-" + myCustomer.get("first_name") + ".avro"
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    try {
      dataFileWriter.create(myCustomer.getSchema, new File(path))
      dataFileWriter.append(myCustomer)
    } finally {
      // Closed exactly once, on both the success and the failure path.
      dataFileWriter.close()
    }
    // Reported only after close() has run, and with the path that was actually written.
    System.out.println("Written " + path)
  } catch {
    case e @ (_: IOException | _: NullPointerException) =>
      System.out.println("Couldn't write file")
      e.printStackTrace()
  }
}
/**
 * Round-trips a single-field "testRecord" generic record through Avro binary encoding and PGP.
 *
 * Steps: build the record, encode it as Avro binary, encrypt the bytes with the public key into
 * src/main/resources/avro-encrypted.pgp, decrypt that file with the private key and passphrase,
 * decode the result and compare its `data` field with the value that was written.
 * Sizes of each intermediate form are printed to stdout.
 */
private def encryptAndCypherSimpleAVROmessage: Unit = {
  println("This is a test that try to encrypt an avro message")
  val encryptedPath = "src/main/resources/avro-encrypted.pgp"

  val schema: Schema = SchemaBuilder
    .record("testRecord").fields()
    .requiredString("data")
    .endRecord()
  val testRandomData = "test data"
  val genericRecord: GenericRecord = new GenericData.Record(schema)
  genericRecord.put("data", testRandomData)

  // Serialize the record to Avro binary held in memory.
  // In-memory byte-array streams hold no OS resources, so they need no explicit close().
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val avroBuffer = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(avroBuffer, null)
  writer.write(genericRecord, encoder)
  encoder.flush()
  println("The avro message size is: " + avroBuffer.size() + " bytes")
  val serializedAvroBytes: Array[Byte] = avroBuffer.toByteArray
  println("serializedAvroBytes.length: " + serializedAvroBytes.length + " bytes")
  println("serializedAvroBytes.size: " + serializedAvroBytes.size + " bytes")

  // Encrypt the Avro bytes with the public key and write the ciphertext to disk.
  val plainInput = new ByteArrayInputStream(serializedAvroBytes)
  println("byteArrayAvroInputStream.available: " + plainInput.available() + " bytes")
  val encryptor: Encryptor = instantiatePGPEncryptor
  val encryptedOutput = new FileOutputStream(encryptedPath)
  try {
    encryptor.encrypt(plainInput, encryptedOutput)
    println("avroEncryptedPGPFile.getChannel().position(): " + encryptedOutput.getChannel().position() + " bytes")
  } finally {
    // Release the file handle even when encryption fails.
    encryptedOutput.close()
  }

  // Re-open the encrypted file and decrypt it with the private key and passphrase.
  val encryptedInput = new FileInputStream(encryptedPath)
  try {
    println("This is the encrypted avro file: " + encryptedPath + ". " + encryptedInput.available() + " bytes")
    val decryptedBuffer = new ByteArrayOutputStream()
    val decryptor = new Decryptor(new Key(new File("src/main/resources/private.key"), passphrase))
    // The message was produced unsigned, so signature verification is switched off.
    decryptor.setVerificationRequired(false)
    val decriptionResult: DecryptionResult = decryptor.decryptWithFullDetails(encryptedInput, decryptedBuffer)
    println("This is the decrypted avro file: " + encryptedPath + ". " + decryptedBuffer.size() + " bytes")
    println(decriptionResult.toString)

    // Decode the decrypted bytes back into a generic record and check the round trip.
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(decryptedBuffer.toByteArray, null)
    val someData: GenericRecord = reader.read(null, decoder)
    val testdata = testRecord(someData.get("data").toString)
    println("The content should be test data. " + testdata + " " + testdata.data.equals(testRandomData))
    println("Cleaning up. The conclusion is that, logically, adding a PGP encryption layer to an AVRO serialized message adds an extra size when sending it anywhere, between 55 and 60 times. Is it worth adding asymmetric encryption to an AVRO message? ")
  } finally {
    // Release the file handle even when decryption or decoding fails.
    encryptedInput.close()
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment