Skip to content

Instantly share code, notes, and snippets.

@aikar
Created June 19, 2014 14:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aikar/96207c4d2012e02927f1 to your computer and use it in GitHub Desktop.
Save aikar/96207c4d2012e02927f1 to your computer and use it in GitHub Desktop.
package com.empireminecraft.systems.db;
import com.empireminecraft.util.Util;
import java.sql.SQLException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class AsyncDbQueue implements Runnable {
private static Queue<AsyncDbStatement> queue = new ConcurrentLinkedQueue<>();
private static final Lock lock = new ReentrantLock();
@Override
public void run() {
processQueue();
}
public static void processQueue() {
if (queue.isEmpty() || !lock.tryLock()) {
return;
}
AsyncDbStatement stm = null;
DbStatement dbStatement;
try {
dbStatement = new DbStatement();
} catch (Exception e) {
lock.unlock();
Util.printException("Exception getting DbStatement in AsyncDbQueue", e);
return;
}
while ((stm = queue.poll()) != null) {
try {
if (dbStatement.isClosed()) {
dbStatement = new DbStatement();
}
stm.process(dbStatement);
} catch (SQLException e) {
stm.onError(e);
}
}
dbStatement.close();
lock.unlock();
}
public static boolean queue(AsyncDbStatement stm) {
return queue.offer(stm);
}
}
package com.empireminecraft.systems.db;
import com.empireminecraft.util.BukkitUtil;
import com.empireminecraft.util.Util;
import java.sql.SQLException;
import java.util.ArrayList;
/**
* Template class for user to override. Will run on a different thread so
* you can run SQL queries safely without impacting main thread.
* <p/>
* Will automatically close the connection once run() is done!
* <p/>
* Calls onError when a SQLException is fired, and provides
* an onResultsSync method to be overridden to receive all DB Results back on main thread,
* by calling getResultsSync() on the Async run(DbStatement) call.
*/
public abstract class AsyncDbStatement {
protected String query = null;
private boolean done = false;
public AsyncDbStatement() {
queue(null);
}
public AsyncDbStatement(String query) {
queue(query);
}
/**
* Schedules this async statement to run on anther thread. This is the only method that should be
* called on the main thread and it should only be called once.
*
* @param query
*/
private void queue(final String query) {
this.query = query;
AsyncDbQueue.queue(this);
}
/**
* Implement this method with your code that does Async SQL logic.
*
* @param statement
* @throws SQLException
*/
protected abstract void run(DbStatement statement) throws SQLException;
/**
* Override this event to have special logic for when an exception is fired.
*
* @param e
*/
public void onError(SQLException e) {
Util.printException("Exception in AsyncDbStatement" + query, e);
}
/**
* Override this event if you need to retrieve the results back on the main thread.
*
* @param results
*/
public void onResultsSync(ArrayList<DbRow> results) {
}
/**
* Will get the results on the onResultsSync callback back on the main thread.
*/
protected void getResultsSync(DbStatement dbStatement) throws SQLException {
final ArrayList<DbRow> results = dbStatement.getResults();
BukkitUtil.runTask(new Runnable() {
@Override
public void run() {
onResultsSync(results);
}
});
}
public void process(DbStatement stm) throws SQLException {
synchronized (this) {
if (!done) {
if (query != null) {
stm.query(query);
}
run(stm);
done = true;
}
}
}
}
package com.empireminecraft.systems.db;
import java.util.HashMap;
/**
* TypeDef alias for results with a template return type getter
* so casting/implicit getInt type calls are not needed.
*/
public class DbRow extends HashMap<String, Object> {
/**
* Get the result as proper type.
* <p/>
* VALID: Long myLong = row.get("someUnsignedIntColumn");
* INVALID: String myString = row.get("someUnsignedIntColumn");
*
* @param <T>
* @param column
* @return Object of the matching type of the result.
*/
public <T> T get(String column) {
return (T) super.get(column);
}
}
package com.empireminecraft.systems.db;
import com.empireminecraft.Empire;
import com.empireminecraft.util.Util;
import org.spigotmc.CustomTimingsHandler;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
import static org.bukkit.Bukkit.getServer;
/**
* Manages a connection to the database pool and lets you work with an active
* prepared statement.
* <p/>
* Must close after you are done with it, preferably wrapping in a try/catch/finally
* DbStatement statement = null;
* try {
* statement = new DbStatement();
* // use it
* } catch (Exception e) {
* // handle exception
* } finally {
* if (statement != null) {
* statement.close();
* }
* }
*/
public class DbStatement implements AutoCloseable {
Connection dbConn = null;
PreparedStatement preparedStatement = null;
ResultSet resultSet = null;
private final boolean isAsync;
public ResultSetMetaData resultSetMetaData = null;
public String[] resultCols = null;
public String query = "";
public List<DbStatement> children = new ArrayList<>();
static CustomTimingsHandler sqlTimings = new CustomTimingsHandler("** Empire - SQL");
public DbStatement() throws SQLException {
isAsync = !getServer().isPrimaryThread();
if (EmpireDb.pooledDataSource == null) {
Util.printException("No database connection, shutting down", new SQLException("We do not have a database"));
getServer().shutdown();
return;
}
dbConn = EmpireDb.pooledDataSource.getConnection();
}
public DbStatement(Connection connection) throws SQLException {
isAsync = !getServer().isPrimaryThread();
dbConn = connection;
}
/**
* Creates a new DbStatement on the same connection so multiple
* prepared statements can be used on same connection
*
* @return DbStatement
* @throws SQLException
*/
public DbStatement createSecondaryStatement() throws SQLException {
DbStatement statement = new DbStatement(dbConn) {
@Override
public void close() {
dbConn = null;
super.close();
}
};
children.add(statement);
return statement;
}
/**
* Starts a transaction on this connection
*
* @return
* @throws SQLException
*/
public boolean startTransaction() throws SQLException {
if (!isAsync) sqlTimings.startTiming();
try (Statement statement = dbConn.createStatement()) {
return statement.execute("START TRANSACTION");
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
}
/**
* Commits a pending transaction on this connection
*
* @return
* @throws SQLException
*/
public boolean commit() throws SQLException {
if (!isAsync) sqlTimings.startTiming();
try (Statement statement = dbConn.createStatement()) {
return statement.execute("COMMIT");
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
}
/**
* Rollsback a pending transaction on this connection.
*
* @return
* @throws SQLException
*/
public boolean rollback() throws SQLException {
if (!isAsync) sqlTimings.startTiming();
try (Statement statement = dbConn.createStatement()) {
return statement.execute("ROLLBACK");
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
}
/**
* Initiates a new prepared statement on this connection.
*
* @param query
* @throws SQLException
*/
public DbStatement query(String query) throws SQLException {
if (!isAsync) sqlTimings.startTiming();
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
if (preparedStatement != null) {
preparedStatement.close();
preparedStatement = null;
}
long time = 0;
try {
if (Empire.debugPerformance) {
time = System.currentTimeMillis();
}
preparedStatement = dbConn.prepareStatement(query, Statement.RETURN_GENERATED_KEYS);
if (Empire.debugPerformance) {
time = System.currentTimeMillis() - time;
if (time > 10) {
Util.logInfo("[DEBUG] " + (isAsync ? "ASYNC " : "") +
"prepareStatement(\"" + query + "\") took " + time + "ms");
}
}
} catch (SQLException e) {
close();
throw e;
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
this.query = query;
return this;
}
/**
* Utility method used by execute calls to set the statements parameters to execute on.
*
* @param params Array of Objects to use for each parameter.
* @return
* @throws SQLException
*/
protected String prepareExecute(Object... params) throws SQLException {
if (resultSet != null) {
resultSet.close();
resultSet = null;
}
if (preparedStatement == null) {
throw new IllegalStateException("Run Query first on statement before executing!");
}
StringBuilder paramsAsString = null;
if (Empire.debugPerformance) {
paramsAsString = new StringBuilder();
}
for (int i = 0; i < params.length; i++) {
if (Empire.debugPerformance) {
if (i > 0) {
paramsAsString.append(',');
}
paramsAsString.append("\"" + (params[i] != null ? params[i].toString() : "null") + "\"");
}
preparedStatement.setObject(i + 1, params[i]);
}
return paramsAsString != null ? paramsAsString.toString() : null;
}
/**
* Execute an update query with the supplied parameters
*
* @param params
* @return
* @throws SQLException
*/
public int executeUpdate(Object... params) throws SQLException {
if (!isAsync) sqlTimings.startTiming();
try {
long time = 0;
if (Empire.debugPerformance) {
time = System.currentTimeMillis();
}
int returnValue = 0;
String paramsAsString;
try {
paramsAsString = prepareExecute(params);
returnValue = preparedStatement.executeUpdate();
} catch (Exception e) {
throw e;
}
if (Empire.debugPerformance) {
time = System.currentTimeMillis() - time;
if (time > 10) {
Util.logInfo("[DEBUG] " + (isAsync ? "ASYNC " : "") +
"executeUpdate(\"" + query + "\") with: (" + paramsAsString + ") took " + time + "ms");
}
}
return returnValue;
} catch (SQLException e) {
close();
throw e;
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
}
/**
* Executes the prepared statement with the supplied parameters.
*
* @param params
* @return
* @throws SQLException
*/
public DbStatement execute(Object... params) throws SQLException {
if (!isAsync) sqlTimings.startTiming();
try {
long time = 0;
if (Empire.debugPerformance) {
time = System.currentTimeMillis();
}
String paramsAsString;
try {
paramsAsString = prepareExecute(params);
resultSet = preparedStatement.executeQuery();
} catch (Exception e) {
throw e;
}
sqlTimings.stopTiming();
if (Empire.debugPerformance) {
time = System.currentTimeMillis() - time;
if (time > 10) {
Util.logInfo("[DEBUG] " + (isAsync ? "ASYNC " : "") +
"execute(\"" + query + "\") with: (" + paramsAsString + ") took " + time + "ms");
}
}
resultSetMetaData = resultSet.getMetaData();
int numberOfColumns = resultSetMetaData.getColumnCount();
resultCols = new String[numberOfColumns];
// get the column names; column indexes start from 1
for (int i = 1; i < numberOfColumns + 1; i++) {
resultCols[i - 1] = resultSetMetaData.getColumnLabel(i);
}
} catch (SQLException e) {
close();
throw e;
} finally {
if (!isAsync) sqlTimings.stopTiming();
}
return this;
}
/**
* Gets the Id of last insert
*
* @return Long
*/
public Long getLastInsertId() throws SQLException {
if (preparedStatement == null) {
return null;
}
ResultSet genKeys = preparedStatement.getGeneratedKeys();
if (genKeys == null) {
return null;
}
Long result = null;
if (genKeys.next()) {
result = genKeys.getLong(1);
}
genKeys.close();
return result;
}
/**
* Gets all results as an array of DbRow
*
* @return
* @throws SQLException
*/
public ArrayList<DbRow> getResults() throws SQLException {
if (resultSet == null) {
return null;
}
ArrayList<DbRow> result = new ArrayList<>();
DbRow row;
while ((row = getNextRow()) != null) {
result.add(row);
}
return result;
}
/**
* Gets the next DbRow from the result set.
*
* @return DbRow containing a hashmap of the columns
* @throws SQLException
*/
public DbRow getNextRow() throws SQLException {
if (resultSet == null) {
return null;
}
ResultSet nextResultSet = getNextResultSet();
if (nextResultSet != null) {
DbRow row = new DbRow();
for (String col : resultCols) {
row.put(col, nextResultSet.getObject(col));
}
return row;
}
return null;
}
public <T> T getFirstColumn() throws SQLException {
ResultSet resultSet = getNextResultSet();
if (resultSet != null) {
return (T) resultSet.getObject(1);
}
return null;
}
/**
* Util method to get the next result set and close it when done.
*
* @return
* @throws SQLException
*/
protected ResultSet getNextResultSet() throws SQLException {
if (resultSet != null && resultSet.next()) {
return resultSet;
} else {
resultSet.close();
resultSet = null;
return null;
}
}
/**
* Closes all resources associated with this statement and returns the connection to the pool.
*/
public void close() {
try {
if (resultSet != null) {
resultSet.close();
}
} catch (SQLException ex) {
Util.printException("Failed to close ResultSet: " + query, ex);
}
try {
if (preparedStatement != null) {
preparedStatement.close();
}
} catch (SQLException ex) {
Util.printException("Failed to close Prepared Statement: " + query, ex);
}
for (DbStatement child : children) {
child.close();
}
try {
if (dbConn != null) {
dbConn.close();
}
} catch (SQLException ex) {
Util.printException("Failed to close DB connection: " + query, ex);
}
}
public boolean isClosed() throws SQLException {
return dbConn.isClosed();
}
}
package com.empireminecraft.systems.db;
import com.empireminecraft.Empire;
import com.empireminecraft.util.Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import com.mchange.v2.c3p0.DataSources;
import org.bukkit.Bukkit;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class EmpireDb {
static ComboPooledDataSource pooledDataSource;
/**
* Called in onDisable, destroys the Data source and nulls out references.
*/
public static void close() {
try {
DataSources.destroy(pooledDataSource);
pooledDataSource = null;
} catch (SQLException e) {
Util.printException("EmpireDb: Failed to destroy DataSource", e);
}
}
/**
* Called in onEnable, initializes the pool and configures it and opens the first connection to spawn the pool.
*/
public static void initialize() {
try {
pooledDataSource = new ComboPooledDataSource();
pooledDataSource.setDriverClass("com.mysql.jdbc.Driver");
pooledDataSource.setPreferredTestQuery("SELECT 1 ");
pooledDataSource.setTestConnectionOnCheckin(true);
pooledDataSource.setNumHelperThreads(5);
pooledDataSource.setJdbcUrl(Empire.databaseLoc);
pooledDataSource.setUser(Empire.databaseUser);
pooledDataSource.setPassword(Empire.databasePass);
pooledDataSource.setAcquireIncrement(5);
pooledDataSource.setInitialPoolSize(15);
pooledDataSource.setMinPoolSize(7);
pooledDataSource.setMaxPoolSize(15);
pooledDataSource.setCheckoutTimeout(15000);
//pooledDataSource.setUnreturnedConnectionTimeout(300);
pooledDataSource.setIdleConnectionTestPeriod(300);
pooledDataSource.setMaxIdleTimeExcessConnections(300);
pooledDataSource.setMaxStatements(100);
pooledDataSource.setMaxStatementsPerConnection(10);
pooledDataSource.getConnection().close();
} catch (Exception ex) {
pooledDataSource = null;
Util.printException("EmpireDB: Error creating Database Context", ex);
Bukkit.getServer().shutdown();
}
}
/**
* Initiates a new DbStatement and prepares the first query.
* <p/>
* YOU MUST MANUALLY CLOSE THIS STATEMENT IN A finally {} BLOCK!
*
* @param query
* @return
* @throws SQLException
*/
public static DbStatement query(String query) throws SQLException {
return (new DbStatement()).query(query);
}
/**
* Utility method to execute a query and retrieve the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static DbRow getFirstRow(String query, Object... params) throws SQLException {
DbRow dbRow = null;
DbStatement statement = null;
try {
statement = EmpireDb.query(query).execute(params);
dbRow = statement.getNextRow();
} catch (SQLException e) {
if (statement != null) {
statement.close();
}
throw e;
}
if (statement != null) {
statement.close();
}
return dbRow;
}
/**
* Utility method to execute a query and retrieve the first column of the first row, then close statement.
* You should ensure result will only return 1 row for maximum performance.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static <T> T getFirstColumn(String query, Object... params) throws SQLException {
Object result = null;
DbStatement statement = null;
try {
statement = EmpireDb.query(query).execute(params);
result = statement.getFirstColumn();
if (statement != null) {
statement.close();
}
return (T) result;
} catch (SQLException e) {
if (statement != null) {
statement.close();
}
throw e;
}
}
/**
* Utility method to execute a query and retrieve first column of all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
* @param query
* @param params
* @param <T>
* @return
* @throws SQLException
*/
public static <T> List<T> getFirstColumnResults(String query, Object... params) throws SQLException {
List<T> dbRows = new ArrayList<>();
DbStatement statement = null;
T result;
try {
statement = EmpireDb.query(query).execute(params);
while ((result = statement.getFirstColumn()) != null) {
dbRows.add(result);
}
} catch (SQLException e) {
if (statement != null) {
statement.close();
}
throw e;
}
if (statement != null) {
statement.close();
}
return dbRows;
}
/**
* Utility method to execute a query and retrieve all results, then close statement.
*
* Meant for single queries that will not use the statement multiple times.
*
* @param query The query to run
* @param params The parameters to execute the statement with
* @return List of DbRow of your results (HashMap with template return type)
* @throws SQLException
*/
public static List<DbRow> getResults(String query, Object... params) throws SQLException {
List<DbRow> dbRows = null;
DbStatement statement = null;
try {
statement = EmpireDb.query(query).execute(params);
dbRows = statement.getResults();
} catch (SQLException e) {
if (statement != null) {
statement.close();
}
throw e;
}
if (statement != null) {
statement.close();
}
return dbRows;
}
/**
* Utility method for executing an update synchronously, and then close the statement.
*
* @param query Query to run
* @param params Params to execute the statement with.
* @return Number of rows modified.
* @throws SQLException
*/
public static int executeUpdate(String query, Object... params) throws SQLException {
DbStatement statement = null;
int ret = 0;
try {
statement = EmpireDb.query(query);
ret = statement.executeUpdate(params);
} catch (SQLException e) {
if (statement != null) {
statement.close();
}
throw e;
}
if (statement != null) {
statement.close();
}
return ret;
}
/**
* Utility method to execute an update statement asynchronously and close the connection.
*
* @param query Query to run
* @param params Params to execute the update with
*/
public static void executeUpdateAsync(String query, final Object... params) {
new AsyncDbStatement(query) {
@Override
public void run(DbStatement statement) throws SQLException {
statement.executeUpdate(params);
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment