Last active
January 6, 2017 08:05
-
-
Save garyschulte/b6f340a3e1a643c8a52a24f429b7b3b5 to your computer and use it in GitHub Desktop.
tenant date partitioned s3 writer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.jivesoftware.cloudalytics.util; | |
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CreateBucketRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.UUID;

import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
/** | |
* @since 1/4/17. | |
* | |
* class to partition by tenant all the events for a particular DAY in S3 | |
*/ | |
public class TenantDatePartitionWriter { | |
private static final Log LOG = LogFactory.getLog(TenantDatePartitionWriter.class); | |
public static final String RECORD_DELIMITER = "\n"; | |
public static final Integer TARGET_FILE_SIZE = 5 * 1024 * 1024; | |
// 64kb should easily cover current deflate buffer contents, a single compressed event doc and a gzip trailer: | |
public static final Integer FUDGE_FACTOR = 65535; | |
private final TransportClient esClient; | |
private final AmazonS3 s3Client; | |
private final ObjectMapper mapper; | |
private final String s3BucketName; | |
//TODO: possibly inject | |
public TenantDatePartitionWriter(TransportClient esClient, AmazonS3 s3Client, ObjectMapper mapper, String s3BucketName) { | |
this.esClient = esClient; | |
this.s3Client = s3Client; | |
this.mapper = mapper; | |
this.s3BucketName = s3BucketName; | |
if (!s3Client.doesBucketExist(s3BucketName)) { | |
s3Client.createBucket(new CreateBucketRequest(s3BucketName)); | |
} | |
} | |
public void dumpQueryToS3(String tenantId, String indexName, Long dayMillis) { | |
ZonedDateTime zdt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(dayMillis), ZoneOffset.UTC); | |
zdt.truncatedTo(ChronoUnit.DAYS); | |
long fromMillis = zdt.toEpochSecond() * 1000; | |
long toMillis = fromMillis + 86400000; | |
S3PartitionedWriter writer = getWriter(tenantId, zdt); | |
BoolQueryBuilder bqb = new BoolQueryBuilder() | |
.must(termQuery("context.service.tenantId", tenantId)) | |
.must(rangeQuery("timestamp").gte(fromMillis).lt(toMillis)); | |
SearchResponse scrollResp = esClient.prepareSearch(indexName) | |
.setSearchType(SearchType.SCAN) | |
.setScroll(new TimeValue(60000)) | |
.setQuery(bqb) | |
.setSize(1000).execute().actionGet(); | |
//Scroll until no hits are returned | |
while(true) { | |
for (SearchHit hit : scrollResp.getHits().getHits()) { | |
try { | |
writer.write(hit.getSourceAsString() + RECORD_DELIMITER); | |
} catch (IOException e) { | |
LOG.error("failed to write to s3 stream, hit source: " + hit.getSourceAsString(), e); | |
} | |
} | |
scrollResp = esClient.prepareSearchScroll(scrollResp.getScrollId()).setScroll(new TimeValue(600000)).execute().actionGet(); | |
//Break condition: No hits are returned | |
if (scrollResp.getHits().getHits().length == 0) { | |
break; | |
} | |
} | |
writer.flush(); | |
} | |
public S3PartitionedWriter getWriter(String tenantId, ZonedDateTime zdt) { | |
S3PartitionedWriter tenantStream = new S3PartitionedWriter(s3BucketName, tenantId, zdt); | |
return tenantStream; | |
} | |
public class S3PartitionedWriter { | |
GzipCompressorOutputStream gzos = null; | |
BadIdeaOutputStream baos = null; | |
final String tenantId; | |
final String bucketName; | |
final ZonedDateTime dt; | |
public S3PartitionedWriter(String bucketName, String tenantId, ZonedDateTime dt) { | |
this.bucketName = bucketName; | |
this.tenantId = tenantId; | |
this.dt = dt; | |
constructStreams(); | |
} | |
public void flush() { | |
try { | |
gzos.finish(); | |
gzos.flush(); | |
sendFile(baos); | |
IOUtils.closeQuietly(gzos); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} | |
private void constructStreams() { | |
try { | |
baos = new BadIdeaOutputStream(TARGET_FILE_SIZE + FUDGE_FACTOR); | |
gzos = new GzipCompressorOutputStream(baos); | |
} catch (IOException e) { | |
e.printStackTrace(); | |
throw new RuntimeException(e); | |
} | |
} | |
private void sendFile(BadIdeaOutputStream os) { | |
ByteArrayInputStream bais = new ByteArrayInputStream(os.toByteArray(), 0, os.size()); | |
ObjectMetadata om = new ObjectMetadata(); | |
om.setContentLength(os.size()); | |
om.setContentType("application/x-gzip"); | |
String key = genPartitionedFileName(tenantId, dt); | |
PutObjectRequest putReq = new PutObjectRequest(bucketName, key, bais, om); | |
LOG.info("Starting putobject for tenant " + tenantId); | |
PutObjectResult s3TargetResult = s3Client.putObject(putReq); | |
LOG.info("finished putting object " + s3TargetResult + " for tenant " + tenantId); | |
} | |
public void write(String event) throws IOException { | |
gzos.write(event.getBytes()); | |
//break when we reach our target compressed file size | |
if (baos.size() >= TARGET_FILE_SIZE) { | |
flush(); | |
constructStreams(); | |
} | |
} | |
protected String genPartitionedFileName(String tenantId, ZonedDateTime dt) { | |
String name = null; | |
if (dt != null) { | |
String suffix = String.format("year=%04d/month=%02d/day=%02d/%s.gz", | |
dt.getYear(), | |
dt.getMonthValue(), | |
dt.getDayOfMonth(), | |
UUID.randomUUID()); | |
return tenantId + "/" + suffix; | |
} | |
return name; | |
} | |
} | |
class BadIdeaOutputStream extends ByteArrayOutputStream { | |
public BadIdeaOutputStream(int size) { | |
super(size); | |
} | |
// heap saving bad idea | |
@Override | |
public byte[] toByteArray() { | |
return buf; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment