Skip to content

Instantly share code, notes, and snippets.

@yaseminn
Created January 8, 2016 19:24
Show Gist options
  • Save yaseminn/f5a2b78b126df71dfd0b to your computer and use it in GitHub Desktop.
Save yaseminn/f5a2b78b126df71dfd0b to your computer and use it in GitHub Desktop.
/**
 * Appends the given (track_id, roc) pairs to the {@code track_on_alarm} MySQL table.
 *
 * <p>Each pair becomes a two-column row; both columns are declared as nullable strings.
 * Rows are written over JDBC to {@code MYSQL_CONNECTION_URL_WRITE}.
 *
 * <p>The write uses {@code SaveMode.Append} explicitly. The previous
 * {@code insertIntoJDBC(url, table, false)} call failed with
 * "Table track_on_alarm already exists." whenever the target table was present.
 *
 * @param ctx        Spark context used to build the {@code SQLContext}
 * @param shipResult pairs whose key is written to {@code track_id} and value to {@code roc}
 */
public static void writeToDB(JavaSparkContext ctx, JavaPairRDD<String,String> shipResult) {
    SQLContext sqlContext = new SQLContext(ctx);

    // Build the schema from a space-separated list of column names.
    String schemaString = "track_id roc";
    List<StructField> fields = new ArrayList<StructField>();
    for (String fieldName : schemaString.split(" ")) {
        fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
    }
    StructType schema = DataTypes.createStructType(fields);

    // Convert every (key, value) pair into a Row matching the schema's column order.
    JavaRDD<Row> rowRDD = shipResult.map(new Function<Tuple2<String,String>, Row>() {
        @Override
        public Row call(Tuple2<String, String> pair) throws Exception {
            return RowFactory.create(pair._1(), pair._2());
        }
    });

    // Apply the schema to the RDD.
    DataFrame trackDataFrame = sqlContext.createDataFrame(rowRDD, schema);

    // Append explicitly so that an already-existing target table is written to
    // instead of being rejected. Fully-qualified names avoid requiring new imports.
    trackDataFrame.write()
            .mode(org.apache.spark.sql.SaveMode.Append)
            .jdbc(MYSQL_CONNECTION_URL_WRITE, "track_on_alarm", new java.util.Properties());
}
// Error thrown by the insertIntoJDBC(...) call above when the table "track_on_alarm" already exists:
Exception in thread "main" java.lang.RuntimeException: Table track_on_alarm already exists.
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:263)
at org.apache.spark.sql.DataFrame.insertIntoJDBC(DataFrame.scala:1630)
at parser.ConnectMySQL.writeToDB(ConnectMySQL.java:30)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment