@jrask
Last active November 17, 2019 08:31
Flink hdfs -> hdfs does not "commit" files; they stay as ".inprogress" files
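import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
// JsonObject, Parser and EventTimeDateTimeBuckerAssigner are project classes not shown here.

public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    // Checkpoint once a minute with exactly-once guarantees, storing state on HDFS.
    env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1), CheckpointingMode.EXACTLY_ONCE);
    env.setStateBackend(new FsStateBackend("hdfs://<server><dir>/checkpoints", true));
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

    // Row-format sink that buckets records into year/month/day/hour directories by event time.
    StreamingFileSink<JsonObject> hdfsSink = StreamingFileSink
            .<JsonObject>forRowFormat(new Path("hdfs://<server>/<dir>"), new SimpleStringEncoder<>("UTF-8"))
            .withBucketAssigner(new EventTimeDateTimeBuckerAssigner<>("'/year'=YYYY/'month'=MM/'day'=dd/'hour'=HH"))
            .build();

    // Bounded source: read a file from HDFS, parse each line, write the result back to HDFS.
    env.readTextFile("hdfs://<server><dir><file>")
            .map(Parser::parse)
            .addSink(hdfsSink);

    env.execute("some-pipeline");
}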
/*
hdfs -> hdfs results in the following. However, if I use Kafka as the source, it works properly.
-rw-rw----+ 3 someuser supergroup 87792789 2019-11-16 20:57 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-62.inprogress.8f9c6104-4c6c-4eee-8650-dd5d1d12d668
-rw-rw----+ 3 someuser supergroup 64696413 2019-11-16 20:58 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-63.inprogress.42589a04-601b-496d-ae20-7db1d56089dc
-rw-rw----+ 3 someuser supergroup 71108086 2019-11-16 20:59 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-64.inprogress.97b00857-808d-4c4e-9a50-a42af6c604f5
-rw-rw----+ 3 someuser supergroup 74191577 2019-11-16 21:00 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-65.inprogress.41ff8792-2647-4a50-b2b1-f74cb94aeafe
-rw-rw----+ 3 someuser supergroup 68633473 2019-11-16 21:01 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-66.inprogress.62bf4ee9-f897-4782-823b-23ccc7d58d96
-rw-rw----+ 3 someuser supergroup 71332201 2019-11-16 21:03 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-67.inprogress.d51af4c9-c200-4cee-84e4-979fb6b1e958
-rw-rw----+ 3 someuser supergroup 21657645 2019-11-16 21:04 /data/some_dir/year=2019/month=05/day=21/hour=23/.part-0-68.inprogress.de9a2497-e99c-48ef-85b8-a73dd9f62643
*/
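To check a listing like the one above for files that were never committed, a small helper along these lines may be useful. It is only a sketch: `PartFileNames` and its methods are hypothetical names, not part of Flink, and the name pattern `.part-<subtask>-<counter>.inprogress.<uuid>` is inferred purely from the listing above rather than taken from any Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Flink): spots part files that were never committed.
final class PartFileNames {
    // Assumed pattern, inferred from the listing: .part-<subtask>-<counter>.inprogress.<uuid>
    private static final Pattern IN_PROGRESS =
            Pattern.compile("^\\.part-(\\d+)-(\\d+)\\.inprogress\\.[0-9a-fA-F-]+$");

    private PartFileNames() {}

    // Returns the last component of a slash-separated path.
    static String fileName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // True if the file name matches the in-progress pattern.
    static boolean isInProgress(String path) {
        return IN_PROGRESS.matcher(fileName(path)).matches();
    }

    // Returns the part counter of an in-progress file, or -1 if the name does not match.
    static long partCounter(String path) {
        Matcher m = IN_PROGRESS.matcher(fileName(path));
        return m.matches() ? Long.parseLong(m.group(2)) : -1L;
    }

    // Filters a list of paths down to those that still look in-progress.
    static List<String> uncommitted(List<String> paths) {
        List<String> result = new ArrayList<>();
        for (String p : paths) {
            if (isInProgress(p)) {
                result.add(p);
            }
        }
        return result;
    }
}
```

Feeding it the paths from the listing should flag every one of them as in-progress, whereas a path whose file name lacks the leading dot and the `.inprogress.<uuid>` suffix would not match.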