Last active
May 26, 2023 07:38
package avro

import org.apache.avro.file.{DataFileReader, DataFileWriter}
import org.apache.avro.generic._
import org.apache.avro.io._
import org.apache.avro.specific.{SpecificDatumReader, SpecificDatumWriter}
import org.apache.avro.{Schema, SchemaBuilder}
import org.c02e.jpgpj._

import java.io._
import scala.concurrent.duration.{Duration, NANOSECONDS}
/*
 Code to encrypt and decrypt Avro messages, the kind usually sent to and received from a Kafka topic.
 It may be unnecessary or overkill, but a layer like this adds extra security by ensuring that only
 authorized producers and consumers can exchange messages.
 It would be nice to try encrypting a Protobuf message with this library too.

 @author
 Alonso Isidoro Román @alonsoir

 https://gpgtools.tenderapp.com/kb/gpg-keychain-faq/how-to-find-and-share-your-public-key

 Identify your private key:
   gpg --list-secret-keys --keyid-format=long alonsoir@gmail.com
   ...
   gpg --export-secret-keys 564BC40362AC6282 > private.key
 Export the matching public key (loaded below as public.pgp):
   gpg --export 564BC40362AC6282 > public.pgp
 Copy the key file to the other machine using a secure transport (scp is your friend).
 To import it, run:
   gpg --import private.key
**/
object EncryptAndDecryptAvroMessages {

  case class testRecord(data: String)

  // Passphrase that protects private.key (placeholder value)
  val passphrase = "YOUR-PASSPHRASE"

  // Encrypts with AES-256 for the holder of public.pgp, without signing, compressing with ZLIB
  def instantiatePGPEncryptor: Encryptor = {
    val encryptor = new Encryptor(new Key(new File("src/main/resources/public.pgp")))
    encryptor.setEncryptionAlgorithm(EncryptionAlgorithm.AES256)
    encryptor.setSigningAlgorithm(HashingAlgorithm.Unsigned)
    encryptor.setCompressionAlgorithm(CompressionAlgorithm.ZLIB)
    encryptor
  }

  def main(args: Array[String]): Unit = {
    encryptAndCypherCustomerWithGenericRecord
    //encryptAndCypherSimpleAVROmessage
  }

  private def encryptAndCypherCustomerWithGenericRecord: Unit = {
    val parser = new Schema.Parser
    val schema = parser.parse(
      """{
        |  "type": "record",
        |  "namespace": "com.example",
        |  "name": "Customer",
        |  "fields": [
        |    { "name": "first_name", "type": "string", "doc": "First Name of Customer" },
        |    { "name": "last_name", "type": "string", "doc": "Last Name of Customer" },
        |    { "name": "age", "type": "int", "doc": "Age at the time of registration" },
        |    { "name": "height", "type": "float", "doc": "Height at the time of registration in cm" },
        |    { "name": "weight", "type": "float", "doc": "Weight at the time of registration in kg" },
        |    { "name": "automated_email", "type": "boolean", "default": true, "doc": "Field indicating if the user is enrolled in marketing emails" }
        |  ]
        |}""".stripMargin)
    // Build a customer that leaves automated_email unset, so the schema default (true) is used
    val customerBuilderWithDefault = new GenericRecordBuilder(schema)
    customerBuilderWithDefault.set("first_name", "Alonso")
    customerBuilderWithDefault.set("last_name", "Isidoro")
    customerBuilderWithDefault.set("age", 44)
    customerBuilderWithDefault.set("height", 175f)
    customerBuilderWithDefault.set("weight", 120.6f)
    val customerWithDefault = customerBuilderWithDefault.build()
    println("customerWithDefault: " + customerWithDefault)
    // Serialize the record to raw Avro binary (no container-file header)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val avroMessageCustomerWithDefault = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(avroMessageCustomerWithDefault, null)
    writer.write(customerWithDefault, encoder)
    encoder.flush()
    println("The avro message size is: " + avroMessageCustomerWithDefault.size() + " bytes")
    avroMessageCustomerWithDefault.close()
    val serializedcustomerWithDefaultAvroBytes: Array[Byte] = avroMessageCustomerWithDefault.toByteArray()
    println("serializedcustomerWithDefaultAvroBytes.length: " + serializedcustomerWithDefaultAvroBytes.length + " bytes")
    println("serializedcustomerWithDefaultAvroBytes.size: " + serializedcustomerWithDefaultAvroBytes.size + " bytes")
    // byteArrayAvroInputStream holds the record just serialized in Avro format
    val byteArrayAvroInputStream: ByteArrayInputStream = new ByteArrayInputStream(serializedcustomerWithDefaultAvroBytes)
    println("byteArrayAvroInputStream.available: " + byteArrayAvroInputStream.available() + " bytes")
    val encryptor: Encryptor = instantiatePGPEncryptor
    val avroMyCustomerEncryptedPGPFile: FileOutputStream = new FileOutputStream("src/main/resources/avro-customerWithDefault-encrypted.pgp")
    val t_ini_encrypt = System.nanoTime()
    encryptor.encrypt(byteArrayAvroInputStream, avroMyCustomerEncryptedPGPFile)
    val t_fin_encrypt = System.nanoTime()
    val tiempo_encrypt = t_fin_encrypt - t_ini_encrypt
    val durationEncrypt = Duration(tiempo_encrypt, NANOSECONDS)
    println("Time elapsed encrypting the AVRO: " + tiempo_encrypt + " ns. " + durationEncrypt.toMillis + " ms. " + durationEncrypt.toSeconds + " s")
    println("avroMyCustomerEncryptedPGPFile.getChannel().position(): " + avroMyCustomerEncryptedPGPFile.getChannel().position() + " bytes")
    byteArrayAvroInputStream.close()
    // The file now holds the Avro-serialized record, encrypted with the public.pgp key
    avroMyCustomerEncryptedPGPFile.close()
    // Now open the encrypted file and decrypt it using the
    // private key and passphrase.
    val encryptedMyCustomerAvroFile: FileInputStream = new FileInputStream("src/main/resources/avro-customerWithDefault-encrypted.pgp")
    println("This is the encrypted avro file: " + "src/main/resources/avro-customerWithDefault-encrypted.pgp. " + encryptedMyCustomerAvroFile.available() + " bytes")
    val decryptedMyCustomerAvroFile: ByteArrayOutputStream = new ByteArrayOutputStream()
    val decryptor = new Decryptor(new Key(new File("src/main/resources/private.key"), passphrase))
    // The message was encrypted unsigned, so don't require a signature
    decryptor.setVerificationRequired(false)
    val t_ini_decrypt = System.nanoTime()
    val decriptionResult: DecryptionResult = decryptor.decryptWithFullDetails(encryptedMyCustomerAvroFile, decryptedMyCustomerAvroFile)
    val t_fin_decrypt = System.nanoTime()
    val tiempo_decrypt = t_fin_decrypt - t_ini_decrypt
    val durationDecrypt = Duration(tiempo_decrypt, NANOSECONDS)
    println("Time elapsed decrypting the AVRO: " + tiempo_decrypt + " ns. " + durationDecrypt.toMillis + " ms. " + durationDecrypt.toSeconds + " s")
    println("Decrypted avro message (in memory): " + decryptedMyCustomerAvroFile.size() + " bytes")
    // Do something useful with the decryption result if you want...
    println("decriptionResult: " + decriptionResult.toString)
    // Deserialize the decrypted bytes back into a GenericRecord
    val decryptedAvromessage = decryptedMyCustomerAvroFile.toByteArray()
    val reader: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(decryptedAvromessage, null)
    val someData: GenericRecord = reader.read(null, decoder)
    // Read a field back from the decoded record
    val first_name = someData.get("first_name").toString
    println("The content should be Alonso. Data is " + first_name + " " + first_name.equals("Alonso"))
    println("Cleaning up. Conclusion: as expected, adding a PGP encryption layer to an Avro-serialized message makes it larger wherever you send it, roughly 25 times larger in this test. Is it worth adding asymmetric encryption to an Avro message?")
    encryptedMyCustomerAvroFile.close()
    decryptedMyCustomerAvroFile.close()
  }

  private def readCustomerFromFile(): Unit = {
    // Read the first record from an Avro container file
    val file = new File("customer-generic.avro")
    val datumReader = new GenericDatumReader[GenericRecord]
    try {
      val dataFileReader = new DataFileReader[GenericRecord](file, datumReader)
      try {
        val customerRead = dataFileReader.next()
        println("Successfully read avro file")
        println(customerRead.toString)
        // Get the data from the generic record
        println("First name: " + customerRead.get("first_name"))
        // Read a non-existent field
        //println("Non existent field: " + customerRead.get("not_here"))
      } finally dataFileReader.close()
    } catch {
      case e: IOException =>
        e.printStackTrace()
    }
  }

  private def writeCustomerToFile(schema: Schema, myCustomer: GenericData.Record): Unit = {
    // Write the record to an Avro container file (the schema is stored in the file header)
    val datumWriter = new GenericDatumWriter[GenericRecord](schema)
    val dataFileWriter = new DataFileWriter[GenericRecord](datumWriter)
    try {
      val path = "customer-generic-" + myCustomer.get("first_name") + ".avro"
      val fileCustomer = new File(path)
      dataFileWriter.create(myCustomer.getSchema, fileCustomer)
      dataFileWriter.append(myCustomer)
      println("Written " + path)
    } catch {
      case e: IOException =>
        println("Couldn't write file")
        e.printStackTrace()
    } finally dataFileWriter.close()
  }

  private def encryptAndCypherSimpleAVROmessage: Unit = {
    println("This is a test that tries to encrypt an avro message")
    val schema: Schema = SchemaBuilder
      .record("testRecord").fields()
      .requiredString("data")
      .endRecord()
    val testRandomData = "test data"
    val genericRecord: GenericRecord = new GenericData.Record(schema)
    genericRecord.put("data", testRandomData)
    // Serialize the record to raw Avro binary (no container-file header)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val avroMessage = new ByteArrayOutputStream()
    val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(avroMessage, null)
    writer.write(genericRecord, encoder)
    encoder.flush()
    println("The avro message size is: " + avroMessage.size() + " bytes")
    avroMessage.close()
    val serializedAvroBytes: Array[Byte] = avroMessage.toByteArray()
    println("serializedAvroBytes.length: " + serializedAvroBytes.length + " bytes")
    println("serializedAvroBytes.size: " + serializedAvroBytes.size + " bytes")
    // byteArrayAvroInputStream holds the record just serialized in Avro format
    val byteArrayAvroInputStream: ByteArrayInputStream = new ByteArrayInputStream(serializedAvroBytes)
    println("byteArrayAvroInputStream.available: " + byteArrayAvroInputStream.available() + " bytes")
    val encryptor: Encryptor = instantiatePGPEncryptor
    val avroEncryptedPGPFile: FileOutputStream = new FileOutputStream("src/main/resources/avro-encrypted.pgp")
    encryptor.encrypt(byteArrayAvroInputStream, avroEncryptedPGPFile)
    println("avroEncryptedPGPFile.getChannel().position(): " + avroEncryptedPGPFile.getChannel().position() + " bytes")
    byteArrayAvroInputStream.close()
    // The file now holds the Avro-serialized record, encrypted with the public.pgp key
    avroEncryptedPGPFile.close()
    // Now open the encrypted file and decrypt it using the
    // private key and passphrase.
    val encryptedAvroFile: FileInputStream = new FileInputStream("src/main/resources/avro-encrypted.pgp")
    println("This is the encrypted avro file: " + "src/main/resources/avro-encrypted.pgp. " + encryptedAvroFile.available() + " bytes")
    val decryptedAvroFile: ByteArrayOutputStream = new ByteArrayOutputStream()
    val decryptor = new Decryptor(new Key(new File("src/main/resources/private.key"), passphrase))
    // The message was encrypted unsigned, so don't require a signature
    decryptor.setVerificationRequired(false)
    val decriptionResult: DecryptionResult = decryptor.decryptWithFullDetails(encryptedAvroFile, decryptedAvroFile)
    println("Decrypted avro message (in memory): " + decryptedAvroFile.size() + " bytes")
    // Do something useful with the decryption result if you want...
    println(decriptionResult.toString)
    // Deserialize the decrypted bytes back into a GenericRecord
    val decryptedAvromessage = decryptedAvroFile.toByteArray()
    val reader: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(decryptedAvromessage, null)
    val someData: GenericRecord = reader.read(null, decoder)
    // Wrap the decoded field in the testRecord case class
    val testdata = testRecord(someData.get("data").toString)
    println("The content should be test data. " + testdata + " " + testdata.data.equals(testRandomData))
    println("Cleaning up. Conclusion: as expected, adding a PGP encryption layer to an Avro-serialized message makes it larger wherever you send it, roughly 55 to 60 times larger in this test. Is it worth adding asymmetric encryption to an Avro message?")
    encryptedAvroFile.close()
    decryptedAvroFile.close()
  }
}
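The instantiatePGPEncryptor configuration above (AES-256, recipient's public key, no signature) follows the usual OpenPGP hybrid pattern: a random session key encrypts the payload, and the public key encrypts only that session key. The sketch below illustrates the same idea using nothing but the JDK's javax.crypto, swapping in RSA-OAEP for wrapping the key and AES-GCM for the payload. It is an illustration under stated assumptions, not OpenPGP and not interoperable with jpgpj or gpg; the object HybridSketch, the Sealed case class and the 2048-bit key size are hypothetical choices.

```scala
import java.security.{KeyPair, KeyPairGenerator, SecureRandom}
import javax.crypto.{Cipher, KeyGenerator, SecretKey}
import javax.crypto.spec.{GCMParameterSpec, SecretKeySpec}

// Hypothetical helper: hybrid encryption in the spirit of PGP, NOT the OpenPGP format.
object HybridSketch {
  private val random = new SecureRandom()
  private val RsaTransform = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding"
  private val AesTransform = "AES/GCM/NoPadding"

  // Everything the recipient needs: wrapped session key, GCM nonce, ciphertext (tag appended)
  final case class Sealed(wrappedKey: Array[Byte], iv: Array[Byte], ciphertext: Array[Byte])

  def newRsaKeyPair(bits: Int = 2048): KeyPair = {
    val gen = KeyPairGenerator.getInstance("RSA")
    gen.initialize(bits)
    gen.generateKeyPair()
  }

  def encrypt(plain: Array[Byte], keys: KeyPair): Sealed = {
    // 1. Fresh AES-256 session key for this message only
    val kg = KeyGenerator.getInstance("AES")
    kg.init(256)
    val sessionKey: SecretKey = kg.generateKey()
    // 2. Encrypt the payload with AES-GCM (12-byte nonce, 128-bit authentication tag)
    val iv = new Array[Byte](12)
    random.nextBytes(iv)
    val aes = Cipher.getInstance(AesTransform)
    aes.init(Cipher.ENCRYPT_MODE, sessionKey, new GCMParameterSpec(128, iv))
    val ciphertext = aes.doFinal(plain)
    // 3. Wrap the session key with the recipient's RSA public key
    val rsa = Cipher.getInstance(RsaTransform)
    rsa.init(Cipher.ENCRYPT_MODE, keys.getPublic)
    Sealed(rsa.doFinal(sessionKey.getEncoded), iv, ciphertext)
  }

  def decrypt(s: Sealed, keys: KeyPair): Array[Byte] = {
    // Unwrap the session key with the RSA private key, then decrypt and verify the payload
    val rsa = Cipher.getInstance(RsaTransform)
    rsa.init(Cipher.DECRYPT_MODE, keys.getPrivate)
    val sessionKey = new SecretKeySpec(rsa.doFinal(s.wrappedKey), "AES")
    val aes = Cipher.getInstance(AesTransform)
    aes.init(Cipher.DECRYPT_MODE, sessionKey, new GCMParameterSpec(128, s.iv))
    aes.doFinal(s.ciphertext)
  }
}
```

In this sketch the wrapped key is always 256 bytes for a 2048-bit RSA key, and GCM adds a 12-byte nonce plus a 16-byte tag, so the overhead is a fixed 284 bytes per message whatever the payload size. Real OpenPGP output carries additional packet framing on top of that.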
Yeah, sorry for the clumsy code; it is just a PoC written without refactoring.
I just wanted to know whether I could encrypt an Avro message (yes, it is possible)
and how much bigger it would be. It is roughly 16x the original size.
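One way to see why the ratio is so large, and why it differs between tests, is to look at how small the Avro payload is. Per the Avro specification, binary encoding writes no field names or tags: an int or long is a zigzag varint, a string is its byte length (encoded as a long) followed by the UTF-8 bytes, a float is 4 little-endian bytes, and a boolean is a single byte. The hand-written encoder below is a hypothetical helper named AvroSizeSketch, not Avro's own BinaryEncoder. Applying those rules to the Customer record from the gist yields 25 bytes, and the "test data" record yields 10 bytes. Any fixed per-message cost of the encryption layer (the public-key-encrypted session key, packet headers, integrity data) dwarfs payloads this small, which may be why the reported factors (16x, 25x, 55 to 60x) differ.

```scala
import java.io.ByteArrayOutputStream
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets

// Hypothetical hand-written encoder following the Avro binary encoding rules,
// used only to estimate payload sizes; real code should use Avro's BinaryEncoder.
object AvroSizeSketch {
  // Zigzag + variable-length encoding, used by Avro for both int and long
  def writeLong(out: ByteArrayOutputStream, value: Long): Unit = {
    var n = (value << 1) ^ (value >> 63) // zigzag: small magnitudes -> small numbers
    while ((n & ~0x7FL) != 0) {
      out.write(((n & 0x7F) | 0x80).toInt) // 7 data bits, continuation bit set
      n >>>= 7
    }
    out.write(n.toInt)
  }

  // Strings: byte length as a long, then the UTF-8 bytes
  def writeString(out: ByteArrayOutputStream, s: String): Unit = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    writeLong(out, bytes.length.toLong)
    out.write(bytes, 0, bytes.length)
  }

  // Floats: 4 bytes, little-endian IEEE 754
  def writeFloat(out: ByteArrayOutputStream, f: Float): Unit =
    out.write(ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putFloat(f).array(), 0, 4)

  // Booleans: one byte, 0 or 1
  def writeBoolean(out: ByteArrayOutputStream, b: Boolean): Unit =
    out.write(if (b) 1 else 0)

  // Fields in schema order: first_name, last_name, age, height, weight, automated_email
  def encodeCustomer(first: String, last: String, age: Int, height: Float,
                     weight: Float, automatedEmail: Boolean): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    writeString(out, first)
    writeString(out, last)
    writeLong(out, age.toLong) // an Avro int uses the same zigzag varint bytes
    writeFloat(out, height)
    writeFloat(out, weight)
    writeBoolean(out, automatedEmail)
    out.toByteArray
  }
}
```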
On Fri, 17 Mar 2023 at 13:00, Callum Macdonald wrote:
> FYI, I think if you add a file extension to encryptAvromessage then the
> code would be syntax highlighted.
--
Alonso Isidoro Roman
Oh wow, 16x, that's wild! Well, thanks for sharing it anyway; it was useful for my research on how to encrypt Avro data. :-)
It makes me happy that it's useful for something. The purpose of free software is for all of us to learn.