Skip to content

Instantly share code, notes, and snippets.

@aroch
Last active May 24, 2018 07:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aroch/7fb4219e7ada74f30654f1effe9d2f43 to your computer and use it in GitHub Desktop.
Save aroch/7fb4219e7ada74f30654f1effe9d2f43 to your computer and use it in GitHub Desktop.
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import java.nio.charset.Charset;
import java.util.Properties;
public class KinesisStringEventsProducerJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.setParallelism(1);
DataStream<String> simpleStringStream = see.addSource(new StringEventsGenerator());
Properties kinesisProducerConfig = new Properties();
kinesisProducerConfig.setProperty(AWSConfigConstants.AWS_REGION, "eu-west-1");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>((SerializationSchema<String>) event -> {
String eventLine = event + System.lineSeparator();
return eventLine.getBytes(Charset.forName("UTF-8"));
}, kinesisProducerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("<STREAM_NAME>");
kinesis.setDefaultPartition("0");
simpleStringStream.addSink(kinesis);
see.execute();
}
public static class StringEventsGenerator implements SourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
int textRepeated = 100;
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
String sampleEvent = i + "-" + StringUtils.repeat("A", textRepeated);
int messageLength = sampleEvent.length();
System.out.println("Message length: " + messageLength);
ctx.collect(sampleEvent);
}
}
@Override
public void cancel() {
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment