Skip to content

Instantly share code, notes, and snippets.

@JoaquinSV
Created April 12, 2019 15:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save JoaquinSV/de9432e8ac0478934d3affdacd463762 to your computer and use it in GitHub Desktop.
Save JoaquinSV/de9432e8ac0478934d3affdacd463762 to your computer and use it in GitHub Desktop.
package org.apache.kudu.flume.sink;
import java.util.Collections;
import java.util.List;
import java.sql.Timestamp;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.kudu.client.Insert;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.Operation;
import org.apache.kudu.client.PartialRow;
 
/**
* Convierte cada payload de un Event en columnas de la tabla.
* El payload es una linea de texto separado por coma.
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class SimpleKuduOperationsProducer implements KuduOperationsProducer {
public static final String PAYLOAD_COLUMN_PROP = "payloadColumn";
public static final String PAYLOAD_COLUMN_DEFAULT = "payload";
 
private KuduTable table;
private String payloadColumn;
 
public SimpleKuduOperationsProducer() {
}
 
@Override
public void configure(Context context) {
payloadColumn = context.getString(PAYLOAD_COLUMN_PROP, PAYLOAD_COLUMN_DEFAULT);
}
 
@Override
public void initialize(KuduTable table) {
this.table = table;
}
 
}
 
  @Override
public List<Operation> getOperations(Event event)
throws FlumeException {
try {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
//Convertir event en String
String stringEveny = new String(event.getBody());
String[] values = stringEveny.split(",");
//Extraer los campos del Event
Integer orderNumber = Integer.parseInt(values[0]);
Timestamp orderdate = Timestamp.valueOf(values[1]);
Timestamp requiredDate = Timestamp.valueOf(values[2]);
Timestamp shippedDate = Timestamp.valueOf(values[3]);
String status = values[4];
String comments = values[5];
// agregar los valores a la fila
// OJO con addTimestamp, algunas versiones
// del cliente de kudu no lo soportan
row.addInt("ordernumber", orderNumber);
row.addTimestamp("orderdate", orderdate);
row.addTimestamp("requireddate", requiredDate);
row.addTimestamp("shippeddate", shippedDate);
row.addString("status", status);
row.addString("comments", comments);
return Collections.singletonList((Operation) insert);
} catch (Exception e) {
throw new FlumeException("Failed to create Kudu Insert object",
e);
}
}
 
@Override
public void close() {
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment