Skip to content

Instantly share code, notes, and snippets.

@kmuthukk
Created November 25, 2020 01:17
Show Gist options
  • Save kmuthukk/75c774405b14623000d493745fa10455 to your computer and use it in GitHub Desktop.
Save kmuthukk/75c774405b14623000d493745fa10455 to your computer and use it in GitHub Desktop.
package com.yugabyte.sample.apps;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.PreparedStatement;
import java.sql.Array;
import java.util.ArrayList;
//
// The following program assumes you have a table like:
//
// CREATE TABLE IF NOT EXISTS users(
// id text,
// ename text,
// age int,
// city text,
// about_me text,
// PRIMARY KEY(id, ename));
//
// with lots of rows, and you want to scan the table for some non-indexed filter condition
// like "age=23", and delete those rows in batches.
//
// Note: One option would have been to perform the delete in one giant ACID transaction,
// but on very large data sets, that's a little bit of an anti-pattern in a distributed database.
// For many use cases, it is OK for this deletion to be a background activity that's done in
// smaller batches; it doesn't need to be done as one atomic operation across
// millions of rows.
//
// The program uses one connection to do a long-running scan where the primary key of the
// rows matching the condition is fetched in chunks; and then another connection is used to
// delete the rows in batches.
//
// If the scan is expected to be long-running, you must also increase the setting for
// the yb-tserver gflag 'timestamp_history_retention_interval_sec' from its default of 120
// to something like 3600.
//
/**
 * Scans the {@code users} table for rows matching a non-indexed filter and deletes the
 * matching rows in small batches, each committed on its own.
 *
 * <p>Two connections are used: one runs the long scan inside a single read-only transaction
 * and streams primary-key ids through a cursor; the other, in auto-commit mode, issues one
 * {@code DELETE} per batch of ids.
 */
public class ScanAndDelete {
    /** Host used when no host is supplied as the first command-line argument. */
    private static final String DEFAULT_HOST = "172.151.28.148";

    /**
     * Filter that selects the rows to delete. It is applied to the scan AND to the delete:
     * the primary key is (id, ename), so several rows can share an id, and deleting by id
     * alone would also remove rows that do not satisfy the filter.
     */
    private static final String ROW_FILTER = "age=23";

    /** Number of ids accumulated before a delete is issued. */
    private static final int BATCH_SIZE = 100;

    /** Number of rows the driver fetches from the cursor per round trip. */
    private static final int FETCH_SIZE = 100;

    /** A sample row is logged every this many scanned rows. */
    private static final int LOG_INTERVAL = 10000;

    /**
     * Deletes the rows whose id is in {@code ids} by binding them as a single SQL array
     * parameter and executing {@code del_ps} once.
     *
     * <p>This is best-effort: an {@link SQLException} is reported on stderr and not rethrown,
     * so the caller continues with the next batch. A null or empty list is a no-op.
     *
     * @param del_conn connection used to build the SQL array; expected to be in auto-commit
     *                 mode (set by the caller) so each batch is committed on its own
     * @param del_ps   prepared DELETE whose first parameter is the array of ids
     * @param ids      ids to delete
     */
    public static void deleteRows(Connection del_conn, PreparedStatement del_ps, ArrayList<String> ids) {
        // Nothing to delete: avoid a pointless round trip to the database.
        if (ids == null || ids.isEmpty()) {
            return;
        }
        java.sql.Array sql_array_of_ids = null;
        try {
            // Some munging to convert the ArrayList to a java.sql.Array
            final String[] array_of_ids = ids.toArray(new String[0]);
            sql_array_of_ids = del_conn.createArrayOf("TEXT", array_of_ids);
            System.out.println("Got " + ids.size() + " ids to delete...");
            // Now perform the delete. Note: The session is in auto-commit mode (done in the caller).
            // So after deleting this batch of rows, the operation will be committed.
            del_ps.setArray(1, sql_array_of_ids);
            int rows_deleted = del_ps.executeUpdate();
            System.out.println("Deleted " + rows_deleted + " rows.");
        } catch (SQLException e) {
            System.err.println("Error in deleteRows(): " + e.getMessage());
        } finally {
            if (sql_array_of_ids != null) {
                try {
                    sql_array_of_ids.free();
                } catch (SQLException ignored) {
                    // Releasing the array is best-effort; some drivers do not support free().
                }
            }
        }
    }

