Skip to content

Instantly share code, notes, and snippets.

@dalegaspi
Last active May 9, 2018 15:54
Show Gist options
  • Save dalegaspi/2d494f37bec09f11171fec406865102c to your computer and use it in GitHub Desktop.
Save dalegaspi/2d494f37bec09f11171fec406865102c to your computer and use it in GitHub Desktop.
Simple wrapper object for various compression algorithms (GZip, LZ4, Snappy) using generics
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.util.Try
/**
 * Serialization + compression helpers: a value is first turned into bytes with Java serialization
 * ([[CompressionUtils.serialize]]) and then compressed with one of several codecs
 * ([[CompressionUtils.Lz4]], [[CompressionUtils.Snappy]], [[CompressionUtils.Gzip]]).
 *
 * Adapted from https://gist.github.com/owainlewis/1e7d1e68a6818ee4d50e (gzip compression)
 * and from https://stackoverflow.com/a/39371571/918858 (ser/deser from Array[Byte]).
 *
 * SECURITY NOTE(review): every `decompress[T]` ends in Java deserialization, which can execute
 * attacker-chosen code when fed untrusted bytes. Only decompress payloads this application produced
 * itself, or install an `ObjectInputFilter` / class allow-list before exposing this to external input.
 */
object CompressionUtils {

  /**
   * Serialize `value` to a byte array using Java serialization.
   *
   * @param value value to serialize; it (and everything it references) must be `java.io.Serializable`
   * @tparam T static type of the value
   * @return the Java-serialized form of `value`
   * @throws java.io.NotSerializableException if `value` or something it references is not serializable
   */
  def serialize[T](value: T): Array[Byte] = {
    val stream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    // close() flushes the object stream's block buffer, so it must happen before toByteArray.
    try oos.writeObject(value)
    finally oos.close()
    stream.toByteArray
  }

  /**
   * Deserialize a byte array produced by [[serialize]] back into a value.
   *
   * Note: because of type erasure the cast to `T` is unchecked here; if the bytes hold a different
   * type, the `ClassCastException` surfaces later, where the caller uses the result as a `T`.
   *
   * @param bytes Java-serialized bytes
   * @tparam T expected type of the deserialized value
   * @return the deserialized value
   * @throws java.io.IOException            if the bytes are not a valid serialization stream
   * @throws java.lang.ClassNotFoundException if the serialized class is not on the classpath
   */
  def deserialize[T](bytes: Array[Byte]): T = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject.asInstanceOf[T]
    finally ois.close()
  }

  /**
   * [[deserialize]] with non-fatal failures mapped to `None`, so that the `Option`-returning
   * `decompress[T]` methods honour their contract even when the decompressed bytes are corrupt.
   */
  private def deserializeOption[T](bytes: Array[Byte]): Option[T] =
    Try(deserialize[T](bytes)).toOption

  /** A codec that compresses arbitrary (Java-serializable) values to bytes and back. */
  trait Compressor {

    /** Serialize then compress `input`. */
    def compress[T](input: T): Array[Byte]

    /** Decompress then deserialize `input`; `None` if either step fails. */
    def decompress[T](input: Array[Byte]): Option[T]
  }

  /** LZ4 codec (lz4-java), using a 4-byte length header; see [[Lz4.ByteArray]]. */
  object Lz4 extends Compressor {

    /**
     * Compress a value: Java-serialize it, then LZ4-compress the bytes.
     *
     * @param input value to compress
     * @tparam T type of the value
     * @return length-prefixed LZ4 payload
     */
    override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

    /**
     * Decompress a payload produced by [[compress]] back into a value.
     *
     * @param input length-prefixed LZ4 payload
     * @tparam T expected type of the value
     * @return the value, or `None` if decompression or deserialization fails
     */
    override def decompress[T](input: Array[Byte]): Option[T] =
      ByteArray.decompress(input).flatMap(deserializeOption[T])

    /**
     * Raw byte-array LZ4 compression.
     *
     * The LZ4 block format does not record the uncompressed size, but the decompressor needs it up front.
     * Rather than over-allocating the output buffer, each payload is laid out as
     * `[4-byte big-endian uncompressed length][LZ4 block]`.
     */
    object ByteArray {
      import net.jpountz.lz4.LZ4Factory

      val factory = LZ4Factory.fastestInstance
      val compressor = factory.fastCompressor
      // NOTE(review): newer lz4-java releases deprecate the fast decompressor in favour of the safe one.
      // It is kept because it tolerates trailing bytes after the LZ4 block, which payloads written by
      // earlier revisions of `compress` (untrimmed worst-case buffers) contain.
      val decompressor = factory.fastDecompressor

      /** Size in bytes of the header that stores the *uncompressed* length. */
      val COMPRESSED_LENGTH = 4

      /**
       * LZ4 cannot expand a block by more than ~255x: each extra match-length byte adds at most
       * 255 output bytes. A header claiming more than this is corrupt.
       */
      private val MaxExpansionRatio = 255L

      /**
       * Compress raw bytes with LZ4.
       *
       * @param input bytes to compress (may be empty)
       * @return `[4-byte big-endian input.length][LZ4 block]`, trimmed to the bytes actually produced
       */
      def compress(input: Array[Byte]): Array[Byte] = {
        val decompressedLength = input.length
        val maxCompressedLength = compressor.maxCompressedLength(decompressedLength)
        // Compress directly into the output buffer, right after the length header.
        val output = new Array[Byte](COMPRESSED_LENGTH + maxCompressedLength)
        java.nio.ByteBuffer.wrap(output).putInt(decompressedLength)
        val compressedLength =
          compressor.compress(input, 0, decompressedLength, output, COMPRESSED_LENGTH, maxCompressedLength)
        // maxCompressedLength is only a worst-case bound; keep just what LZ4 actually wrote.
        java.util.Arrays.copyOf(output, COMPRESSED_LENGTH + compressedLength)
      }

      /**
       * Decompress a payload produced by [[compress]].
       *
       * @param compressed `[4-byte big-endian uncompressed length][LZ4 block]`
       * @return the original bytes, or `None` if the payload is truncated or corrupt
       */
      def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
        Try {
          // Throws IndexOutOfBoundsException (-> None) when the header itself is truncated.
          val decompressedLength = java.nio.ByteBuffer.wrap(compressed, 0, COMPRESSED_LENGTH).getInt
          val payloadLength = compressed.length - COMPRESSED_LENGTH
          // Reject corrupt headers before allocating, so a bogus length cannot trigger an OutOfMemoryError.
          require(
            decompressedLength >= 0 && decompressedLength.toLong <= MaxExpansionRatio * payloadLength,
            s"corrupt LZ4 header: uncompressed length $decompressedLength for $payloadLength payload bytes"
          )
          val restored = new Array[Byte](decompressedLength)
          // Read the block in place (no slice copy); the fast decompressor stops once `restored` is full.
          decompressor.decompress(compressed, COMPRESSED_LENGTH, restored, 0, decompressedLength)
          restored
        }.toOption
    }
  }

  /** Snappy codec (snappy-java). */
  object Snappy extends Compressor {

    /**
     * Compress a value: Java-serialize it, then Snappy-compress the bytes.
     *
     * @param input value to compress
     * @tparam T type of the value
     * @return Snappy-compressed bytes
     */
    override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

    /**
     * Decompress a payload produced by [[compress]] back into a value.
     *
     * @param input Snappy-compressed bytes
     * @tparam T expected type of the value
     * @return the value, or `None` if decompression or deserialization fails
     */
    override def decompress[T](input: Array[Byte]): Option[T] =
      ByteArray.decompress(input).flatMap(deserializeOption[T])

    /** Raw byte-array Snappy compression. */
    object ByteArray {
      import org.xerial.snappy.Snappy.{compress => snappycompress, uncompress}

      /**
       * Compress raw bytes with Snappy.
       *
       * @param input bytes to compress
       * @return Snappy-compressed bytes
       */
      def compress(input: Array[Byte]): Array[Byte] = snappycompress(input)

      /**
       * Decompress Snappy-compressed bytes.
       *
       * @param compressed Snappy-compressed bytes
       * @return the original bytes, or `None` if the input is not valid Snappy data
       */
      def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
        Try(uncompress(compressed)).toOption
    }
  }

  /** GZIP codec (java.util.zip). */
  object Gzip extends Compressor {

    /**
     * Compress a value: Java-serialize it, then GZIP the bytes.
     *
     * @param input value to compress
     * @tparam T type of the value
     * @return GZIP-compressed bytes
     */
    override def compress[T](input: T): Array[Byte] = ByteArray.compress(serialize(input))

    /**
     * Decompress a payload produced by [[compress]] back into a value.
     *
     * @param input GZIP-compressed bytes
     * @tparam T expected type of the value
     * @return the value, or `None` if decompression or deserialization fails
     */
    override def decompress[T](input: Array[Byte]): Option[T] =
      ByteArray.decompress(input).flatMap(deserializeOption[T])

    /** Raw byte-array GZIP compression. */
    object ByteArray {

      /**
       * GZIP-compress raw bytes.
       *
       * @param input bytes to compress
       * @return GZIP-compressed bytes
       */
      def compress(input: Array[Byte]): Array[Byte] = {
        val bos = new ByteArrayOutputStream(input.length)
        val gzip = new GZIPOutputStream(bos)
        // close() writes the GZIP trailer and releases the native Deflater, even if write() fails.
        try gzip.write(input)
        finally gzip.close()
        bos.toByteArray
      }

      /**
       * Decompress GZIP bytes.
       *
       * @param compressed GZIP-compressed bytes
       * @return the original bytes, or `None` if the input is not valid GZIP data
       */
      def decompress(compressed: Array[Byte]): Option[Array[Byte]] =
        Try {
          val inputStream = new GZIPInputStream(new ByteArrayInputStream(compressed))
          // Always close: GZIPInputStream holds a native Inflater that otherwise lingers until GC.
          try org.apache.commons.io.IOUtils.toByteArray(inputStream)
          finally inputStream.close()
        }.toOption
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment