@deanproctor
Created March 2, 2017 04:00
GzipTranscoder
/*
* Copyright (c) 2016 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.java.document;
import com.couchbase.client.core.message.kv.MutationToken;
/**
* This document is fully compatible with Java SDK 1.* stored documents.
*
* It is not compatible with other SDKs. It should be used to interact with legacy documents and code, but it is
* recommended to switch to the unifying document types (Json* and String) if possible to guarantee better
* interoperability in the future.
*
* @author Michael Nitschinger
* @since 2.0
*/
public class GzipDocument extends AbstractDocument<Object> {
/**
* Creates a {@link GzipDocument} with the document id.
*
* @param id the per-bucket unique document id.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id) {
return new GzipDocument(id, 0, null, 0, null);
}
/**
* Creates a {@link GzipDocument} with the document id and content.
*
* @param id the per-bucket unique document id.
* @param content the content of the document.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id, Object content) {
return new GzipDocument(id, 0, content, 0, null);
}
/**
* Creates a {@link GzipDocument} with the document id, content and the CAS value.
*
* @param id the per-bucket unique document id.
* @param content the content of the document.
* @param cas the CAS (compare and swap) value for optimistic concurrency.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id, Object content, long cas) {
return new GzipDocument(id, 0, content, cas, null);
}
/**
* Creates a {@link GzipDocument} with the document id, content and the expiration time.
*
* @param id the per-bucket unique document id.
* @param expiry the expiration time of the document.
* @param content the content of the document.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id, int expiry, Object content) {
return new GzipDocument(id, expiry, content, 0, null);
}
/**
* Creates a {@link GzipDocument} with the document id, content, expiration time and CAS value.
*
* This factory method is normally only called within the client library when a response is analyzed and a document
* is returned which is enriched with the CAS value. It does not usually make sense to populate the CAS value from
* user level code.
*
* @param id the per-bucket unique document id.
* @param expiry the expiration time of the document.
* @param content the content of the document.
* @param cas the CAS (compare and swap) value for optimistic concurrency.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id, int expiry, Object content, long cas) {
return new GzipDocument(id, expiry, content, cas, null);
}
/**
* Creates a {@link GzipDocument} with the document id, content, expiration time, CAS value and mutation token.
*
* This factory method is normally only called within the client library when a response is analyzed and a document
* is returned which is enriched with the mutation token. It does not usually make sense to populate the mutation
* token from user level code.
*
* @param id the per-bucket unique document id.
* @param expiry the expiration time of the document.
* @param content the content of the document.
* @param cas the CAS (compare and swap) value for optimistic concurrency.
* @param mutationToken the mutation token of the document, if available.
* @return a {@link GzipDocument}.
*/
public static GzipDocument create(String id, int expiry, Object content, long cas, MutationToken mutationToken) {
return new GzipDocument(id, expiry, content, cas, mutationToken);
}
/**
* Creates a copy from a different {@link GzipDocument}, but changes the document ID.
*
* @param doc the original {@link GzipDocument} to copy.
* @param id the per-bucket unique document id.
* @return a copied {@link GzipDocument} with the changed properties.
*/
public static GzipDocument from(GzipDocument doc, String id) {
return GzipDocument.create(id, doc.expiry(), doc.content(), doc.cas(), doc.mutationToken());
}
/**
* Creates a copy from a different {@link GzipDocument}, but changes the content.
*
* @param doc the original {@link GzipDocument} to copy.
* @param content the content of the document.
* @return a copied {@link GzipDocument} with the changed properties.
*/
public static GzipDocument from(GzipDocument doc, Object content) {
return GzipDocument.create(doc.id(), doc.expiry(), content, doc.cas(), doc.mutationToken());
}
/**
* Creates a copy from a different {@link GzipDocument}, but changes the document ID and content.
*
* @param doc the original {@link GzipDocument} to copy.
* @param id the per-bucket unique document id.
* @param content the content of the document.
* @return a copied {@link GzipDocument} with the changed properties.
*/
public static GzipDocument from(GzipDocument doc, String id, Object content) {
return GzipDocument.create(id, doc.expiry(), content, doc.cas(), doc.mutationToken());
}
/**
* Creates a copy from a different {@link GzipDocument}, but changes the CAS value.
*
* @param doc the original {@link GzipDocument} to copy.
* @param cas the CAS (compare and swap) value for optimistic concurrency.
* @return a copied {@link GzipDocument} with the changed properties.
*/
public static GzipDocument from(GzipDocument doc, long cas) {
return GzipDocument.create(doc.id(), doc.expiry(), doc.content(), cas, doc.mutationToken());
}
private GzipDocument(String id, int expiry, Object content, long cas, MutationToken mutationToken) {
super(id, expiry, content, cas, mutationToken);
}
}
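The factory and copy methods above follow the same pattern as the SDK's built-in document types. A minimal sketch of how they combine (the ids, expiry and content values are made up for illustration; to be stored through the GzipTranscoder further down, the content has to implement java.io.Serializable):

// Hypothetical ids and values, for illustration only.
java.util.ArrayList<String> content = new java.util.ArrayList<String>();
GzipDocument doc = GzipDocument.create("user::123", 3600, content);   // id, expiry in seconds, content
GzipDocument renamed = GzipDocument.from(doc, "user::124");           // copy with a new document id
GzipDocument updated = GzipDocument.from(doc, new java.util.Date());  // copy with new content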
/*
* Copyright (c) 2016 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.couchbase.client.java.transcoder;
import com.couchbase.client.core.lang.Tuple;
import com.couchbase.client.core.lang.Tuple2;
import com.couchbase.client.core.logging.CouchbaseLogger;
import com.couchbase.client.core.logging.CouchbaseLoggerFactory;
import com.couchbase.client.core.message.ResponseStatus;
import com.couchbase.client.core.message.kv.MutationToken;
import com.couchbase.client.deps.io.netty.buffer.ByteBuf;
import com.couchbase.client.deps.io.netty.buffer.Unpooled;
import com.couchbase.client.deps.io.netty.util.CharsetUtil;
import com.couchbase.client.java.document.GzipDocument;
import java.io.*;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
public class GzipTranscoder extends AbstractTranscoder<GzipDocument, Object> {
/**
* The logger used.
*/
private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(GzipTranscoder.class);
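/**
* Serialized payloads of at least this many bytes are candidates for compression; the default of 1
* means compression is attempted for every document.
*/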
public static final int DEFAULT_COMPRESSION_THRESHOLD = 1;
// Flag bits stored in the document metadata; the values match the legacy (SDK 1.x) flags.
static final int SERIALIZED = 1;
static final int COMPRESSED = 2;
private final int compressionThreshold;
public GzipTranscoder() {
this(DEFAULT_COMPRESSION_THRESHOLD);
}
public GzipTranscoder(int compressionThreshold) {
this.compressionThreshold = compressionThreshold;
}
@Override
public Class<GzipDocument> documentType() {
return GzipDocument.class;
}
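/**
* Decodes the stored bytes: gunzips them first if the COMPRESSED flag is set, then either
* deserializes them (SERIALIZED flag) or interprets them as a UTF-8 string.
*/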
@Override
protected GzipDocument doDecode(String id, ByteBuf content, long cas, int expiry, int flags, ResponseStatus status)
throws Exception {
byte[] data = new byte[content.readableBytes()];
content.readBytes(data);
Object decoded = null;
if ((flags & COMPRESSED) != 0) {
data = decompress(data);
}
if ((flags & SERIALIZED) != 0 && data != null) {
decoded = deserialize(data);
} else if (data != null) {
decoded = new String(data, CharsetUtil.UTF_8);
}
return newDocument(id, expiry, decoded, cas);
}
@Override
public GzipDocument newDocument(String id, int expiry, Object content, long cas) {
return GzipDocument.create(id, expiry, content, cas);
}
@Override
public GzipDocument newDocument(String id, int expiry, Object content, long cas,
MutationToken mutationToken) {
return GzipDocument.create(id, expiry, content, cas, mutationToken);
}
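/**
* Encodes the document: the content is serialized, and the serialized form is gzip-compressed
* when it reaches the compression threshold and compression actually reduces its size.
*/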
@Override
protected Tuple2<ByteBuf, Integer> doEncode(GzipDocument document)
throws Exception {
int flags = SERIALIZED;
byte[] serialized = serialize(document.content());
ByteBuf encoded = Unpooled.buffer(serialized.length);
encoded.writeBytes(serialized);
if (serialized.length >= compressionThreshold) {
byte[] compressed = compress(serialized);
// Keep the compressed form only if it is actually smaller than the serialized form.
if (compressed.length < serialized.length) {
encoded.clear().writeBytes(compressed);
flags |= COMPRESSED;
}
}
return Tuple.create(encoded, flags);
}
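/**
* Serializes the content with standard Java object serialization; the content must implement
* {@link java.io.Serializable}.
*/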
private static byte[] serialize(final Object content) {
if (content == null) {
throw new NullPointerException("Can't serialize null");
}
byte[] rv=null;
ByteArrayOutputStream bos = null;
ObjectOutputStream os = null;
try {
bos = new ByteArrayOutputStream();
os = new ObjectOutputStream(bos);
os.writeObject(content);
os.close();
bos.close();
rv = bos.toByteArray();
} catch (IOException e) {
throw new IllegalArgumentException("Non-serializable object", e);
} finally {
try {
if (os != null) {
os.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close output stream.", ex);
}
try {
if (bos != null) {
bos.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close byte output stream.", ex);
}
}
return rv;
}
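/**
* Deserializes the given bytes back into an object, returning null (and logging a warning) if
* decoding fails.
*/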
protected Object deserialize(byte[] in) {
Object rv=null;
ByteArrayInputStream bis = null;
ObjectInputStream is = null;
try {
if(in != null) {
bis=new ByteArrayInputStream(in);
is=new ObjectInputStream(bis);
rv=is.readObject();
is.close();
bis.close();
}
} catch (IOException e) {
LOGGER.warn("Caught IOException decoding %d bytes of data",
in == null ? 0 : in.length, e);
} catch (ClassNotFoundException e) {
LOGGER.warn("Caught CNFE decoding %d bytes of data",
in == null ? 0 : in.length, e);
} finally {
try {
if (is != null) {
is.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close input stream.", ex);
}
try {
if (bis != null) {
bis.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close byte input stream.", ex);
}
}
return rv;
}
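/**
* Gzip-compresses the given bytes.
*/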
protected byte[] compress(byte[] in) {
if (in == null) {
throw new NullPointerException("Can't compress null");
}
ByteArrayOutputStream bos = new ByteArrayOutputStream();
GZIPOutputStream gz = null;
try {
gz = new GZIPOutputStream(bos);
gz.write(in);
} catch (IOException e) {
throw new RuntimeException("IO exception compressing data", e);
} finally {
try {
if (gz != null) {
gz.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close gzip output stream.", ex);
}
try {
bos.close();
} catch(Exception ex) {
LOGGER.error("Could not close byte output stream.", ex);
}
}
return bos.toByteArray();
}
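/**
* Gunzips the given bytes, returning null if the input is null or decompression fails.
*/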
protected byte[] decompress(byte[] in) {
ByteArrayOutputStream bos = null;
if(in != null) {
ByteArrayInputStream bis = new ByteArrayInputStream(in);
bos = new ByteArrayOutputStream();
GZIPInputStream gis = null;
try {
gis = new GZIPInputStream(bis);
byte[] buf = new byte[8192];
int r = -1;
while ((r = gis.read(buf)) > 0) {
bos.write(buf, 0, r);
}
} catch (IOException e) {
LOGGER.error("Could not decompress data.", e);
bos = null;
} finally {
try {
if (bos != null) {
bos.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close byte output stream.", ex);
}
try {
if (gis != null) {
gis.close();
}
} catch(Exception ex) {
LOGGER.error("Could not close gzip input stream.", ex);
}
try {
bis.close();
} catch(Exception ex) {
LOGGER.error("Could not close byte input stream.", ex);
}
}
}
return bos == null ? null : bos.toByteArray();
}
}
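For completeness, here is a sketch of how the transcoder could be wired up. Custom transcoders have to be supplied when the bucket is opened; the class name, cluster address, bucket name and document values below are placeholders, so adjust them to your environment.

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.document.GzipDocument;
import com.couchbase.client.java.transcoder.GzipTranscoder;
import com.couchbase.client.java.transcoder.Transcoder;
import java.util.ArrayList;
import java.util.List;

public class GzipExample {
    public static void main(String[] args) {
        // Register the custom transcoder when opening the bucket.
        List<Transcoder<? extends Document, ?>> transcoders = new ArrayList<Transcoder<? extends Document, ?>>();
        transcoders.add(new GzipTranscoder());

        CouchbaseCluster cluster = CouchbaseCluster.create("127.0.0.1");
        Bucket bucket = cluster.openBucket("default", transcoders);

        // The content only needs to implement java.io.Serializable; it is serialized
        // and gzip-compressed (when that makes it smaller) before being stored.
        bucket.upsert(GzipDocument.create("greeting", "hello, compressed world"));
        GzipDocument loaded = bucket.get("greeting", GzipDocument.class);
        System.out.println(loaded.content());

        cluster.disconnect();
    }
}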