    /**
     * Runs the scan on {@code scan_conn} and hands ids to {@link #deleteRows} in batches.
     *
     * @param scan_conn connection for the scan; must already have auto-commit disabled
     * @param del_conn  connection used for the deletes
     * @param del_ps    prepared DELETE created on {@code del_conn}
     * @return number of rows returned by the scan
     * @throws SQLException if the scan itself fails
     */
    private static int scanAndDelete(Connection scan_conn, Connection del_conn, PreparedStatement del_ps)
            throws SQLException {
        // For long running scans, with concurrent writes, set READ ONLY, DEFERRABLE to
        // avoid read-restarts.
        try (Statement begin_stmt = scan_conn.createStatement()) {
            begin_stmt.executeUpdate("BEGIN ISOLATION LEVEL SERIALIZABLE, READ ONLY, DEFERRABLE");
        }

        int rows = 0;
        ArrayList<String> ids = new ArrayList<String>(BATCH_SIZE);
        try (Statement selectStmt = scan_conn.createStatement()) {
            // A non-zero fetch size (with auto-commit off) makes the driver use a cursor
            // instead of collecting the whole result at once.
            selectStmt.setFetchSize(FETCH_SIZE);
            try (ResultSet rs = selectStmt.executeQuery(
                    "select id, ename, age, city from users where " + ROW_FILTER)) {
                while (rs.next()) {
                    rows++;
                    ids.add(rs.getString(1));
                    // A full batch of ids has accumulated: clean up now.
                    if (ids.size() == BATCH_SIZE) {
                        deleteRows(del_conn, del_ps, ids);
                        ids.clear();
                    }
                    // Periodically print a sample row; this is just some debug logging.
                    if ((rows % LOG_INTERVAL) == 0) {
                        System.out.println("Query returned: "+
                            "ename=" + rs.getString(2) +
                            ", age=" + rs.getString(3) +
                            ", city=" + rs.getString(4));
                    }
                }
                // Process any remaining (the last batch) of ids.
                if (!ids.isEmpty()) {
                    deleteRows(del_conn, del_ps, ids);
                    ids.clear();
                }
            }
        }
        return rows;
    }

    /**
     * Entry point. Connects twice to the database on port 5433, then scans and deletes.
     *
     * @param args optional; {@code args[0]} overrides the default database host
     * @throws ClassNotFoundException if the PostgreSQL JDBC driver is not on the classpath
     * @throws SQLException           declared for compatibility; SQL errors raised inside the
     *                                connection scope are caught and reported on stderr
     * @throws InterruptedException   declared for compatibility; not thrown by this code
     */
    public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException {
        Class.forName("org.postgresql.Driver");
        String host = (args != null && args.length > 0) ? args[0] : DEFAULT_HOST;
        String connect_string = "jdbc:postgresql://" + host + ":5433/yugabyte";
        // try-with-resources guarantees both connections and the prepared statement are
        // closed even when the scan or a delete fails part-way through.
        try (
            // We'll use this connection to do the long-running scan.
            Connection scan_conn = DriverManager.getConnection(connect_string, "yugabyte", "yugabyte");
            // We'll use this connection to do the deletes in a (N-rows at a time; commit) loop.
            Connection del_conn = DriverManager.getConnection(connect_string, "yugabyte", "yugabyte")
        ) {
            del_conn.setAutoCommit(true);
            int rows;
            try (PreparedStatement del_ps = del_conn.prepareStatement(
                    "DELETE FROM users WHERE id = ANY(?) AND " + ROW_FILTER)) {
                System.out.println("Connected to the PostgreSQL server successfully.");
                // By default, the driver collects all results for the query at once. The JDBC driver
                // provides a means of basing a ResultSet on a database cursor and only fetching a
                // small number of rows. But to use this feature, the Connection must not be in
                // autocommit mode.
                //
                // See documentation here https://jdbc.postgresql.org/documentation/head/query.html#query-with-cursor
                scan_conn.setAutoCommit(false);
                rows = scanAndDelete(scan_conn, del_conn, del_ps);
            }
            System.out.println("Rows = " + rows);
            System.out.println("Closing cursor");
        } catch (SQLException e) {
            System.err.println("Error: " + e.getMessage());
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment