Created
January 11, 2021 18:26
-
-
Save viveknaskar/dfbf8d8ca67a503eaa156e0ac7168791 to your computer and use it in GitHub Desktop.
Main Class of Dataflow for reading, processing & transforming data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) { | |
/** | |
* Constructed StorageToRedisOptions object using the method PipelineOptionsFactory.fromArgs to read options from command-line | |
*/ | |
StorageToRedisOptions options = PipelineOptionsFactory.fromArgs(args) | |
.withValidation() | |
.as(StorageToRedisOptions.class); | |
Pipeline p = Pipeline.create(options); | |
p.apply("Reading Lines...", TextIO.read().from(options.getInputFile())) | |
.apply("Transforming data...", | |
ParDo.of(new DoFn<String, String[]>() { | |
@ProcessElement | |
public void TransformData(@Element String line, OutputReceiver<String[]> out) { | |
String[] fields = line.split("\\|"); | |
out.output(fields); | |
} | |
})) | |
.apply("Processing data...", | |
ParDo.of(new DoFn<String[], KV<String, String>>() { | |
@ProcessElement | |
public void ProcessData(@Element String[] fields, OutputReceiver<KV<String, String>> out) { | |
if (fields[RedisIndex.GUID.getValue()] != null) { | |
out.output(KV.of("firstname:" | |
.concat(fields[RedisIndex.FIRSTNAME.getValue()]), fields[RedisIndex.GUID.getValue()])); | |
out.output(KV.of("lastname:" | |
.concat(fields[RedisIndex.LASTNAME.getValue()]), fields[RedisIndex.GUID.getValue()])); | |
out.output(KV.of("dob:" | |
.concat(fields[RedisIndex.DOB.getValue()]), fields[RedisIndex.GUID.getValue()])); | |
out.output(KV.of("postalcode:" | |
.concat(fields[RedisIndex.POSTAL_CODE.getValue()]), fields[RedisIndex.GUID.getValue()])); | |
} | |
} | |
})) | |
.apply("Writing field indexes into redis", | |
RedisIO.write().withMethod(RedisIO.Write.Method.SADD) | |
.withEndpoint(options.getRedisHost(), options.getRedisPort())); | |
p.run(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment