Created
March 2, 2017 04:00
-
-
Save deanproctor/4b276e95b3883d071b2738349d4dfad1 to your computer and use it in GitHub Desktop.
GzipTranscoder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2016 Couchbase, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.couchbase.client.java.document; | |
import com.couchbase.client.core.message.kv.MutationToken; | |
/** | |
* This document is fully compatible with Java SDK 1.* stored documents. | |
* | |
* It is not compatible with other SDKs. It should be used to interact with legacy documents and code, but it is | |
* recommended to switch to the unifying document types (Json* and String) if possible to guarantee better | |
* interoperability in the future. | |
* | |
* @author Michael Nitschinger | |
* @since 2.0 | |
*/ | |
public class GzipDocument extends AbstractDocument<Object> { | |
/** | |
* Creates a {@link GzipDocument} which the document id. | |
* | |
* @param id the per-bucket unique document id. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id) { | |
return new GzipDocument(id, 0, null, 0, null); | |
} | |
/** | |
* Creates a {@link GzipDocument} which the document id and JSON content. | |
* | |
* @param id the per-bucket unique document id. | |
* @param content the content of the document. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id, Object content) { | |
return new GzipDocument(id, 0, content, 0, null); | |
} | |
/** | |
* Creates a {@link GzipDocument} which the document id, JSON content and the CAS value. | |
* | |
* @param id the per-bucket unique document id. | |
* @param content the content of the document. | |
* @param cas the CAS (compare and swap) value for optimistic concurrency. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id, Object content, long cas) { | |
return new GzipDocument(id, 0, content, cas, null); | |
} | |
/** | |
* Creates a {@link GzipDocument} which the document id, JSON content and the expiration time. | |
* | |
* @param id the per-bucket unique document id. | |
* @param expiry the expiration time of the document. | |
* @param content the content of the document. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id, int expiry, Object content) { | |
return new GzipDocument(id, expiry, content, 0, null); | |
} | |
/** | |
* Creates a {@link GzipDocument} which the document id, JSON content, CAS value, expiration time and status code. | |
* | |
* This factory method is normally only called within the client library when a response is analyzed and a document | |
* is returned which is enriched with the status code. It does not make sense to pre populate the status field from | |
* the user level code. | |
* | |
* @param id the per-bucket unique document id. | |
* @param expiry the expiration time of the document. | |
* @param content the content of the document. | |
* @param cas the CAS (compare and swap) value for optimistic concurrency. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id, int expiry, Object content, long cas) { | |
return new GzipDocument(id, expiry, content, cas, null); | |
} | |
/** | |
* Creates a {@link GzipDocument} which the document id, JSON content, CAS value, expiration time and status code. | |
* | |
* This factory method is normally only called within the client library when a response is analyzed and a document | |
* is returned which is enriched with the status code. It does not make sense to pre populate the status field from | |
* the user level code. | |
* | |
* @param id the per-bucket unique document id. | |
* @param expiry the expiration time of the document. | |
* @param content the content of the document. | |
* @param cas the CAS (compare and swap) value for optimistic concurrency. | |
* @return a {@link GzipDocument}. | |
*/ | |
public static GzipDocument create(String id, int expiry, Object content, long cas, MutationToken mutationToken) { | |
return new GzipDocument(id, expiry, content, cas, mutationToken); | |
} | |
/** | |
* Creates a copy from a different {@link GzipDocument}, but changes the document ID. | |
* | |
* @param doc the original {@link GzipDocument} to copy. | |
* @param id the per-bucket unique document id. | |
* @return a copied {@link GzipDocument} with the changed properties. | |
*/ | |
public static GzipDocument from(GzipDocument doc, String id) { | |
return GzipDocument.create(id, doc.expiry(), doc.content(), doc.cas(), doc.mutationToken()); | |
} | |
/** | |
* Creates a copy from a different {@link GzipDocument}, but changes the content. | |
* | |
* @param doc the original {@link GzipDocument} to copy. | |
* @param content the content of the document. | |
* @return a copied {@link GzipDocument} with the changed properties. | |
*/ | |
public static GzipDocument from(GzipDocument doc, Object content) { | |
return GzipDocument.create(doc.id(), doc.expiry(), content, doc.cas(), doc.mutationToken()); | |
} | |
/** | |
* Creates a copy from a different {@link GzipDocument}, but changes the document ID and content. | |
* | |
* @param doc the original {@link GzipDocument} to copy. | |
* @param id the per-bucket unique document id. | |
* @param content the content of the document. | |
* @return a copied {@link GzipDocument} with the changed properties. | |
*/ | |
public static GzipDocument from(GzipDocument doc, String id, Object content) { | |
return GzipDocument.create(id, doc.expiry(), content, doc.cas(), doc.mutationToken()); | |
} | |
/** | |
* Creates a copy from a different {@link GzipDocument}, but changes the CAS value. | |
* | |
* @param doc the original {@link GzipDocument} to copy. | |
* @param cas the CAS (compare and swap) value for optimistic concurrency. | |
* @return a copied {@link GzipDocument} with the changed properties. | |
*/ | |
public static GzipDocument from(GzipDocument doc, long cas) { | |
return GzipDocument.create(doc.id(), doc.expiry(), doc.content(), cas, doc.mutationToken()); | |
} | |
private GzipDocument(String id, int expiry, Object content, long cas, MutationToken mutationToken) { | |
super(id, expiry, content, cas, mutationToken); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2016 Couchbase, Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.couchbase.client.java.transcoder; | |
import com.couchbase.client.core.lang.Tuple; | |
import com.couchbase.client.core.lang.Tuple2; | |
import com.couchbase.client.core.logging.CouchbaseLogger; | |
import com.couchbase.client.core.logging.CouchbaseLoggerFactory; | |
import com.couchbase.client.core.message.ResponseStatus; | |
import com.couchbase.client.core.message.kv.MutationToken; | |
import com.couchbase.client.deps.io.netty.buffer.ByteBuf; | |
import com.couchbase.client.deps.io.netty.buffer.Unpooled; | |
import com.couchbase.client.deps.io.netty.util.CharsetUtil; | |
import com.couchbase.client.java.document.GzipDocument; | |
import java.io.*; | |
import java.util.zip.GZIPInputStream; | |
import java.util.zip.GZIPOutputStream; | |
public class GzipTranscoder extends AbstractTranscoder<GzipDocument, Object> { | |
/** | |
* The logger used. | |
*/ | |
private static final CouchbaseLogger LOGGER = CouchbaseLoggerFactory.getInstance(GzipTranscoder.class); | |
public static final int DEFAULT_COMPRESSION_THRESHOLD = 1; | |
// General flags | |
static final int SERIALIZED = 1; | |
static final int COMPRESSED = 2; | |
private final int compressionThreshold; | |
public GzipTranscoder() { | |
this(DEFAULT_COMPRESSION_THRESHOLD); | |
} | |
public GzipTranscoder(int compressionThreshold) { | |
this.compressionThreshold = compressionThreshold; | |
} | |
@Override | |
public Class<GzipDocument> documentType() { | |
return GzipDocument.class; | |
} | |
@Override | |
protected GzipDocument doDecode(String id, ByteBuf content, long cas, int expiry, int flags, ResponseStatus status) | |
throws Exception { | |
byte[] data = new byte[content.readableBytes()]; | |
content.readBytes(data); | |
Object decoded = null; | |
if ((flags & COMPRESSED) != 0) { | |
data = decompress(data); | |
} | |
if ((flags & SERIALIZED) != 0 && data != null) { | |
decoded = deserialize(data); | |
} else { | |
decoded = new String(data, CharsetUtil.UTF_8); | |
} | |
return newDocument(id, expiry, decoded, cas); | |
} | |
@Override | |
public GzipDocument newDocument(String id, int expiry, Object content, long cas) { | |
return GzipDocument.create(id, expiry, content, cas); | |
} | |
@Override | |
public GzipDocument newDocument(String id, int expiry, Object content, long cas, | |
MutationToken mutationToken) { | |
return GzipDocument.create(id, expiry, content, cas, mutationToken); | |
} | |
@Override | |
protected Tuple2<ByteBuf, Integer> doEncode(GzipDocument document) | |
throws Exception { | |
int flags = 0; | |
Object content = document.content(); | |
ByteBuf encoded = Unpooled.buffer(); | |
flags |= SERIALIZED; | |
encoded.writeBytes(serialize(content)); | |
if (encoded.readableBytes() >= compressionThreshold) { | |
byte[] compressed = compress(encoded.copy().array()); | |
if (compressed.length < encoded.array().length) { | |
encoded.clear().writeBytes(compressed); | |
flags |= COMPRESSED; | |
} | |
} | |
return Tuple.create(encoded, flags); | |
} | |
private static byte[] serialize(final Object content) { | |
if (content == null) { | |
throw new NullPointerException("Can't serialize null"); | |
} | |
byte[] rv=null; | |
ByteArrayOutputStream bos = null; | |
ObjectOutputStream os = null; | |
try { | |
bos = new ByteArrayOutputStream(); | |
os = new ObjectOutputStream(bos); | |
os.writeObject(content); | |
os.close(); | |
bos.close(); | |
rv = bos.toByteArray(); | |
} catch (IOException e) { | |
throw new IllegalArgumentException("Non-serializable object", e); | |
} finally { | |
try { | |
if (os != null) { | |
os.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close output stream.", ex); | |
} | |
try { | |
if (bos != null) { | |
bos.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close byte output stream.", ex); | |
} | |
} | |
return rv; | |
} | |
protected Object deserialize(byte[] in) { | |
Object rv=null; | |
ByteArrayInputStream bis = null; | |
ObjectInputStream is = null; | |
try { | |
if(in != null) { | |
bis=new ByteArrayInputStream(in); | |
is=new ObjectInputStream(bis); | |
rv=is.readObject(); | |
is.close(); | |
bis.close(); | |
} | |
} catch (IOException e) { | |
LOGGER.warn("Caught IOException decoding %d bytes of data", | |
in == null ? 0 : in.length, e); | |
} catch (ClassNotFoundException e) { | |
LOGGER.warn("Caught CNFE decoding %d bytes of data", | |
in == null ? 0 : in.length, e); | |
} finally { | |
try { | |
if (is != null) { | |
is.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close input stream.", ex); | |
} | |
try { | |
if (bis != null) { | |
bis.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close byte input stream.", ex); | |
} | |
} | |
return rv; | |
} | |
protected byte[] compress(byte[] in) { | |
if (in == null) { | |
throw new NullPointerException("Can't compress null"); | |
} | |
ByteArrayOutputStream bos = new ByteArrayOutputStream(); | |
GZIPOutputStream gz = null; | |
try { | |
gz = new GZIPOutputStream(bos); | |
gz.write(in); | |
} catch (IOException e) { | |
throw new RuntimeException("IO exception compressing data", e); | |
} finally { | |
try { | |
if (gz != null) { | |
gz.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close gzip output stream.", ex); | |
} | |
try { | |
bos.close(); | |
} catch(Exception ex) { | |
LOGGER.error("Could not close byte output stream.", ex); | |
} | |
} | |
return bos.toByteArray(); | |
} | |
protected byte[] decompress(byte[] in) { | |
ByteArrayOutputStream bos = null; | |
if(in != null) { | |
ByteArrayInputStream bis = new ByteArrayInputStream(in); | |
bos = new ByteArrayOutputStream(); | |
GZIPInputStream gis = null; | |
try { | |
gis = new GZIPInputStream(bis); | |
byte[] buf = new byte[8192]; | |
int r = -1; | |
while ((r = gis.read(buf)) > 0) { | |
bos.write(buf, 0, r); | |
} | |
} catch (IOException e) { | |
LOGGER.error("Could not decompress data.", e); | |
bos = null; | |
} finally { | |
try { | |
if (bos != null) { | |
bos.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close byte output stream.", ex); | |
} | |
try { | |
if (gis != null) { | |
gis.close(); | |
} | |
} catch(Exception ex) { | |
LOGGER.error("Could not close gzip input stream.", ex); | |
} | |
try { | |
bis.close(); | |
} catch(Exception ex) { | |
LOGGER.error("Could not close byte input stream.", ex); | |
} | |
} | |
} | |
return bos == null ? null : bos.toByteArray(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment