Skip to content

Instantly share code, notes, and snippets.

@t-mat
Last active August 29, 2015 14:04
Show Gist options
  • Save t-mat/c4e3b213ce5e88dcdc64 to your computer and use it in GitHub Desktop.
Save t-mat/c4e3b213ce5e88dcdc64 to your computer and use it in GitHub Desktop.
LZ4 streaming API example 3 (pseudo logger application of the ring buffer method)

Results

Note: the -my-streaming method is line-by-line LZ4 streaming compression.

Bytes Ratio File
205242368 1.0 access_log_Jul95
51008234 4.02 access_log_Jul95.lz4 -my-streaming
39127074 5.25 access_log_Jul95.lz4 -1
24648061 8.33 access_log_Jul95.lz4 -9
20213868 10.2 access_log_Jul95.zip
Bytes Ratio File
100000000 1.00 enwik8
65419906 1.53 enwik8.lz4 -my-streaming
56995438 1.75 enwik8.lz4 -1
42283904 2.36 enwik8.lz4 -9
35194836 2.84 enwik8.zip

Data set

#define _CRT_SECURE_NO_WARNINGS // for MSVC
/////////////////////////////////////////////////
// Header
/////////////////////////////////////////////////
#include <stdarg.h>
#include <stddef.h> // size_t, NULL
// Negative status codes reported by the writer side (see MyLz4Write_end()).
// The values sit at the very bottom of the int range.
typedef enum {
    // Size-related failure, e.g. a message larger than the configured maximum.
    MyLz4Write_ErrorWriteSize = -0x7fffffff,
    // The LZ4 compressor did not produce a usable block.
    MyLz4Write_ErrorComp      = -0x7fffffff + 1
} MyLz4Write_Error;
// Negative status codes reported by the reader side (see MyLz4Read_packet()).
// The values count upwards from the bottom of the int range.
typedef enum {
    // The 2-byte packet length prefix could not be read.
    MyLz4Read_ErrorReadSize   = -0x7fffffff,
    // The announced compressed size does not fit the packet buffer.
    MyLz4Read_ErrorCompSize   = -0x7fffffff + 1,
    // The compressed payload could not be obtained or decoded.
    MyLz4Read_ErrorDecomp     = -0x7fffffff + 2,
    // The decoded size is outside the allowed range.
    MyLz4Read_ErrorDecompSize = -0x7fffffff + 3
} MyLz4Read_Error;
// Opaque writer / reader contexts; their layout is private to the
// implementation section below.
typedef struct MyLz4Write_ctx_t MyLz4Write_ctx_t;
typedef struct MyLz4Read_ctx_t MyLz4Read_ctx_t;
// Output callback: called with the userPtr given to MyLz4Write_create() and
// n bytes at buf. NOTE(review): presumably returns the number of bytes
// actually written (fwrite-style) -- confirm against callers.
typedef size_t (*MyLz4Write_func_t)(void* userPtr, const void* buf, size_t n);
// Input callback: called with the userPtr given to MyLz4Read_create() and a
// buffer to fill with n bytes. MyLz4Read_packet() treats a return value
// different from the requested n as a failed read.
typedef size_t (*MyLz4Read_func_t)(void* userPtr, void* buf, size_t n);
// Creates a writer and writes the stream header (msgMaxBytes, then
// ringBufBytes, each as a 32-bit value) through writeFunc.
// ringBufBytes is the size of the internal buffer that holds uncompressed
// messages; msgMaxBytes is the largest single message.
MyLz4Write_ctx_t* MyLz4Write_create(MyLz4Write_func_t writeFunc, void* userPtr,
size_t ringBufBytes, size_t msgMaxBytes);
// Writes the 16-bit zero end-of-stream marker through the write callback and
// frees the writer.
void MyLz4Write_release (MyLz4Write_ctx_t* ctx);
// Stores in *ppBuf the address where the next message must be placed and
// returns the maximum message size in bytes.
int MyLz4Write_begin (MyLz4Write_ctx_t* ctx, void** ppBuf);
// Compresses the n bytes placed at the address from MyLz4Write_begin() and
// writes them as one packet (2-byte little-endian length + LZ4 block).
// Returns 0 when n is 0 and a negative MyLz4Write_Error when n exceeds the
// maximum message size or compression fails.
int MyLz4Write_end (MyLz4Write_ctx_t* ctx, size_t n);
// Creates a reader; the stream header is read through readFunc and used to
// size the reader's buffers.
MyLz4Read_ctx_t* MyLz4Read_create(MyLz4Read_func_t readFunc, void* userPtr);
// Frees the reader and its buffers.
void MyLz4Read_release (MyLz4Read_ctx_t* ctx);
// Reads and decompresses the next packet. On success returns the number of
// decompressed bytes and points *ppBuf at them inside the reader's own
// buffer. Returns 0 once the end-of-stream marker has been read and a
// negative value on error.
int MyLz4Read_packet (MyLz4Read_ctx_t* ctx, const void** ppBuf);
/////////////////////////////////////////////////
// Implementation
/////////////////////////////////////////////////
#include <stdint.h>
#include <stdarg.h>
#include <stdio.h>
#include <malloc.h>
#include <memory.h>
#include "lz4.h"
// Bookkeeping for the buffer that holds uncompressed messages.
typedef struct {
    int32_t  bytes;    // total size of buf in bytes (also stored in the stream header)
    uint8_t* buf;      // start of the buffer storage
    int32_t  msgMax;   // largest single message in bytes (also stored in the stream header)
    int      offset;   // position inside buf where the next message starts
} RingBuf;
// Scratch buffer for one packet: a 2-byte length prefix followed by the
// compressed payload.
typedef struct {
    size_t   bytes;   // capacity of buf in bytes
    uint8_t* buf;     // packet storage
} MsgBuf;
// Writer state behind the opaque MyLz4Write_ctx_t handle.
struct MyLz4Write_ctx_t {
    LZ4_stream_t*     lz4s;        // LZ4 streaming compression state
    uint32_t          flags;       // Flags_* bits; cleared when the writer is created
    RingBuf           ringBuf;     // uncompressed messages handed out by MyLz4Write_begin()
    MsgBuf            msgBuf;      // outgoing packet: length prefix + compressed block
    MyLz4Write_func_t writeFunc;   // user-supplied output callback
    void*             userPtr;     // passed back to writeFunc unchanged
};
// Reader state behind the opaque MyLz4Read_ctx_t handle.
struct MyLz4Read_ctx_t {
    LZ4_streamDecode_t* lz4sd;      // LZ4 streaming decompression state
    uint32_t            flags;      // Flags_* bits (Flags_Terminated once the end marker is seen)
    RingBuf             ringBuf;    // decoded messages returned by MyLz4Read_packet()
    MsgBuf              msgBuf;     // incoming packet: length prefix + compressed block
    MyLz4Read_func_t    readFunc;   // user-supplied input callback
    void*               userPtr;    // passed back to readFunc unchanged
};
// Bit values for the `flags` member of the reader / writer contexts.
enum {
    Flags_Terminated = 0x1   // the end-of-stream marker has been read
};
// Moves the cursor past a message of n bytes. When a maximum-size message
// starting at the new position would reach or pass the end of the buffer,
// the cursor wraps back to the start instead.
static void ringBufForward(RingBuf* ringBuf, int n)
{
    const int next = ringBuf->offset + n;
    const int mustWrap = (next + ringBuf->msgMax >= ringBuf->bytes);

    ringBuf->offset = mustWrap ? 0 : next;
}
// Hands n bytes at buf to the writer's output callback together with the
// user pointer registered at creation, and returns whatever the callback
// returns.
static size_t MyLz4Write_raw(MyLz4Write_ctx_t* ctx, const void* buf, size_t n)
{
    const MyLz4Write_func_t sink = ctx->writeFunc;
    void* const user = ctx->userPtr;

    return sink(user, buf, n);
}
// Creates a writer context and emits the stream header.
//
// writeFunc    : output callback; expected to return the number of bytes it
//                wrote.
// userPtr      : opaque value passed back to writeFunc.
// ringBufBytes : size of the buffer holding uncompressed messages.
// msgMaxBytes  : largest single message; must be non-zero and not larger
//                than ringBufBytes.
//
// The header consists of msgMaxBytes followed by ringBufBytes, each written
// as a raw int32_t in host byte order.
//
// Returns the new context, or NULL when an argument is unusable, an
// allocation fails, or the header cannot be written completely. Release the
// context with MyLz4Write_release().
MyLz4Write_ctx_t* MyLz4Write_create(
    MyLz4Write_func_t writeFunc,
    void* userPtr,
    size_t ringBufBytes,
    size_t msgMaxBytes)
{
    MyLz4Write_ctx_t* ctx;
    int cmpBound;

    // Both sizes are stored and serialised as int32_t, and the buffer must be
    // able to hold at least one maximum-size message.
    if (writeFunc == NULL
        || msgMaxBytes == 0
        || msgMaxBytes > (size_t) INT32_MAX
        || ringBufBytes > (size_t) INT32_MAX
        || ringBufBytes < msgMaxBytes)
        return NULL;

    // LZ4_compressBound() yields 0 for sizes LZ4 cannot handle.
    cmpBound = LZ4_compressBound((int) msgMaxBytes);
    if (cmpBound <= 0)
        return NULL;

    ctx = malloc(sizeof *ctx);
    if (ctx == NULL)
        return NULL;

    ctx->lz4s = LZ4_createStream();
    ctx->flags = 0;
    ctx->ringBuf.bytes = (int32_t) ringBufBytes;
    ctx->ringBuf.buf = malloc(ringBufBytes);
    ctx->ringBuf.msgMax = (int32_t) msgMaxBytes;
    ctx->ringBuf.offset = 0;
    // 2 extra bytes for the little-endian length prefix of each packet.
    ctx->msgBuf.bytes = 2 + (size_t) cmpBound;
    ctx->msgBuf.buf = malloc(ctx->msgBuf.bytes);
    ctx->writeFunc = writeFunc;
    ctx->userPtr = userPtr;

    if (ctx->lz4s == NULL || ctx->ringBuf.buf == NULL || ctx->msgBuf.buf == NULL)
        goto fail;

    if (MyLz4Write_raw(ctx, &ctx->ringBuf.msgMax, sizeof ctx->ringBuf.msgMax)
            != sizeof ctx->ringBuf.msgMax)
        goto fail;
    if (MyLz4Write_raw(ctx, &ctx->ringBuf.bytes, sizeof ctx->ringBuf.bytes)
            != sizeof ctx->ringBuf.bytes)
        goto fail;

    return ctx;

fail:
    free(ctx->msgBuf.buf);
    free(ctx->ringBuf.buf);
    if (ctx->lz4s != NULL)
        LZ4_freeStream(ctx->lz4s);
    free(ctx);
    return NULL;
}
// Finishes the stream and destroys the writer.
//
// Writes the 16-bit zero end-of-stream marker through the output callback
// (best effort: the function returns void, so a failed write cannot be
// reported), then frees the buffers, the LZ4 stream and the context itself.
// Passing NULL is a no-op, mirroring free().
void MyLz4Write_release(MyLz4Write_ctx_t* ctx)
{
    const uint16_t endMarker = 0;

    if (ctx == NULL)
        return;

    (void) MyLz4Write_raw(ctx, &endMarker, sizeof endMarker);

    free(ctx->msgBuf.buf);
    free(ctx->ringBuf.buf);
    LZ4_freeStream(ctx->lz4s);
    free(ctx);
}
// Tells the caller where to place the next message: *ppBuf receives the
// address of the current cursor position inside the writer's buffer, and the
// return value is the maximum number of bytes a message may occupy.
int MyLz4Write_begin(MyLz4Write_ctx_t* ctx, void** ppBuf)
{
    RingBuf* const rb = &ctx->ringBuf;

    *ppBuf = rb->buf + rb->offset;
    return (int) rb->msgMax;
}
// Compresses the n bytes the caller placed at the address obtained from
// MyLz4Write_begin() and writes them as one packet: a 2-byte little-endian
// compressed length followed by the LZ4 block.
//
// Returns
//   0                          when n is 0 (nothing is written),
//   MyLz4Write_ErrorWriteSize  when n exceeds the maximum message size or the
//                              output callback accepted fewer bytes than the
//                              packet holds,
//   MyLz4Write_ErrorComp       when compression fails or the compressed block
//                              does not fit the 16-bit length prefix,
//   otherwise the packet size in bytes (2 + compressed size).
int MyLz4Write_end(MyLz4Write_ctx_t* ctx, size_t n)
{
    RingBuf* const rb = &ctx->ringBuf;
    MsgBuf* const mb = &ctx->msgBuf;
    uint8_t* src;
    int cmpBytes;
    size_t packetBytes;

    if (n == 0)
        return 0;
    if (n > (size_t) rb->msgMax)
        return MyLz4Write_ErrorWriteSize;

    src = &rb->buf[rb->offset];
    cmpBytes = LZ4_compress_continue(
        ctx->lz4s, (char*) src, (char*) &mb->buf[2], (int) n);
    if (cmpBytes <= 0)
        return MyLz4Write_ErrorComp;

    if (cmpBytes > 0xFFFF) {
        // The length prefix is only 16 bits wide; storing a truncated length
        // would corrupt the stream. The block is dropped, so the compressor
        // must forget it as well: later blocks may not refer to data the
        // reader never received.
        LZ4_resetStream(ctx->lz4s);
        return MyLz4Write_ErrorComp;
    }

    ringBufForward(rb, (int) n);

    mb->buf[0] = (uint8_t) (cmpBytes & 0xFF);
    mb->buf[1] = (uint8_t) ((cmpBytes >> 8) & 0xFF);

    packetBytes = 2 + (size_t) cmpBytes;
    if (MyLz4Write_raw(ctx, mb->buf, packetBytes) != packetBytes)
        return MyLz4Write_ErrorWriteSize;

    return (int) packetBytes;
}
// Asks the reader's input callback to fill buf with n bytes, passing along
// the user pointer registered at creation, and returns whatever the callback
// returns.
static size_t MyLz4Read_raw(MyLz4Read_ctx_t* ctx, void* buf, size_t n)
{
    const MyLz4Read_func_t source = ctx->readFunc;
    void* const user = ctx->userPtr;

    return source(user, buf, n);
}
// Creates a reader context and reads the stream header through readFunc.
//
// The header holds the maximum message size followed by the ring-buffer
// size, each as a raw int32_t in host byte order. Both values come from the
// stream and are therefore validated before they are used to size buffers.
//
// Returns NULL only when the context itself cannot be allocated. When the
// header is truncated or implausible, or a buffer or the LZ4 decoder cannot
// be allocated, a context is still returned but it owns no buffers and is
// marked Flags_Terminated, so MyLz4Read_packet() returns 0 without reading
// or decoding anything. Release the context with MyLz4Read_release() in
// either case.
MyLz4Read_ctx_t* MyLz4Read_create(MyLz4Read_func_t readFunc, void* userPtr)
{
    MyLz4Read_ctx_t* ctx = malloc(sizeof *ctx);
    int32_t msgMax = 0;
    int32_t ringBytes = 0;
    int cmpBound;

    if (ctx == NULL)
        return NULL;

    ctx->lz4sd = LZ4_createStreamDecode();
    ctx->flags = 0;
    ctx->ringBuf.bytes = 0;
    ctx->ringBuf.buf = NULL;
    ctx->ringBuf.msgMax = 0;
    ctx->ringBuf.offset = 0;
    ctx->msgBuf.bytes = 0;
    ctx->msgBuf.buf = NULL;
    ctx->readFunc = readFunc;
    ctx->userPtr = userPtr;

    if (ctx->lz4sd == NULL || readFunc == NULL)
        goto unusable;

    if (MyLz4Read_raw(ctx, &msgMax, sizeof msgMax) != sizeof msgMax)
        goto unusable;
    if (MyLz4Read_raw(ctx, &ringBytes, sizeof ringBytes) != sizeof ringBytes)
        goto unusable;

    // A message is decoded into up to msgMax bytes of the ring buffer, so the
    // buffer must hold at least one maximum-size message.
    if (msgMax <= 0 || ringBytes < msgMax)
        goto unusable;

    // LZ4_compressBound() yields 0 for sizes LZ4 cannot handle.
    cmpBound = LZ4_compressBound((int) msgMax);
    if (cmpBound <= 0)
        goto unusable;

    ctx->ringBuf.buf = malloc((size_t) ringBytes);
    ctx->msgBuf.buf = malloc(2 + (size_t) cmpBound);
    if (ctx->ringBuf.buf == NULL || ctx->msgBuf.buf == NULL) {
        free(ctx->ringBuf.buf);
        free(ctx->msgBuf.buf);
        ctx->ringBuf.buf = NULL;
        ctx->msgBuf.buf = NULL;
        goto unusable;
    }

    ctx->ringBuf.bytes = ringBytes;
    ctx->ringBuf.msgMax = msgMax;
    // 2 extra bytes for the little-endian length prefix of each packet.
    ctx->msgBuf.bytes = 2 + (size_t) cmpBound;
    return ctx;

unusable:
    ctx->flags |= Flags_Terminated;
    return ctx;
}
// Destroys a reader: frees the packet buffer, the ring buffer, the LZ4
// decoder state and the context itself. Pointers previously returned through
// MyLz4Read_packet() point into the ring buffer and become invalid.
// Passing NULL is a no-op, mirroring free().
void MyLz4Read_release(MyLz4Read_ctx_t* ctx)
{
    if (ctx == NULL)
        return;

    free(ctx->msgBuf.buf);
    free(ctx->ringBuf.buf);
    LZ4_freeStreamDecode(ctx->lz4sd);
    free(ctx);
}
// Reads one packet (2-byte little-endian length + LZ4 block) and decodes it
// at the current cursor position of the reader's ring buffer.
//
// Returns
//   > 0  number of decoded bytes; *ppBuf points at them inside the ring
//        buffer (the memory is owned by the reader),
//   0    the end-of-stream marker was read now or on an earlier call,
//   < 0  an error: a MyLz4Read_Error value, or the negative code returned by
//        LZ4_decompress_safe_continue().
// *ppBuf is written only when the result is positive.
int MyLz4Read_packet(MyLz4Read_ctx_t* ctx, const void** ppBuf)
{
    RingBuf* const rb = &ctx->ringBuf;
    MsgBuf* const mb = &ctx->msgBuf;
    unsigned int cmpBytes = 0;
    uint8_t* dst;
    int decBytes;

    if (ctx->flags & Flags_Terminated)
        return 0;

    // Without a packet buffer there is nowhere to put even the length prefix.
    if (mb->buf == NULL || mb->bytes < 2)
        return MyLz4Read_ErrorReadSize;

    if (2 != MyLz4Read_raw(ctx, mb->buf, 2))
        return MyLz4Read_ErrorReadSize;

    cmpBytes = (unsigned int) mb->buf[0] | ((unsigned int) mb->buf[1] << 8);
    if (cmpBytes == 0) {
        ctx->flags |= Flags_Terminated;
        return 0;
    }
    if (2 + cmpBytes > mb->bytes)
        return MyLz4Read_ErrorCompSize;
    if (cmpBytes != MyLz4Read_raw(ctx, &mb->buf[2], cmpBytes))
        return MyLz4Read_ErrorDecomp;

    // The decoder may write up to msgMax bytes starting at the cursor; refuse
    // to decode when that window does not lie inside the ring buffer (the
    // sizes originate from the stream header).
    if (rb->buf == NULL)
        return MyLz4Read_ErrorDecomp;
    if (rb->msgMax <= 0 || rb->offset < 0 || rb->offset > rb->bytes
        || rb->msgMax > rb->bytes - rb->offset)
        return MyLz4Read_ErrorDecompSize;

    dst = &rb->buf[rb->offset];
    decBytes = LZ4_decompress_safe_continue(
        ctx->lz4sd, (char*) &mb->buf[2], (char*) dst,
        (int) cmpBytes, (int) rb->msgMax);
    if (decBytes < 0)
        return decBytes;
    // The writer never emits empty messages, and 0 is reserved for the
    // end-of-stream result, so an empty decode is reported as an error.
    if (decBytes == 0)
        return MyLz4Read_ErrorDecomp;
    if (decBytes > rb->msgMax)
        return MyLz4Read_ErrorDecompSize;

    ringBufForward(rb, decBytes);
    *ppBuf = dst;
    return decBytes;
}
////////////////////////////////////////
// Test application
////////////////////////////////////////
#include <stdio.h>
#include <stdlib.h>
#include <malloc.h>
#include <string.h>
// Output callback for the test application: userPtr is the FILE* to write to.
// Returns the number of bytes fwrite() reports as written.
static size_t testFwrite(void* userPtr, const void* buf, size_t n)
{
    FILE* const fp = (FILE*) userPtr;
    const size_t written = fwrite(buf, 1, n, fp);

    return written;
}
// Input callback for the test application: userPtr is the FILE* to read from.
// Returns the number of bytes fread() reports as read.
static size_t testFread(void* userPtr, void* buf, size_t n)
{
    FILE* const fp = (FILE*) userPtr;
    const size_t got = fread(buf, 1, n, fp);

    return got;
}
// Compresses inpFp to outFp line by line.
//
// Each line is read with fgets() directly into the buffer handed out by
// MyLz4Write_begin() and its length is taken with strlen(); a line that
// contains a NUL byte is therefore cut at the first NUL. Lines longer than
// the maximum message size are split across several messages by fgets().
// Failures are reported on stderr and end the loop.
static void testCompress(FILE* outFp, FILE* inpFp)
{
    enum {
        maxMsgBytes = 1024,
        ringBufBytes = 1024 * 256 + maxMsgBytes
    };
    // MyLz4Write_create() will call testFwrite() to write header information.
    MyLz4Write_ctx_t* const ctx =
        MyLz4Write_create(testFwrite, outFp, ringBufBytes, maxMsgBytes);

    if (ctx == NULL) {
        fprintf(stderr, "testCompress: cannot create the LZ4 writer\n");
        return;
    }

    for ( ; ; )
    {
        // MyLz4Write_begin() returns the pointer and the maximum size to write.
        // A real void* is passed: a char** must not be accessed as a void**.
        void* slot = NULL;
        const int maxBytes = MyLz4Write_begin(ctx, &slot);
        char* const writePtr = (char*) slot;

        if (!fgets(writePtr, maxBytes, inpFp))
            break;

        // MyLz4Write_end() will call testFwrite() to write the compressed
        // packet. It needs the actual length of the data.
        // If the return value is negative, an error occurred.
        if (MyLz4Write_end(ctx, strlen(writePtr)) < 0) {
            fprintf(stderr, "testCompress: failed to write a packet\n");
            break;
        }
    }

    // MyLz4Write_release() will call testFwrite() to write the terminal marker.
    MyLz4Write_release(ctx);
}
// Decompresses the packet stream in inpFp and writes the decoded messages to
// outFp. Stops at the terminal marker; decoding and output errors are
// reported on stderr and end the loop.
static void testDecompress(FILE* outFp, FILE* inpFp)
{
    // MyLz4Read_create() will call testFread() to read header information.
    MyLz4Read_ctx_t* const ctx = MyLz4Read_create(testFread, inpFp);

    if (ctx == NULL) {
        fprintf(stderr, "testDecompress: cannot create the LZ4 reader\n");
        return;
    }

    for ( ; ; )
    {
        // MyLz4Read_packet() will call testFread() to read the compressed
        // stream and returns the decompressed data pointer and its size.
        // If the return value is 0, it read the terminal marker.
        // If the return value is negative, an error occurred.
        // A real const void* is passed: a const char** must not be accessed
        // as a const void**.
        const void* readPtr = NULL;
        const int readBytes = MyLz4Read_packet(ctx, &readPtr);

        if (readBytes < 0) {
            fprintf(stderr, "testDecompress: read error %d\n", readBytes);
            break;
        }
        if (readBytes == 0)
            break;

        if (fwrite(readPtr, 1, (size_t) readBytes, outFp) != (size_t) readBytes) {
            fprintf(stderr, "testDecompress: short write to output\n");
            break;
        }
    }

    MyLz4Read_release(ctx);
}
int main(int argc, char* argv[])
{
if (argc < 2)
{
printf("Please specify input filename\n");
return 0;
}
{
const char* inpFilename = argv[1];
char lz4Filename[256];
char decFilename[256];
sprintf(lz4Filename, "%s.lz4-stream", inpFilename);
sprintf(decFilename, "%s.dec-stream", inpFilename);
// compress
{
FILE* inpFp = fopen(inpFilename, "rb");
FILE* outFp = fopen(lz4Filename, "wb");
testCompress(outFp, inpFp);
fclose(outFp);
fclose(inpFp);
}
// decompress
{
FILE* inpFp = fopen(lz4Filename, "rb");
FILE* outFp = fopen(decFilename, "wb");
testDecompress(outFp, inpFp);
fclose(outFp);
fclose(inpFp);
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment