Skip to content

Instantly share code, notes, and snippets.

@aikar
Last active October 17, 2019 17:11
Show Gist options
  • Save aikar/752cb7452a64c8abeb54 to your computer and use it in GitHub Desktop.
Save aikar/752cb7452a64c8abeb54 to your computer and use it in GitHub Desktop.
Empire Minecraft DB Wrapper - EmpireDb - Released as MIT - Updated 6/24/2017 with less EMC related code. depends on https://github.com/aikar/minecraft-timings/
package com.empireminecraft.systems.db;
import com.empireminecraft.util.Log;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class AsyncDbQueue implements Runnable {
private static final Queue<AsyncDbStatement> queue = new ConcurrentLinkedQueue<>();
private static final Lock lock = new ReentrantLock();
@Override
public void run() {
processQueue();
}
static void processQueue() {
if (queue.isEmpty() || !lock.tryLock()) {
return;
}
AsyncDbStatement stm;
DbStatement dbStatement;
try {
dbStatement = new DbStatement();
} catch (Exception e) {
lock.unlock();
Log.exception("Exception getting DbStatement in AsyncDbQueue", e);
return;
}
while ((stm = queue.poll()) != null) {
try {
if (dbStatement.isClosed()) {
dbStatement = new DbStatement();
}
stm.process(dbStatement);
} catch (SQLException e) {
stm.onError(e);
}
}
dbStatement.close();
lock.unlock();
}
static boolean queue(AsyncDbStatement stm) {
return queue.offer(stm);
}
}
package com.empireminecraft.systems.db;
import com.empireminecraft.util.Log;
import org.intellij.lang.annotations.Language;
import java.sql.SQLException;
/**
* Template class for user to override. Will run on a different thread so
* you can run SQL queries safely without impacting main thread.
* <p/>
* Will automatically close the connection once run() is done!
* <p/>
* Calls onError when a SQLException is fired, and provides
* an onResultsSync method to be overridden to receive all DB Results back on main thread,
* by calling getResultsSync() on the Async run(DbStatement) call.
*/
public abstract class AsyncDbStatement {
@Language("MySQL")
protected String query;
private boolean done = false;
public AsyncDbStatement() {
queue(null);
}
public AsyncDbStatement(@Language("MySQL") String query) {
queue(query);
}
/**
* Schedules this async statement to run on anther thread. This is the only method that should be
* called on the main thread and it should only be called once.
*
* @param query
*/
private void queue(@Language("MySQL") final String query) {
this.query = query;
AsyncDbQueue.queue(this);
}
/**
* Implement this method with your code that does Async SQL logic.
*
* @param statement
* @throws SQLException
*/
protected abstract void run(DbStatement statement) throws SQLException;
/**
* Override this event to have special logic for when an exception is fired.
*
* @param e
*/
public void onError(SQLException e) {
Log.exception("Exception in AsyncDbStatement" + query, e);
}
public void process(DbStatement stm) throws SQLException {
synchronized (this) {
if (!done) {
if (query != null) {
stm.query(query);
}
run(stm);
done = true;
}
}
}
}
package com.empireminecraft.systems.db;
import co.aikar.timings.lib.MCTiming;
import co.aikar.timings.lib.TimingManager;
import com.empireminecraft.util.Log;
import com.empireminecraft.util.SneakyThrow;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import org.intellij.lang.annotations.Language;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
public final class DB {
private static HikariDataSource pooledDataSource;
private static TimingManager timingsManager;
private static MCTiming sqlTiming;
private static Plugin plugin;
private DB() {}
/**
* Called in onDisable, destroys the Data source and nulls out references.
*/
public static void close() {
AsyncDbQueue.processQueue();
pooledDataSource.close();
pooledDataSource = null;
}
/**
* Called in onEnable, initializes the pool and configures it and opens the first connection to spawn the pool.
*/
public static void initialize(Plugin plugin, String user, String pass, String db, String hostAndPort) {
if (hostAndPort == null) {
hostAndPort = "localhost:3306";
}
initialize(plugin, user, pass, "mysql://" + hostAndPort + "/" + db);
}
public static void initialize(Plugin plugin, String user, String pass, String jdbcUrl) {
try {
DB.plugin = plugin;
timingsManager = TimingManager.of(plugin);
sqlTiming = timingsManager.of("Database");
HikariConfig config = new HikariConfig();
config.setPoolName(plugin.getDescription().getName() + " DB"); // Update to your DB name
plugin.getLogger().info("Connecting to Database: " + jdbcUrl);
config.setDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
config.addDataSourceProperty("url", "jdbc:" + jdbcUrl);
config.addDataSourceProperty("user", user);
config.addDataSourceProperty("password", pass);
config.addDataSourceProperty("cachePrepStmts", true);
config.addDataSourceProperty("prepStmtCacheSize", 250);
config.addDataSourceProperty("prepStmtCacheSqlLimit", 2048);
config.addDataSourceProperty("useServerPrepStmts", true);
config.addDataSourceProperty("cacheCallableStmts", true);
config.addDataSourceProperty("cacheResultSetMetadata", true);
config.addDataSourceProperty("cacheServerConfiguration", true);
config.addDataSourceProperty("useLocalSessionState", true);
config.addDataSourceProperty("elideSetAutoCommits", true);
config.addDataSourceProperty("alwaysSendSetIsolation", false);
config.setConnectionTestQuery("SELECT 1");
config.setInitializationFailFast(true);
config.setMinimumIdle(3);
config.setMaximumPoolSize(5);
pooledDataSource = new HikariDataSource(config);
pooledDataSource.setTransactionIsolation("TRANSACTION_READ_COMMITTED");
Bukkit.getScheduler().runTaskTimerAsynchronously(plugin, new AsyncDbQueue(), 0, 1);
} catch (Exception ex) {
pooledDataSource = null;
Log.exception("EmpireDB: Error Creating Database Pool", ex);
Bukkit.getServer().shutdown();
}
}
/**
* Initiates a new DbStatement and prepares the first query.
* <p/>
* YOU MUST MANUALLY CLOSE THIS STATEMENT IN A finally {} BLOCK!
*
* @param query
* @return
* @throws SQLException
*/
public static DbStatement query(@Language("MySQL") String query) throws SQLException {
return (new DbStatement()).query(query);
}
/**
* Initiates a new DbStatement and prepares the first query.
* <p/>
* YOU MUST MANUALLY CLOSE THIS STATEMENT IN A finally {} BLOCK!
*
* @param query
* @return
* @throws SQLException
*/
public static CompletableFuture<DbStatement> queryAsync(@Language("MySQL") String query) throws SQLException {
CompletableFuture<DbStatement> future = new CompletableFuture<>();
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
try {
future.complete(new DbStatement().query(query));
} catch (SQLException e) {
future.completeExceptionally(e);
}
});
return future;
}
/**
* Utility method to execute a query and retrieve the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static DbRow getFirstRow(@Language("MySQL") String query, Object... params) throws SQLException {
try (DbStatement statement = DB.query(query).execute(params)) {
return statement.getNextRow();
}
}
/**
* Utility method to execute a query and retrieve the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static CompletableFuture<DbRow> getFirstRowAsync(@Language("MySQL") String query, Object... params) throws SQLException {
CompletableFuture<DbRow> future = new CompletableFuture<>();
new AsyncDbStatement(query) {
@Override
protected void run(DbStatement statement) throws SQLException {
try {
future.complete(statement.getNextRow());
} catch (Exception e) {
future.completeExceptionally(e);
}
}
};
return future;
}
/**
* Utility method to execute a query and retrieve the first column of the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static <T> T getFirstColumn(@Language("MySQL") String query, Object... params) throws SQLException {
try (DbStatement statement = DB.query(query).execute(params)) {
return statement.getFirstColumn();
}
}
/**
* Utility method to execute a query and retrieve the first column of the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static <T> CompletableFuture<T> getFirstColumnAsync(@Language("MySQL") String query, Object... params) throws SQLException {
CompletableFuture<T> future = new CompletableFuture<>();
new AsyncDbStatement(query) {
@Override
protected void run(DbStatement statement) throws SQLException {
try {
future.complete(statement.getFirstColumn());
} catch (Exception e) {
future.completeExceptionally(e);
}
}
};
return future;
}
/**
* Utility method to execute a query and retrieve first column of all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
* @param query
* @param params
* @param <T>
* @return
* @throws SQLException
*/
public static <T> List<T> getFirstColumnResults(@Language("MySQL") String query, Object... params) throws SQLException {
List<T> dbRows = new ArrayList<>();
T result;
try (DbStatement statement = DB.query(query).execute(params)) {
while ((result = statement.getFirstColumn()) != null) {
dbRows.add(result);
}
}
return dbRows;
}
/**
* Utility method to execute a query and retrieve first column of all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
* @param query
* @param params
* @param <T>
* @return
* @throws SQLException
*/
public static <T> CompletableFuture<List<T>> getFirstColumnResultsAsync(@Language("MySQL") String query, Object... params) throws SQLException {
CompletableFuture<List<T>> future = new CompletableFuture<>();
new AsyncDbStatement(query) {
@Override
protected void run(DbStatement statement) throws SQLException {
try {
List<T> dbRows = new ArrayList<>();
T result;
while ((result = statement.getFirstColumn()) != null) {
dbRows.add(result);
}
future.complete(dbRows);
} catch (Exception e) {
future.completeExceptionally(e);
}
}
};
return future;
}
/**
* Utility method to execute a query and retrieve all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return List of DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static List<DbRow> getResults(@Language("MySQL") String query, Object... params) throws SQLException {
try (DbStatement statement = DB.query(query).execute(params)) {
return statement.getResults();
}
}
/**
* Utility method to execute a query and retrieve all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return List of DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static CompletableFuture<List<DbRow>> getResultsAsync(@Language("MySQL") String query, Object... params) throws SQLException {
CompletableFuture<List<DbRow>> future = new CompletableFuture<>();
new AsyncDbStatement(query) {
@Override
protected void run(DbStatement statement) throws SQLException {
try {
future.complete(statement.getResults());
} catch (Exception e) {
future.completeExceptionally(e);
}
}
};
return future;
}
/**
* Utility method for executing an update synchronously that does an insert,
* closes the statement, and returns the last insert ID.
*
* @param query Query to run
* @param params Params to execute the statement with.
* @return Inserted Row Id.
* @throws SQLException
*/
public static Long executeInsert(@Language("MySQL") String query, Object... params) throws SQLException {
try (DbStatement statement = DB.query(query)) {
int i = statement.executeUpdate(params);
if (i > 0) {
return statement.getLastInsertId();
}
}
return null;
}
/**
* Utility method for executing an update synchronously, and then close the statement.
*
* @param query Query to run
* @param params Params to execute the statement with.
* @return Number of rows modified.
* @throws SQLException
*/
public static int executeUpdate(@Language("MySQL") String query, Object... params) throws SQLException {
try (DbStatement statement = DB.query(query)) {
return statement.executeUpdate(params);
}
}
/**
* Utility method to execute an update statement asynchronously and close the connection.
*
* @param query Query to run
* @param params Params to execute the update with
*/
public static void executeUpdateAsync(@Language("MySQL") String query, final Object... params) {
new AsyncDbStatement(query) {
@Override
public void run(DbStatement statement) throws SQLException {
statement.executeUpdate(params);
}
};
}
static Connection getConnection() throws SQLException {
return pooledDataSource != null ? pooledDataSource.getConnection() : null;
}
public static void createTransactionAsync(TransactionCallback run) {
createTransactionAsync(run, null, null);
}
public static void createTransactionAsync(TransactionCallback run, Runnable onSuccess, Runnable onFail) {
Bukkit.getScheduler().runTaskAsynchronously(plugin, () -> {
if (!createTransaction(run)) {
if (onFail != null) {
onFail.run();
}
} else if (onSuccess != null) {
onSuccess.run();
}
});
}
public static boolean createTransaction(TransactionCallback run) {
try (DbStatement stm = new DbStatement()) {
try {
stm.startTransaction();
if (!run.apply(stm)) {
stm.rollback();
return false;
} else {
stm.commit();
return true;
}
} catch (Exception e) {
stm.rollback();
Log.exception(e);
}
} catch (SQLException e) {
Log.exception(e);
}
return false;
}
@SuppressWarnings("WeakerAccess")
public static MCTiming timings(String name) {
return timingsManager.ofStart(name, sqlTiming);
}
public interface TransactionCallback extends Function<DbStatement, Boolean> {
@Override
default Boolean apply(DbStatement dbStatement) {
try {
return this.runTransaction(dbStatement);
} catch (Exception e) {
SneakyThrow.sneaky(e);
}
return false;
}
Boolean runTransaction(DbStatement stm) throws SQLException;
}
}
package com.empireminecraft.systems.db;
import java.util.HashMap;
/**
* TypeDef alias for results with a template return type getter
* so casting/implicit getInt type calls are not needed.
*/
public class DbRow extends HashMap<String, Object> {
/**
* Get the result as proper type.
* <p/>
* VALID: Long myLong = row.get("someUnsignedIntColumn");
* INVALID: String myString = row.get("someUnsignedIntColumn");
*
* @param <T>
* @param column
* @return Object of the matching type of the result.
*/
public <T> T get(String column) {
return (T) super.get(column);
}
/**
* Get the result as proper type., returning default if not found.
* <p/>
* VALID: Long myLong = row.get("someUnsignedIntColumn");
* INVALID: String myString = row.get("someUnsignedIntColumn");
*
* @param <T>
* @param column
* @return Object of the matching type of the result.
*/
public <T> T get(String column, T def) {
T res = (T) super.get(column);
if (res == null) {
return def;
}
return res;
}
/**
* Removes a result, returning as proper type.
* <p/>
* VALID: Long myLong = row.remove("someUnsignedIntColumn");
* INVALID: String myString = row.remove("someUnsignedIntColumn");
*
* @param <T>
* @param column
* @return Object of the matching type of the result.
*/
public <T> T remove(String column) {
return (T) super.remove(column);
}
/**
* Removes a result, returning as proper type, returning default if not found
* <p/>
* VALID: Long myLong = row.get("someUnsignedIntColumn");
* INVALID: String myString = row.get("someUnsignedIntColumn");
*
* @param <T>
* @param column
* @return Object of the matching type of the result.
*/
public <T> T remove(String column, T def) {
T res = (T) super.remove(column);
if (res == null) {
return def;
}
return res;
}
public DbRow clone() {
DbRow row = new DbRow();
row.putAll(this);
return row;
}
}
package com.empireminecraft.systems.db;
import co.aikar.timings.lib.MCTiming;
import com.empireminecraft.util.Log;
import org.intellij.lang.annotations.Language;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import static org.bukkit.Bukkit.getServer;
/**
* Manages a connection to the database pool and lets you work with an active
* prepared statement.
* <p/>
* Must close after you are done with it, preferably wrapping in a try/catch/finally
* DbStatement statement = null;
* try {
* statement = new DbStatement();
* // use it
* } catch (Exception e) {
* // handle exception
* } finally {
* if (statement != null) {
* statement.close();
* }
* }
*/
public class DbStatement implements AutoCloseable {
private Connection dbConn;
private PreparedStatement preparedStatement;
private ResultSet resultSet;
private String[] resultCols;
public String query = "";
// Has changes been made to a transaction w/o commit/rollback on close
private volatile boolean isDirty = false;
public DbStatement() throws SQLException {
dbConn = DB.getConnection();
if (dbConn == null) {
Log.exception("No database connection, shutting down", new SQLException("We do not have a database"));
getServer().shutdown();
}
}
public DbStatement(Connection connection) throws SQLException {
dbConn = connection;
}
/**
* Starts a transaction on this connection
*
* @return
* @throws SQLException
*/
public void startTransaction() throws SQLException {
try (MCTiming ignored = DB.timings("SQL - start transaction")) {
dbConn.setAutoCommit(false);
isDirty = true;
}
}
/**
* Commits a pending transaction on this connection
*
* @return
* @throws SQLException
*/
public void commit() {
if (!isDirty) {
return;
}
try (MCTiming ignored = DB.timings("SQL - commit")) {
isDirty = false;
dbConn.commit();
dbConn.setAutoCommit(true);
} catch (SQLException e) {
Log.exception(e);
}
}
/**
* Rollsback a pending transaction on this connection.
*
* @return
* @throws SQLException
*/
public synchronized void rollback() {
if (!isDirty) {
return;
}
try (MCTiming ignored = DB.timings("SQL - rollback")) {
isDirty = false;
dbConn.rollback();
dbConn.setAutoCommit(true);
} catch (SQLException e) {
Log.exception(e);
}
}
/**
* Initiates a new prepared statement on this connection.
*
* @param query
* @throws SQLException
*/
public DbStatement query(@Language("MySQL") String query) throws SQLException {
this.query = query;
try (MCTiming ignored = DB.timings("SQL - query: " + query)) {
closeStatement();
try {
preparedStatement = dbConn.prepareStatement(query, Statement.RETURN_GENERATED_KEYS);
} catch (SQLException e) {
close();
throw e;
}
}
return this;
}
/**
* Utility method used by execute calls to set the statements parameters to execute on.
*
* @param params Array of Objects to use for each parameter.
* @return
* @throws SQLException
*/
private void prepareExecute(Object... params) throws SQLException {
try (MCTiming ignored = DB.timings("SQL - prepareExecute: " + query)) {
closeResult();
if (preparedStatement == null) {
throw new IllegalStateException("Run Query first on statement before executing!");
}
for (int i = 0; i < params.length; i++) {
preparedStatement.setObject(i + 1, params[i]);
}
}
}
/**
* Execute an update query with the supplied parameters
*
* @param params
* @return
* @throws SQLException
*/
public int executeUpdate(Object... params) throws SQLException {
try (MCTiming ignored = DB.timings("SQL - executeUpdate: " + query)) {
try {
prepareExecute(params);
return preparedStatement.executeUpdate();
} catch (SQLException e) {
close();
throw e;
}
}
}
/**
* Executes the prepared statement with the supplied parameters.
*
* @param params
* @return
* @throws SQLException
*/
public DbStatement execute(Object... params) throws SQLException {
try (MCTiming ignored = DB.timings("SQL - execute: " + query)) {
try {
prepareExecute(params);
resultSet = preparedStatement.executeQuery();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
int numberOfColumns = resultSetMetaData.getColumnCount();
resultCols = new String[numberOfColumns];
// get the column names; column indexes start from 1
for (int i = 1; i < numberOfColumns + 1; i++) {
resultCols[i - 1] = resultSetMetaData.getColumnLabel(i);
}
} catch (SQLException e) {
close();
throw e;
}
}
return this;
}
/**
* Gets the Id of last insert
*
* @return Long
*/
public Long getLastInsertId() throws SQLException {
try (MCTiming ignored = DB.timings("SQL - getLastInsertId")) {
try (ResultSet genKeys = preparedStatement.getGeneratedKeys()) {
if (genKeys == null) {
return null;
}
Long result = null;
if (genKeys.next()) {
result = genKeys.getLong(1);
}
return result;
}
}
}
/**
* Gets all results as an array of DbRow
*
* @return
* @throws SQLException
*/
public ArrayList<DbRow> getResults() throws SQLException {
if (resultSet == null) {
return null;
}
try (MCTiming ignored = DB.timings("SQL - getResults")) {
ArrayList<DbRow> result = new ArrayList<>();
DbRow row;
while ((row = getNextRow()) != null) {
result.add(row);
}
return result;
}
}
/**
* Gets the next DbRow from the result set.
*
* @return DbRow containing a hashmap of the columns
* @throws SQLException
*/
public DbRow getNextRow() throws SQLException {
if (resultSet == null) {
return null;
}
ResultSet nextResultSet = getNextResultSet();
if (nextResultSet != null) {
DbRow row = new DbRow();
for (String col : resultCols) {
row.put(col, nextResultSet.getObject(col));
}
return row;
}
return null;
}
public <T> T getFirstColumn() throws SQLException {
ResultSet resultSet = getNextResultSet();
if (resultSet != null) {
return (T) resultSet.getObject(1);
}
return null;
}
/**
* Util method to get the next result set and close it when done.
*
* @return
* @throws SQLException
*/
private ResultSet getNextResultSet() throws SQLException {
if (resultSet != null && resultSet.next()) {
return resultSet;
} else {
closeResult();
return null;
}
}
private void closeResult() throws SQLException {
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
}
private void closeStatement() throws SQLException {
closeResult();
if (preparedStatement != null) {
preparedStatement.close();
preparedStatement = null;
}
}
/**
* Closes all resources associated with this statement and returns the connection to the pool.
*/
public void close() {
try (MCTiming ignored = DB.timings("SQL - close")) {
try {
closeStatement();
if (dbConn != null) {
if (isDirty && !dbConn.getAutoCommit()) {
Log.exception(new Exception("Statement was not finalized: " + query));
rollback();
}
dbConn.close();
dbConn = null;
}
} catch (SQLException ex) {
Log.exception("Failed to close DB connection: " + query, ex);
}
}
}
public boolean isClosed() throws SQLException {
return dbConn == null || dbConn.isClosed();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment