Skip to content

Instantly share code, notes, and snippets.

@ricston-git
Last active May 5, 2016 13:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ricston-git/ff2834164300bff8f9723302707f7b2a to your computer and use it in GitHub Desktop.
Save ricston-git/ff2834164300bff8f9723302707f7b2a to your computer and use it in GitHub Desktop.
{"namespace": "com.ricston.poc.avro",
"type": "record",
"name": "AvroPoc",
"fields": [
{"name": "id", "type": "int"},
{"name": "name", "type": ["string","null"]},
{"name": "surname", "type": ["string","null"]},
{"name": "age", "type": ["int","null"]}
]
}
@Override
public Object transformMessage(MuleMessage message, String outputEncoding)
throws TransformerException {
FileInputStream stream = (FileInputStream) message.getPayload();
DatumReader<AvroPoc> dwDatumReader = new SpecificDatumReader<AvroPoc>(
AvroPoc.class);
List<AvroPoc> allUsers = new ArrayList<AvroPoc>();
try {
final DataFileStream<AvroPoc> dataFileReader = new DataFileStream<AvroPoc>(
stream, dwDatumReader);
while (dataFileReader.hasNext()) {
AvroPoc record = new AvroPoc();
record = dataFileReader.next(record);
allUsers.add(record);
}
stream.close();
dataFileReader.close();
} catch (Exception e) {
throw new RuntimeException("Failed to serialize records",e);
}
message.setPayload(allUsers);
return message;
}
public Object transformMessage(MuleMessage message, String outputEncoding)
throws TransformerException {
List<AvroPoc> payload = (List<AvroPoc>) message.getPayload();
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Schema schema = ReflectData.get().getSchema(AvroPoc.class);
ReflectDatumWriter<Object> reflectDatumWriter = new ReflectDatumWriter<Object>(
schema);
try {
DataFileWriter<Object> writer = new DataFileWriter<Object>(
reflectDatumWriter).create(schema, outputStream);
for (AvroPoc currentRecord : payload) {
writer.append(currentRecord);
}
writer.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
return outputStream.toByteArray();
}
%dw 1.0
%output application/java
---
payload map ((records) -> ({
id: records.id as :number,
name: records.name,
surname: records.surname,
age : records.age as :number
}) as :object { class: "com.ricston.poc.avro.AvroPoc"})
%dw 1.0
%output application/csv
---
payload map ({
id : $.id default 0,
name : $.name default "",
surname : $.surname default "",
age : $.age default 0
})
<file:outbound-endpoint path="Avro-Processed"
responseTimeout="10000" doc:name="File" outputPattern="avropoc-example-#[server.dateTime].avro"/>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment