Created
November 25, 2020 01:17
-
-
Save kmuthukk/75c774405b14623000d493745fa10455 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.yugabyte.sample.apps; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.ResultSet; | |
import java.sql.SQLException; | |
import java.sql.Statement; | |
import java.sql.PreparedStatement; | |
import java.sql.Array; | |
import java.util.ArrayList; | |
// | |
// The following program assumes you have a table like: | |
// | |
// CREATE TABLE IF NOT EXISTS users( | |
// id text, | |
// ename text, | |
// age int, | |
// city text, | |
// about_me text, | |
// PRIMARY KEY(id, ename)); | |
// | |
// with lots of rows, and you want to scan the table for some non-indexed filter condition | |
// like "age=22", and delete those rows in batches. | |
// | |
// Note: One option would have been to perform the delete in one giant ACID transaction, | |
// but on very large data sets, that's a little bit of an anti-pattern in a distributed database. | |
// For many use cases, this deletion is OK to be a background activity that's done in | |
// smaller batches, and doesn't need the to be done in one atomic operations across | |
// millions of rows. | |
// | |
// The program uses one connection to do a long-running scan where the primary keys of the | |
// rows matching the condition are fetched in chunks; and then another connection is used to | |
// delete the rows in batches. | |
// | |
// If the scan is expected to be long-running, you must also increase the setting for | |
// the yb-tserver gflag 'timestamp_history_retention_interval_sec' from its default of 120 | |
// to something like 3600. | |
// | |
public class ScanAndDelete {
  /**
   * Deletes one batch of ids from the users table via the supplied prepared
   * statement ("DELETE FROM users WHERE id = ANY(?)").
   *
   * The delete connection must be in auto-commit mode (set by the caller), so
   * each batch is committed as soon as executeUpdate() returns. Errors are
   * logged and swallowed on purpose: this is a best-effort background cleanup,
   * and a later pass can pick up rows a failed batch left behind.
   *
   * NOTE(review): the table's primary key is (id, ename); deleting by id alone
   * removes every row sharing that id, including rows whose age does not match
   * the scan filter — confirm that is the intended semantics.
   *
   * @param del_conn connection used to create the SQL array (auto-commit on)
   * @param del_ps   prepared DELETE statement with a single TEXT[] parameter
   * @param ids      ids to delete; not modified by this method
   */
  public static void deleteRows(Connection del_conn, PreparedStatement del_ps, ArrayList<String> ids) {
    try {
      // Convert the ArrayList to a java.sql.Array so it can bind to ANY(?).
      java.sql.Array sql_array_of_ids =
          del_conn.createArrayOf("TEXT", ids.toArray(new String[0]));
      System.out.println("Got " + ids.size() + " ids to delete...");
      // The session is in auto-commit mode, so this batch commits immediately.
      del_ps.setArray(1, sql_array_of_ids);
      int rows_deleted = del_ps.executeUpdate();
      System.out.println("Deleted " + rows_deleted + " rows.");
      // Release driver-side resources held by the array (the original leaked it).
      sql_array_of_ids.free();
    } catch (SQLException e) {
      System.err.println("Error in deleteRows(): " + e.getMessage());
    }
  }

  /**
   * Scans the users table for rows matching a non-indexed filter (age=23) on
   * one connection, and deletes the matching rows in batches of 100 on a
   * second connection. See the file header comment for the rationale.
   */
  public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
    Class.forName("org.postgresql.Driver");
    // String host = "localhost";
    String host = "172.151.28.148";
    String connect_string = "jdbc:postgresql://" + host + ":5433/yugabyte";
    // try-with-resources guarantees both connections and all statements are
    // closed even when an SQLException escapes (the original closed them only
    // on the success path, and never closed the statements at all).
    try (Connection scan_conn = DriverManager.getConnection(connect_string, "yugabyte", "yugabyte");
         Connection del_conn = DriverManager.getConnection(connect_string, "yugabyte", "yugabyte");
         PreparedStatement del_ps = del_conn.prepareStatement("DELETE FROM users WHERE id = ANY(?)")) {
      // Deletes commit one batch at a time; see deleteRows().
      del_conn.setAutoCommit(true);
      System.out.println("Connected to the PostgreSQL server successfully.");
      // By default the driver collects all results at once. Cursor-based
      // chunked fetching requires auto-commit OFF plus a fetch size.
      // See https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
      scan_conn.setAutoCommit(false);
      try (Statement stmt = scan_conn.createStatement();
           Statement selectStmt = scan_conn.createStatement()) {
        // For long-running scans with concurrent writes, READ ONLY, DEFERRABLE
        // avoids read-restarts.
        stmt.executeUpdate("BEGIN ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE");
        selectStmt.setFetchSize(100);
        int rows = 0;
        ArrayList<String> ids = new ArrayList<String>();
        try (ResultSet rs =
                 selectStmt.executeQuery("select id, ename, age, city from users where age=23")) {
          while (rs.next()) {
            rows++;
            ids.add(rs.getString(1));
            // Once 100 ids have accumulated, delete them as one batch.
            if (ids.size() == 100) {
              deleteRows(del_conn, del_ps, ids);
              ids.clear();
            }
            // Debug logging every 10K rows.
            if ((rows % 10000) == 0) {
              System.out.println("Query returned: " +
                  "ename=" + rs.getString(2) +
                  ", age=" + rs.getString(3) +
                  ", city=" + rs.getString(4));
            }
          }
        }
        // Process any remaining (the last, partial) batch of ids.
        if (!ids.isEmpty()) {
          deleteRows(del_conn, del_ps, ids);
          ids.clear();
        }
        // End the read-only scan transaction explicitly (the original left it
        // open until the connection was dropped).
        scan_conn.commit();
        System.out.println("Rows = " + rows);
        System.out.println("Closing cursor");
      }
    } catch (SQLException e) {
      System.err.println("Error: " + e.getMessage());
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment