Skip to content

Instantly share code, notes, and snippets.

@Marenz
Created November 10, 2016 17:25
Show Gist options
  • Save Marenz/369da81853e5e3933d25df2cffd99c85 to your computer and use it in GitHub Desktop.
Save Marenz/369da81853e5e3933d25df2cffd99c85 to your computer and use it in GitHub Desktop.
/+
Expected usage:
auto manager = new MySqlConnectionManager(..);
{
// Suspends the fiber if no connection available
// Will automatically free the connection when the scope is left
auto con = manager.get();
con.sql = "SELECT 1;";
con.execSQL() // or any other variation of execSQL
}
+/
module MySqlConnectionManager;
public import mysql.connection;
public import mysql.db;
public import util.ScopeBuffer;
/*******************************************************************************
Simple escape function for dangerous SQL characters
Params:
input = string to escape
buffer = buffer to use for the output
*******************************************************************************/
void mysql_escape ( Buffer, Input ) ( Input input, Buffer buffer )
{
import std.string : translate;
immutable string[dchar] transTable = [
'\\' : "\\\\",
'\'' : "\\'",
'\0' : "\\0",
'\n' : "\\n",
'\r' : "\\r",
'"' : "\\\"",
'\032' : "\\Z"
];
translate(input, transTable, null, buffer);
}
unittest
{
auto sql = scopeBuffer!(char, 512)();
mysql_escape("123456", sql.share);
assert(sql.length > 0);
assert(sql[] == "123456");
}
/*******************************************************************************
Struct to wrap around a string so it can be passed to formattedWrite and be
properly escaped all using the buffer that formattedWrite provides.
*******************************************************************************/
struct MysqlEscape ( Input )
{
Input input;
const void toString ( scope void delegate(const(char)[]) sink )
{
struct SinkOutputRange
{
void put ( const(char)[] t ) { sink(t); }
}
SinkOutputRange r;
mysql_escape(input, r);
}
}
/*******************************************************************************
Helper function to easily construct a escape wrapper struct
*******************************************************************************/
MysqlEscape!(T) mysqlEscape ( T ) ( T input )
{
return MysqlEscape!(T)(input);
}
unittest
{
auto buf = scopeBuffer!(char, 512);
import std.format : formattedWrite;
formattedWrite(buf.share, "%s, %s, %s, mkay?", 1, 2,
mysqlEscape("\0, \r, \n, \", \\"));
assert(buf[] == `1, 2, \0, \r, \n, \", \\, mkay?`);
}
alias MySqlConnection = MySqlConnectionManager.ScopedSqlConnection;
class MySqlConnectionManager
{
import vibe.core.sync;
import util.logging;
mixin Logging!"sql";
struct ScopedSqlConnection
{
Connection c;
MySqlConnectionManager u;
Object user;
string file;
uint line;
public Command cmd;
alias cmd this;
/***********************************************************************
This struct must be initialized with the proper parameters
***********************************************************************/
@disable this();
/***********************************************************************
Prevent copying of this instance (we don't want a connection freed
several times)
***********************************************************************/
@disable this(this);
/***********************************************************************
Constructor
Params:
u = connection manager
c = connection that should be made available again
file = file of the call that requested this object
line = line of the call that requested this object
incr = whether we increase the used connection count (should
only be false if it is being decreased from the outside)
***********************************************************************/
private this ( MySqlConnectionManager u, Connection c, Object user,
string file, uint line, bool incr = true )
{
this.c = c;
this.u = u;
this.user = user;
this.cmd = Command(c);
this.file = file;
this.line = line;
if ( incr )
{
auto ucon = user in u.used_connections;
if ( ucon is null )
{
u.used_connections[user] = UsedConnection(0, c);
ucon = user in u.used_connections;
}
ucon.num++;
}
}
~this ( )
{
if (this.rowsPending())
{
this.getNextRow();
if (this.rowsPending())
{
import std.format;
throw new Error(format(
"Not all rows were read by caller: %s:%s",
this.file, this.line));
}
}
auto ucon = user in this.u.used_connections;
ucon.num--;
if (ucon.num == 0)
{
this.u.used_connections.remove(user);
this.u.sql_connections.assumeSafeAppend() ~= this.c;
this.u.sql_con_event.emit();
}
}
/***********************************************************************
Returns the last inserted id
***********************************************************************/
public ulong last_id ( ) @property
{
ulong stat_id;
this.sql = "SELECT LAST_INSERT_ID();";
this.execSQL();
auto row = this.getNextRow();
stat_id = row[0].get!ulong;
this.getNextRow(); // get the terminating packet
return stat_id;
}
}
/***************************************************************************
Max amount of connections we want to use
***************************************************************************/
private size_t max_connections;
/***************************************************************************
Sql connection pool
***************************************************************************/
private Connection[] sql_connections;
/***************************************************************************
Used connections and their users
***************************************************************************/
struct UsedConnection { size_t num; Connection con; }
private UsedConnection[Object] used_connections;
/***************************************************************************
Event triggered when an SQL connection becomes available
***************************************************************************/
private ManualEvent sql_con_event;
/***************************************************************************
MySQL Connection string
***************************************************************************/
string connection_string;
this ( )
{
this.sql_con_event = createManualEvent();
}
/***************************************************************************
Sets the configuration settings on how to connect to a server
Params:
host = sql host
port = sql port
user = sql user
pwd = sql password
db = sql database
max_connections = max amount of connections to use
***************************************************************************/
void setConfig ( string host, string port, string user, string pwd,
string db, size_t max_connections )
{
this.connection_string = "host=" ~ host ~ ";" ~
"port=" ~ port ~ ";" ~
"user=" ~ user ~ ";" ~
"pwd=" ~ pwd ~ ";" ~
"db=" ~ db ;
this.max_connections = max_connections;
}
/***************************************************************************
Returns & reserves an SQL connection. If no free connection is available
the fiber will be suspended until one is free again.
***************************************************************************/
public ScopedSqlConnection get ( string file = __FILE__, uint line = __LINE__ )
{
import vibe.core.task;
Object user = Task.getThis().fiber;
if (auto ucon = user in this.used_connections)
{
return ScopedSqlConnection(this, ucon.con, user, file, line);
}
auto initial_emit_count = sql_con_event.emitCount;
while ( this.sql_connections.length == 0 )
{
if ( this.used_connections.length < this.max_connections )
try
{
logger.info("Creating new SQL connection ...");
this.used_connections[user] = UsedConnection();
scope(failure)
this.used_connections.remove(user);
auto ucon = user in this.used_connections;
ucon.con = new Connection(this.connection_string,
defaultClientFlags | SvrCapFlags.FOUND_NOT_AFFECTED);
ucon.num = 1;
return ScopedSqlConnection(this, ucon.con, user, file, line, false);
}
catch ( Exception e )
{
logger.warningf("Failed to create SQL connection: %s %s %s",
e.msg, e.file, e.line);
}
else
{
import core.time;
sql_con_event.wait(500.msecs, sql_con_event.emitCount);
}
}
auto con = this.sql_connections[$-1];
this.sql_connections.length--;
return ScopedSqlConnection(this, con, user, file, line);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment