Avro writer for Flink's RollingSink
package org.apache.flink.streaming.connectors.fs.avro;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileConstants;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* Implementation of an Avro key-value {@link Writer} that can be used with Flink's {@link RollingSink}.
<pre>
Usage:
{@code
RollingSink<Tuple2<Long, Long>> sink = new RollingSink<Tuple2<Long, Long>>("/tmp/path");
sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd/HH/mm/"));
sink.setPendingSuffix(".avro");
Map<String,String> properties = new HashMap<>();
Schema longSchema = Schema.create(Type.LONG);
String keySchema = longSchema.toString();
String valueSchema = longSchema.toString();
properties.put(AvroSinkWriter.CONF_OUTPUT_KEY_SCHEMA, keySchema);
properties.put(AvroSinkWriter.CONF_OUTPUT_VALUE_SCHEMA, valueSchema);
properties.put(AvroSinkWriter.CONF_COMPRESS, Boolean.toString(true));
properties.put(AvroSinkWriter.CONF_COMPRESS_CODEC, DataFileConstants.SNAPPY_CODEC);
sink.setWriter(new AvroSinkWriter<Long, Long>(properties));
sink.setBatchSize(1024 * 1024 * 64); // 64 MB
}
</pre>
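To attach the configured sink to a stream (an illustrative sketch; the {@code input} stream below is assumed and not part of this gist):
<pre>
{@code
DataStream<Tuple2<Long, Long>> input = ...; // some upstream producing Tuple2<Long, Long>
input.addSink(sink); // RollingSink is a SinkFunction, so it can be added directly
}
</pre>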
To test with S3:
<pre>
{@code
create a core-site.xml (I haven't found another way to test this locally)
<configuration>
<property>
<name>fs.s3.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
<property>
<name>fs.s3a.access.key</name>
<value>xxx</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>yyy</value>
</property>
<property>
<!-- probably not needed with a real HDFS installation -->
<name>fs.s3a.buffer.dir</name>
<value>/tmp</value>
</property>
</configuration>
and add the following dependency (not sure what the best option is here):
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>2.7.0</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<artifactId>guava</artifactId>
<groupId>com.google.guava</groupId>
</exclusion>
</exclusions>
</dependency>
}
</pre>
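To make Flink pick up that core-site.xml, one option (a sketch, assuming a standalone Flink setup; the path is illustrative) is to point Flink at the directory containing it via flink-conf.yaml:
<pre>
{@code
fs.hdfs.hadoopconf: /path/to/dir/containing/core-site.xml
}
</pre>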
*/
public class AvroSinkWriter<K, V> implements Writer<Tuple2<K, V>>, InputTypeConfigurable {
private static final long serialVersionUID = 1L;
public static final String CONF_OUTPUT_KEY_SCHEMA = "avro.schema.output.key";
public static final String CONF_OUTPUT_VALUE_SCHEMA = "avro.schema.output.value";
public static final String CONF_COMPRESS = FileOutputFormat.COMPRESS;
public static final String CONF_COMPRESS_CODEC = FileOutputFormat.COMPRESS_CODEC;
public static final String CONF_DEFLATE_LEVEL = "avro.deflate.level";
public static final String CONF_XZ_LEVEL = "avro.xz.level";
private transient FSDataOutputStream outputStream;
private transient AvroKeyValueWriter<K, V> writer;
private Class<K> keyClass;
private Class<V> valueClass;
private final Map<String, String> properties;
/**
* Constructor for the writer.
* <p>
* You can provide properties that will be used to configure the Avro key-value writer, passed as a simple properties map (see the example above).
* @param properties writer configuration properties
*/
public AvroSinkWriter(Map<String, String> properties) {
this.properties = properties;
}
private AvroSinkWriter(Class<K> keyClass, Class<V> valueClass, Map<String, String> properties) {
this.properties = properties;
this.keyClass = keyClass;
this.valueClass = valueClass;
}
private boolean getBoolean(Map<String,String> conf, String key, boolean def) {
String value = conf.get(key);
if (value == null) {
return def;
}
return Boolean.parseBoolean(value);
}
private int getInt(Map<String,String> conf, String key, int def) {
String value = conf.get(key);
if (value == null) {
return def;
}
return Integer.parseInt(value);
}
// derived from AvroOutputFormatBase.getCompressionCodec(..)
private CodecFactory getCompressionCodec(Map<String,String> conf) {
if (getBoolean(conf, CONF_COMPRESS, false)) {
int deflateLevel = getInt(conf, CONF_DEFLATE_LEVEL, CodecFactory.DEFAULT_DEFLATE_LEVEL);
int xzLevel = getInt(conf, CONF_XZ_LEVEL, CodecFactory.DEFAULT_XZ_LEVEL);
String outputCodec = conf.get(CONF_COMPRESS_CODEC);
if (DataFileConstants.DEFLATE_CODEC.equals(outputCodec)) {
return CodecFactory.deflateCodec(deflateLevel);
} else if (DataFileConstants.XZ_CODEC.equals(outputCodec)) {
return CodecFactory.xzCodec(xzLevel);
} else {
return CodecFactory.fromString(outputCodec);
}
}
return CodecFactory.nullCodec();
}
@Override
public void open(FSDataOutputStream outStream) throws IOException {
if (outputStream != null) {
throw new IllegalStateException("AvroSinkWriter has already been opened.");
}
if (keyClass == null) {
throw new IllegalStateException("Key Class has not been initialized.");
}
if (valueClass == null) {
throw new IllegalStateException("Value Class has not been initialized.");
}
this.outputStream = outStream;
CodecFactory compressionCodec = getCompressionCodec(properties);
String keySchemaString = properties.get(CONF_OUTPUT_KEY_SCHEMA);
if (keySchemaString == null) {
throw new IllegalStateException("No key schema provided, set '" + CONF_OUTPUT_KEY_SCHEMA + "' property");
}
@SuppressWarnings("deprecation")
Schema keySchema = Schema.parse(keySchemaString);
String valueSchemaString = properties.get(CONF_OUTPUT_VALUE_SCHEMA);
if (valueSchemaString == null) {
throw new IllegalStateException("No value schema provided, set '" + CONF_OUTPUT_VALUE_SCHEMA + "' property");
}
@SuppressWarnings("deprecation")
Schema valueSchema = Schema.parse(valueSchemaString);
writer = new AvroKeyValueWriter<K, V>(keySchema, valueSchema, compressionCodec, outputStream);
}
@Override
public void flush() throws IOException {
if (writer != null) {
writer.sync();
}
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
}
writer = null;
outputStream = null;
}
@Override
public void write(Tuple2<K, V> element) throws IOException {
if (outputStream == null) {
throw new IllegalStateException("SequenceFileWriter has not been opened.");
}
writer.write(element.f0, element.f1);
}
@Override
public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
if (!type.isTupleType()) {
throw new IllegalArgumentException("Input TypeInformation is not a tuple type.");
}
TupleTypeInfoBase<?> tupleType = (TupleTypeInfoBase<?>) type;
if (tupleType.getArity() != 2) {
throw new IllegalArgumentException("Input TypeInformation must be a Tuple2 type.");
}
TypeInformation<K> keyType = tupleType.getTypeAt(0);
TypeInformation<V> valueType = tupleType.getTypeAt(1);
this.keyClass = keyType.getTypeClass();
this.valueClass = valueType.getTypeClass();
}
@Override
public Writer<Tuple2<K, V>> duplicate() {
return new AvroSinkWriter<K, V>(keyClass, valueClass, properties);
}
/** Wraps Avro's {@link DataFileWriter} to write key/value pairs as generic records. */
private static final class AvroKeyValueWriter<K, V> {
/** A writer for the Avro container file. */
private final DataFileWriter<GenericRecord> mAvroFileWriter;
/** The writer schema for the generic record entries of the Avro container file. */
private final Schema mKeyValuePairSchema;
/** A reusable Avro generic record for writing key/value pairs to the file. */
private final AvroKeyValue<Object, Object> mOutputRecord;
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec, OutputStream outputStream, int syncInterval) throws IOException {
// Create the generic record schema for the key/value pair.
mKeyValuePairSchema = AvroKeyValue.getSchema(keySchema, valueSchema);
// Create an Avro container file and a writer to it.
DatumWriter<GenericRecord> genericDatumWriter = new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema);
mAvroFileWriter = new DataFileWriter<GenericRecord>(genericDatumWriter);
mAvroFileWriter.setCodec(compressionCodec);
mAvroFileWriter.setSyncInterval(syncInterval);
mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
// Create a reusable output record.
mOutputRecord = new AvroKeyValue<Object, Object>(new GenericData.Record(mKeyValuePairSchema));
}
AvroKeyValueWriter(Schema keySchema, Schema valueSchema, CodecFactory compressionCodec, OutputStream outputStream) throws IOException {
this(keySchema, valueSchema, compressionCodec, outputStream, DataFileConstants.DEFAULT_SYNC_INTERVAL);
}
void write(K key, V value) throws IOException {
mOutputRecord.setKey(key);
mOutputRecord.setValue(value);
mAvroFileWriter.append(mOutputRecord.get());
}
void close() throws IOException {
mAvroFileWriter.close();
}
long sync() throws IOException {
return mAvroFileWriter.sync();
}
}
// adapted from AvroKeyValue in the avro-mapred library
private static class AvroKeyValue<K, V> {
/** The name of the key value pair generic record. */
static final String KEY_VALUE_PAIR_RECORD_NAME = "KeyValuePair";
/** The namespace of the key value pair generic record. */
static final String KEY_VALUE_PAIR_RECORD_NAMESPACE = "org.apache.avro.mapreduce";
/** The name of the generic record field containing the key. */
static final String KEY_FIELD = "key";
/** The name of the generic record field containing the value. */
static final String VALUE_FIELD = "value";
/** The key/value generic record wrapped by this class. */
final GenericRecord mKeyValueRecord;
/**
* Wraps a GenericRecord that is a key value pair.
*/
AvroKeyValue(GenericRecord keyValueRecord) {
mKeyValueRecord = keyValueRecord;
}
GenericRecord get() {
return mKeyValueRecord;
}
void setKey(K key) {
mKeyValueRecord.put(KEY_FIELD, key);
}
void setValue(V value) {
mKeyValueRecord.put(VALUE_FIELD, value);
}
/**
* Creates a KeyValuePair generic record schema.
*
* @return A schema for a generic record with two fields: 'key' and 'value'.
*/
static Schema getSchema(Schema keySchema, Schema valueSchema) {
Schema schema = Schema.createRecord(
KEY_VALUE_PAIR_RECORD_NAME, "A key/value pair", KEY_VALUE_PAIR_RECORD_NAMESPACE, false);
schema.setFields(Arrays.asList(
new Schema.Field(KEY_FIELD, keySchema, "The key", null),
new Schema.Field(VALUE_FIELD, valueSchema, "The value", null)));
return schema;
}
}
}