Skip to content

Instantly share code, notes, and snippets.

@sundy-li
Created October 20, 2020 08:49
Show Gist options
  • Save sundy-li/71d21ccd2f0dd7ac4685231efa1e2296 to your computer and use it in GitHub Desktop.
Save sundy-li/71d21ccd2f0dd7ac4685231efa1e2296 to your computer and use it in GitHub Desktop.
test-jdbc
/**
 * Spark job that exercises batched JDBC inserts into ClickHouse from inside
 * {@code Dataset.foreachPartition}.
 *
 * <p>For every partition of the query result the job:
 * <ol>
 *   <li>replaces all registered JDBC drivers with a single {@code ClickHouseDriver};</li>
 *   <li>drops and re-creates the {@code xm_user} table;</li>
 *   <li>adds one row to a batch per input row and executes the batch.</li>
 * </ol>
 *
 * <p>All JDBC resources are scoped with try-with-resources so that they are
 * released on the executor even when a statement fails.
 */
public class SparkJdbcMain {

    /**
     * Entry point: builds a Hive-enabled Spark session, runs the source query
     * and writes one ClickHouse row per result row, partition by partition.
     *
     * @param args command-line arguments (unused)
     * @throws SQLException declared for callers; JDBC failures inside the
     *         partition function are raised through Spark
     * @throws ClassNotFoundException declared for backward compatibility
     */
    public static void main(String[] args) throws SQLException, ClassNotFoundException {
        SparkSession spark = SparkSession
                .builder()
                .appName("spark-jdbc-test")
                .enableHiveSupport()
                .getOrCreate();

        // mocked 170 rows data
        // NOTE(review): an un-grouped count(1) returns a single aggregated row,
        // so the partition loop below would add only one row to the batch — confirm
        // this is the intended source query.
        Dataset<Row> rdd = spark.sql("SELECT count(1) from test_table where day = '2020-10-10'");

        rdd.foreachPartition((ForeachPartitionFunction<Row>) iterator -> {
            // Number of columns in xm_user, and therefore of '?' placeholders.
            int columnNum = 34;

            registerOnlyClickHouseDriver();

            // try-with-resources closes the statement first, then the connection,
            // whether the batch succeeds or throws.
            try (Connection connection = DriverManager.getConnection("jdbc:clickhouse://127.0.0.1:9000")) {
                recreateUserTable(connection);

                try (PreparedStatement pstmt = connection.prepareStatement(buildInsertSql(columnNum))) {
                    while (iterator.hasNext()) {
                        // The row content is ignored; each input row only triggers one insert.
                        iterator.next();
                        pstmt.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
                        pstmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
                        pstmt.setString(3, "4444");
                        pstmt.setString(4, "5555");
                        // Parameters 5..33 are intentionally left unbound, as in the original.
                        // NOTE(review): presumably the driver sends NULL for these Nullable
                        // columns — confirm against the driver in use.
                        pstmt.setLong(34, 333);
                        pstmt.addBatch();
                    }
                    pstmt.executeBatch();
                }
            }
        });
    }

    /**
     * Deregisters every JDBC driver currently known to {@link DriverManager}
     * and registers a fresh {@code ClickHouseDriver} instead.
     *
     * <p>NOTE(review): presumably done so that no other driver can claim the
     * {@code jdbc:clickhouse://} URL — confirm. This mutates JVM-wide state.
     *
     * @throws SQLException if a driver cannot be (de)registered
     */
    private static void registerOnlyClickHouseDriver() throws SQLException {
        Enumeration<Driver> drivers = DriverManager.getDrivers();
        while (drivers.hasMoreElements()) {
            DriverManager.deregisterDriver(drivers.nextElement());
        }
        DriverManager.registerDriver(new ClickHouseDriver());
    }

    /**
     * Drops the {@code xm_user} table if it exists and creates it again.
     *
     * @param connection open connection to the ClickHouse server
     * @throws SQLException if either DDL statement fails
     */
    private static void recreateUserTable(Connection connection) throws SQLException {
        try (Statement statement = connection.createStatement()) {
            statement.execute("DROP TABLE IF EXISTS xm_user");
            statement.execute("CREATE TABLE xm_user (\n"
                    + " create_time DateTime('Asia/Shanghai'),\n"
                    + " update_time DateTime('Asia/Shanghai') COMMENT 'keep the larger record when merge',\n"
                    + " org_id String COMMENT 'pk_1',\n"
                    + " user_id String COMMENT 'pk_2',\n"
                    + " cf001 Nullable(Float64),\n"
                    + " cf002 Nullable(Float64),\n"
                    + " cf003 Nullable(Float64),\n"
                    + " cf004 Nullable(Float64),\n"
                    + " cf005 Nullable(Float64),\n"
                    + " cs001 Nullable(String),\n"
                    + " cs002 Nullable(String),\n"
                    + " cs003 Nullable(String),\n"
                    + " cs004 Nullable(String),\n"
                    + " cs005 Nullable(String),\n"
                    + " cs006 Nullable(String),\n"
                    + " cs007 Nullable(String),\n"
                    + " cs008 Nullable(String),\n"
                    + " cs009 Nullable(String),\n"
                    + " cs010 Nullable(String),\n"
                    + " cs011 Nullable(String),\n"
                    + " cs012 Nullable(String),\n"
                    + " cs013 Nullable(String),\n"
                    + " cs014 Nullable(String),\n"
                    + " cs015 Nullable(String),\n"
                    + " ct001 Nullable(DateTime('Asia/Shanghai')),\n"
                    + " ct002 Nullable(DateTime('Asia/Shanghai')),\n"
                    + " ct003 Nullable(DateTime('Asia/Shanghai')),\n"
                    + " ct004 Nullable(DateTime('Asia/Shanghai')),\n"
                    + " ct005 Nullable(DateTime('Asia/Shanghai')),\n"
                    + " cl001 Nullable(Int64),\n"
                    + " cl002 Nullable(Int64),\n"
                    + " cl003 Nullable(Int64),\n"
                    + " cl004 Nullable(Int64),\n"
                    + " cl005 Nullable(Int64)\n"
                    + ") ENGINE = ReplacingMergeTree(\"update_time\")\n"
                    + "ORDER BY (org_id, user_id);");
        }
    }

    /**
     * Builds the parameterized INSERT statement for {@code xm_user}.
     *
     * @param columnNum number of {@code ?} placeholders to emit; must match the
     *        number of columns listed in the statement
     * @return the INSERT SQL with {@code columnNum} comma-separated placeholders
     */
    private static String buildInsertSql(int columnNum) {
        // Produces "?, ?, ..., ?, " — the trailing ", " is trimmed below.
        String params = Strings.repeat("?, ", columnNum);
        return "INSERT INTO xm_user(create_time, update_time, org_id, user_id, cf001,cf002,cf003,cf004,cf005,"
                + "cs001,cs002,cs003,cs004,cs005,cs006,cs007,cs008,cs009,cs010,cs011,cs012,cs013,cs014,cs015,"
                + "ct001,ct002,ct003,ct004,ct005,cl001,cl002,cl003,cl004,cl005) VALUES("
                + params.substring(0, params.length() - 2) + ")";
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment