Tenant/date-partitioned S3 writer: scrolls one tenant's events for a single UTC day out of Elasticsearch and writes them to S3 as gzipped, newline-delimited JSON, partitioned by tenant/year/month/day.
package com.jivesoftware.cloudalytics.util;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
/**
 * Partitions all of a tenant's events for a particular UTC day into gzipped,
 * newline-delimited JSON objects in S3.
 *
 * @since 1/4/17.
 */
public class TenantDatePartitionWriter {

    private static final Log LOG = LogFactory.getLog(TenantDatePartitionWriter.class);
    public static final String RECORD_DELIMITER = "\n";
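    // roll to a new S3 object once the compressed stream reaches ~5 MB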
    public static final Integer TARGET_FILE_SIZE = 5 * 1024 * 1024;
    // 64 KB should easily cover current deflate buffer contents, a single compressed event doc and a gzip trailer:
    public static final Integer FUDGE_FACTOR = 65535;
    private final TransportClient esClient;
    private final AmazonS3 s3Client;
    private final ObjectMapper mapper;
    private final String s3BucketName;

    //TODO: possibly inject
    public TenantDatePartitionWriter(TransportClient esClient, AmazonS3 s3Client, ObjectMapper mapper, String s3BucketName) {
        this.esClient = esClient;
        this.s3Client = s3Client;
        this.mapper = mapper;
        this.s3BucketName = s3BucketName;
        if (!s3Client.doesBucketExist(s3BucketName)) {
            s3Client.createBucket(new CreateBucketRequest(s3BucketName));
        }
    }
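    /**
     * Scrolls all of a tenant's events for one UTC day out of the given index and
     * streams them to S3 in ~5 MB gzipped chunks.
     *
     * @param dayMillis any epoch-millis instant within the target day; truncated to midnight UTC
     */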
    public void dumpQueryToS3(String tenantId, String indexName, Long dayMillis) {
        // ZonedDateTime is immutable, so the truncation has to be chained, not discarded
        ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dayMillis), ZoneOffset.UTC)
                .truncatedTo(ChronoUnit.DAYS);
        long fromMillis = zdt.toInstant().toEpochMilli();
        long toMillis = zdt.plusDays(1).toInstant().toEpochMilli();
        S3PartitionedWriter writer = getWriter(tenantId, zdt);
        BoolQueryBuilder bqb = new BoolQueryBuilder()
                .must(termQuery("context.service.tenantId", tenantId))
                .must(rangeQuery("timestamp").gte(fromMillis).lt(toMillis));
        SearchResponse scrollResp = esClient.prepareSearch(indexName)
                .setSearchType(SearchType.SCAN)
                .setScroll(new TimeValue(60000))
                .setQuery(bqb)
                .setSize(1000).execute().actionGet();
        // Scroll until no hits are returned. With SearchType.SCAN the initial
        // response carries no documents; they arrive on the scroll fetches below.
        while (true) {
            for (SearchHit hit : scrollResp.getHits().getHits()) {
                try {
                    writer.write(hit.getSourceAsString() + RECORD_DELIMITER);
                } catch (IOException e) {
                    LOG.error("failed to write to s3 stream, hit source: " + hit.getSourceAsString(), e);
                }
            }
            scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet();
            // Break condition: no hits are returned
            if (scrollResp.getHits().getHits().length == 0) {
                break;
            }
        }
        writer.flush();
    }
    public S3PartitionedWriter getWriter(String tenantId, ZonedDateTime zdt) {
        return new S3PartitionedWriter(s3BucketName, tenantId, zdt);
    }
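    /**
     * Buffers gzip-compressed events in memory and uploads the buffer to S3 each time
     * the compressed size crosses TARGET_FILE_SIZE (and once more on the final flush).
     */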
    public class S3PartitionedWriter {

        GzipCompressorOutputStream gzos = null;
        BadIdeaOutputStream baos = null;
        final String tenantId;
        final String bucketName;
        final ZonedDateTime dt;

        public S3PartitionedWriter(String bucketName, String tenantId, ZonedDateTime dt) {
            this.bucketName = bucketName;
            this.tenantId = tenantId;
            this.dt = dt;
            constructStreams();
        }
        public void flush() {
            try {
                // finish() writes the gzip trailer so the uploaded object is a complete archive
                gzos.finish();
                gzos.flush();
                sendFile(baos);
                IOUtils.closeQuietly(gzos);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        private void constructStreams() {
            try {
                baos = new BadIdeaOutputStream(TARGET_FILE_SIZE + FUDGE_FACTOR);
                gzos = new GzipCompressorOutputStream(baos);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        private void sendFile(BadIdeaOutputStream os) {
            // os.toByteArray() returns the live internal buffer (no copy), so the
            // stream must be bounded by os.size() rather than the array length
            ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray(), 0, os.size());
            ObjectMetadata om = new ObjectMetadata();
            om.setContentLength(os.size());
            om.setContentType("application/x-gzip");
            String key = genPartitionedFileName(tenantId, dt);
            PutObjectRequest putReq = new PutObjectRequest(bucketName, key, bais, om);
            LOG.info("starting putObject for tenant " + tenantId);
            PutObjectResult s3TargetResult = s3Client.putObject(putReq);
            LOG.info("finished putting object " + s3TargetResult + " for tenant " + tenantId);
        }
        public void write(String event) throws IOException {
            // encode explicitly rather than relying on the platform default charset
            gzos.write(event.getBytes(StandardCharsets.UTF_8));
            // roll to a new S3 object when we reach our target compressed file size
            if (baos.size() >= TARGET_FILE_SIZE) {
                flush();
                constructStreams();
            }
        }
        protected String genPartitionedFileName(String tenantId, ZonedDateTime dt) {
            if (dt == null) {
                return null;
            }
            // Hive-style partition layout: <tenantId>/year=YYYY/month=MM/day=DD/<uuid>.gz
            String suffix = String.format("year=%04d/month=%02d/day=%02d/%s.gz",
                    dt.getYear(),
                    dt.getMonthValue(),
                    dt.getDayOfMonth(),
                    UUID.randomUUID());
            return tenantId + "/" + suffix;
        }
    }
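    /**
     * ByteArrayOutputStream whose toByteArray() hands back the internal buffer without
     * copying, halving peak heap usage; callers must pair it with size() and must not
     * hold the array past the stream's lifetime.
     */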
    class BadIdeaOutputStream extends ByteArrayOutputStream {

        public BadIdeaOutputStream(int size) {
            super(size);
        }

        // heap saving bad idea
        @Override
        public byte[] toByteArray() {
            return buf;
        }
    }
}
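Example usage (a minimal sketch, assuming the Elasticsearch 2.x transport-client API; the host, bucket, tenant, and index names are illustrative, not part of this gist):

package com.jivesoftware.cloudalytics.util;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.net.InetAddress;

public class TenantDatePartitionWriterExample {

    public static void main(String[] args) throws Exception {
        // assumes a local ES 2.x node and AWS credentials on the default provider chain
        TransportClient esClient = TransportClient.builder().build()
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"), 9300));
        AmazonS3 s3Client = new AmazonS3Client();
        ObjectMapper mapper = new ObjectMapper();

        TenantDatePartitionWriter writer =
                new TenantDatePartitionWriter(esClient, s3Client, mapper, "my-analytics-bucket");
        // 1483488000000L == 2017-01-04T00:00:00Z; any instant within the day works
        writer.dumpQueryToS3("tenant-123", "events-2017.01.04", 1483488000000L);

        esClient.close();
    }
}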