Created
November 10, 2016 17:25
-
-
Save Marenz/369da81853e5e3933d25df2cffd99c85 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/+
    Expected usage:

        auto manager = new MySqlConnectionManager();
        manager.setConfig(host, port, user, pwd, db, max_connections);

        {
            // Suspends the fiber if no connection is available.
            // The connection is freed automatically when the scope is left.
            auto con = manager.get();
            con.sql = "SELECT 1;";
            con.execSQL(); // or any other variation of execSQL
        }
+/
module MySqlConnectionManager; | |
public import mysql.connection; | |
public import mysql.db; | |
public import util.ScopeBuffer; | |
/*******************************************************************************

    Backslash-escapes the characters that are dangerous inside an SQL string
    and writes the escaped text to an output buffer.

    Characters that are not part of the escape table are copied unchanged.

    Params:
        input = string to escape
        buffer = output range that receives the escaped text

*******************************************************************************/

void mysql_escape ( Buffer, Input ) ( Input input, Buffer buffer )
{
    import std.string : translate;

    // Character -> replacement sequence. '\032' is an octal escape, i.e. the
    // Ctrl-Z character (0x1A).
    immutable string[dchar] escapes = [
        '\0'   : "\\0",
        '\n'   : "\\n",
        '\r'   : "\\r",
        '\032' : "\\Z",
        '"'    : "\\\"",
        '\''   : "\\'",
        '\\'   : "\\\\"
    ];

    // No characters are removed (null), only replaced according to the table
    translate(input, escapes, null, buffer);
}
unittest
{
    // Input without any special character must pass through untouched
    auto plain = scopeBuffer!(char, 512)();
    mysql_escape("123456", plain.share);
    assert(plain.length > 0);
    assert(plain[] == "123456");

    // Every character of the escape table must come out backslash-escaped
    // ('\032' is Ctrl-Z, which is written as \Z)
    auto escaped = scopeBuffer!(char, 512)();
    mysql_escape("a'b\"c\\d\0e\nf\rg\032h", escaped.share);
    assert(escaped[] == `a\'b\"c\\d\0e\nf\rg\Zh`);
}
/*******************************************************************************

    Wrapper around a string that makes it escape itself when it is formatted,
    e.g. by formattedWrite. The escaped text is written straight to the sink
    that the formatting function hands to toString.

*******************************************************************************/

struct MysqlEscape ( Input )
{
    /// The unescaped text
    Input input;

    /***************************************************************************

        Writes the escaped form of `input` to the given sink

        Params:
            sink = delegate receiving the escaped text piece by piece

    ***************************************************************************/

    void toString ( scope void delegate(const(char)[]) sink ) const
    {
        // Adapter exposing the sink delegate as an output range, which is
        // what mysql_escape expects as its buffer argument
        struct SinkAdapter
        {
            void put ( const(char)[] chunk )
            {
                sink(chunk);
            }
        }

        SinkAdapter adapter;

        mysql_escape(input, adapter);
    }
}
/*******************************************************************************

    Convenience function that wraps a value in a MysqlEscape struct, letting
    the compiler deduce the template parameter from the argument.

    Params:
        input = string to wrap

    Returns:
        wrapper that escapes `input` when it is formatted

*******************************************************************************/

MysqlEscape!(T) mysqlEscape ( T ) ( T input )
{
    alias Wrapper = MysqlEscape!(T);

    return Wrapper(input);
}
unittest
{
    import std.format : formattedWrite;

    auto output = scopeBuffer!(char, 512)();
    auto wrapped = mysqlEscape("\0, \r, \n, \", \\");

    // The wrapper must be escaped while it is being formatted; the plain
    // integer arguments are unaffected
    formattedWrite(output.share, "%s, %s, %s, mkay?", 1, 2, wrapped);

    assert(output[] == `1, 2, \0, \r, \n, \", \\, mkay?`);
}
/// Short name for the scoped connection handle returned by the manager
alias MySqlConnection = MySqlConnectionManager.ScopedSqlConnection;

/*******************************************************************************

    Pool of MySQL connections.

    Connections are handed out through get() as ScopedSqlConnection handles
    which give the connection back to the pool when they are destroyed.
    Connections are tracked per user (the fiber of the requesting task), so
    nested requests from the same user share a single connection.

*******************************************************************************/

class MySqlConnectionManager
{
    import vibe.core.sync;
    import util.logging;

    mixin Logging!"sql";

    /***************************************************************************

        Handle for a reserved connection. Forwards to a Command created for
        that connection (alias this) and releases the reservation in its
        destructor.

    ***************************************************************************/

    struct ScopedSqlConnection
    {
        /// The reserved connection
        Connection c;

        /// Manager the connection is given back to
        MySqlConnectionManager u;

        /// Key under which the connection is tracked in used_connections
        Object user;

        /// Location of the call that requested this object
        string file;

        /// ditto
        uint line;

        /// Command bound to the connection
        public Command cmd;

        alias cmd this;

        /***********************************************************************

            This struct must be initialized with the proper parameters

        ***********************************************************************/

        @disable this();

        /***********************************************************************

            Prevent copying of this instance (we don't want a connection freed
            several times)

        ***********************************************************************/

        @disable this(this);

        /***********************************************************************

            Constructor

            Params:
                u = connection manager
                c = connection that should be made available again
                user = key under which the connection is tracked in the
                       manager's used_connections
                file = file of the call that requested this object
                line = line of the call that requested this object
                incr = whether we increase the used connection count (should
                       only be false if it is being increased from the
                       outside, as get() does for new connections)

        ***********************************************************************/

        private this ( MySqlConnectionManager u, Connection c, Object user,
                       string file, uint line, bool incr = true )
        {
            this.c = c;
            this.u = u;
            this.user = user;
            this.cmd = Command(c);
            this.file = file;
            this.line = line;

            if ( incr )
            {
                auto ucon = user in u.used_connections;

                // First handle for this user: start tracking the connection
                if ( ucon is null )
                {
                    u.used_connections[user] = UsedConnection(0, c);
                    ucon = user in u.used_connections;
                }

                ucon.num++;
            }
        }

        /***********************************************************************

            Releases the reservation. Once the last handle of a user is gone
            the connection is appended to the pool again and waiting fibers
            are notified.

            Throws:
                Error if rows are still pending after fetching one more row,
                i.e. the caller did not read the complete result

        ***********************************************************************/

        ~this ( )
        {
            if (this.rowsPending())
            {
                // One outstanding fetch is tolerated (presumably the
                // terminating packet); anything beyond that is a caller bug
                this.getNextRow();

                if (this.rowsPending())
                {
                    import std.format;

                    throw new Error(format(
                        "Not all rows were read by caller: %s:%s",
                        this.file, this.line));
                }
            }

            auto ucon = user in this.u.used_connections;

            // Fail with a clear message instead of a null dereference
            assert(ucon !is null,
                "Scoped SQL connection released for an untracked user");

            ucon.num--;

            if (ucon.num == 0)
            {
                this.u.used_connections.remove(user);
                this.u.sql_connections.assumeSafeAppend() ~= this.c;
                this.u.sql_con_event.emit();
            }
        }

        /***********************************************************************

            Runs "SELECT LAST_INSERT_ID();" on this connection. Note that this
            overwrites the sql property of the command.

            Returns:
                the last inserted id

        ***********************************************************************/

        public ulong last_id ( ) @property
        {
            ulong stat_id;

            this.sql = "SELECT LAST_INSERT_ID();";
            this.execSQL();

            auto row = this.getNextRow();
            stat_id = row[0].get!ulong;

            this.getNextRow(); // get the terminating packet

            return stat_id;
        }
    }

    /***************************************************************************

        Max amount of connections we want to use

    ***************************************************************************/

    private size_t max_connections;

    /***************************************************************************

        Sql connection pool (connections that are currently not in use)

    ***************************************************************************/

    private Connection[] sql_connections;

    /***************************************************************************

        Used connections and their users. `num` counts the live handles of a
        user, `con` is the connection shared by those handles.

    ***************************************************************************/

    struct UsedConnection { size_t num; Connection con; }

    private UsedConnection[Object] used_connections;

    /***************************************************************************

        Event triggered when an SQL connection becomes available

    ***************************************************************************/

    private ManualEvent sql_con_event;

    /***************************************************************************

        MySQL Connection string

    ***************************************************************************/

    string connection_string;

    /***************************************************************************

        Constructor, creates the event used to wake up waiting fibers. The
        connection settings must be provided through setConfig.

    ***************************************************************************/

    this ( )
    {
        this.sql_con_event = createManualEvent();
    }

    /***************************************************************************

        Sets the configuration settings on how to connect to a server

        Params:
            host = sql host
            port = sql port
            user = sql user
            pwd = sql password
            db = sql database
            max_connections = max amount of connections to use

    ***************************************************************************/

    void setConfig ( string host, string port, string user, string pwd,
                     string db, size_t max_connections )
    {
        this.connection_string = "host=" ~ host ~ ";" ~
                                 "port=" ~ port ~ ";" ~
                                 "user=" ~ user ~ ";" ~
                                 "pwd=" ~ pwd ~ ";" ~
                                 "db=" ~ db ;

        this.max_connections = max_connections;
    }

    /***************************************************************************

        Returns & reserves an SQL connection. If no free connection is available
        the fiber will be suspended until one is free again.

        If the calling user already holds a connection, a further handle to
        that same connection is returned. If the pool is empty and fewer than
        max_connections are in use, a new connection is created; a failed
        attempt is logged and retried after waiting on the availability event.

        Params:
            file = file of the caller, reported if rows are left unread
            line = line of the caller, reported if rows are left unread

        Returns:
            handle that releases the connection when it is destroyed

    ***************************************************************************/

    public ScopedSqlConnection get ( string file = __FILE__, uint line = __LINE__ )
    {
        import core.time;
        import vibe.core.task;

        Object user = Task.getThis().fiber;

        // Nested request of a user that already owns a connection
        if (auto ucon = user in this.used_connections)
        {
            return ScopedSqlConnection(this, ucon.con, user, file, line);
        }

        while ( this.sql_connections.length == 0 )
        {
            if ( this.used_connections.length < this.max_connections )
            {
                try
                {
                    logger.info("Creating new SQL connection ...");

                    // Reserve the slot before connecting so that it already
                    // counts against max_connections
                    this.used_connections[user] = UsedConnection();

                    scope(failure)
                        this.used_connections.remove(user);

                    auto ucon = user in this.used_connections;

                    ucon.con = new Connection(this.connection_string,
                        defaultClientFlags | SvrCapFlags.FOUND_NOT_AFFECTED);
                    ucon.num = 1;

                    // The count was set above, hence incr = false
                    return ScopedSqlConnection(this, ucon.con, user, file, line, false);
                }
                catch ( Exception e )
                {
                    logger.warningf("Failed to create SQL connection: %s %s %s",
                        e.msg, e.file, e.line);
                }
            }

            // Reached when the pool is exhausted or when creating a
            // connection failed. Waiting here also serves as back-off so that
            // a persistently failing server does not cause a tight retry loop.
            sql_con_event.wait(500.msecs, sql_con_event.emitCount);
        }

        auto con = this.sql_connections[$-1];
        this.sql_connections.length--;

        return ScopedSqlConnection(this, con, user, file, line);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment