Skip to content

Instantly share code, notes, and snippets.

@polleyg
Created January 23, 2019 06:37
Show Gist options
  • Save polleyg/e8569f9c9378ad51b9a9eab7cecce8a0 to your computer and use it in GitHub Desktop.
Save polleyg/e8569f9c9378ad51b9a9eab7cecce8a0 to your computer and use it in GitHub Desktop.
beam_sql_part_2
[..]
/**
 * Row schema with two fields, in this order: a string field {@code lang}
 * and a 32-bit integer field {@code views}.
 */
public static final Schema SCHEMA = Schema.builder()
.addStringField("lang")
.addInt32Field("views")
.build();
[..]
.apply("transform_to_row", ParDo.of(new RowParDo())).setRowSchema(SCHEMA)
[..]
/**
 * ParDo for String -> Row (SQL): turns one comma-separated input line into a
 * {@link Row} built against {@code SCHEMA}.
 *
 * <p>A line equal to {@code HEADER} (ignoring case) produces no output. For any
 * other line, the value at column index 4 is emitted as the first schema field
 * and the value at column index 6, parsed as an integer, as the second.
 *
 * <p>Malformed input is not handled here: a line with fewer than seven columns
 * raises {@link ArrayIndexOutOfBoundsException}, and a non-numeric value at
 * column index 6 raises {@link NumberFormatException}.
 */
public static class RowParDo extends DoFn<String, Row> {

    /**
     * Matches a comma only when the remainder of the line contains an even
     * number of double quotes, i.e. a comma that is not inside a quoted field.
     * Compiled once here instead of on every element, which is what
     * {@code String.split(regex)} would do.
     */
    private static final java.util.regex.Pattern CSV_SEPARATOR =
            java.util.regex.Pattern.compile(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");

    /** Zero-based index of the column used for the first schema field. */
    private static final int LANG_COLUMN = 4;

    /** Zero-based index of the column used for the second (integer) schema field. */
    private static final int VIEWS_COLUMN = 6;

    @ProcessElement
    public void processElement(ProcessContext c) {
        String line = c.element();
        if (line.equalsIgnoreCase(HEADER)) {
            // The header line carries column names, not data.
            return;
        }

        // Pattern.split(input) uses limit 0, exactly like String.split(regex).
        String[] vals = CSV_SEPARATOR.split(line);
        Row appRow = Row.withSchema(SCHEMA)
                .addValues(vals[LANG_COLUMN], Integer.valueOf(vals[VIEWS_COLUMN]))
                .build();
        c.output(appRow);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment