Skip to content

Instantly share code, notes, and snippets.

@Emtec
Created December 2, 2010 19:56
Show Gist options
  • Save Emtec/725939 to your computer and use it in GitHub Desktop.
Save Emtec/725939 to your computer and use it in GitHub Desktop.
diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp
index 2b555d8..ca5a1f8 100644
--- a/src/shared/Database/DatabaseMysql.cpp
+++ b/src/shared/Database/DatabaseMysql.cpp
@@ -279,6 +279,7 @@ bool DatabaseMysql::Execute(const char *sql)
// don't use queued execution if it has not been initialized
if (!m_threadBody) return DirectExecute(sql);
+ nMutex.acquire();
tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
@@ -291,6 +292,7 @@ bool DatabaseMysql::Execute(const char *sql)
m_threadBody->Delay(new SqlStatement(sql));
}
+ nMutex.release();
return true;
}
@@ -356,6 +358,7 @@ bool DatabaseMysql::BeginTransaction()
return true; // transaction started
}
+ nMutex.acquire();
tranThread = ACE_Based::Thread::current(); // owner of this transaction
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
@@ -365,6 +368,7 @@ bool DatabaseMysql::BeginTransaction()
m_tranQueues[tranThread] = new SqlTransaction();
+ nMutex.release();
return true;
}
@@ -373,27 +377,31 @@ bool DatabaseMysql::CommitTransaction()
if (!mMysql)
return false;
+ bool _res = false;
+
// don't use queued execution if it has not been initialized
if (!m_threadBody)
{
if (tranThread != ACE_Based::Thread::current())
return false;
- bool _res = _TransactionCmd("COMMIT");
+ _res = _TransactionCmd("COMMIT");
tranThread = NULL;
mMutex.release();
return _res;
}
+ nMutex.acquire();
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
m_threadBody->Delay(i->second);
- i->second = NULL;
- return true;
+ m_tranQueues.erase(i);
+ _res = true;
}
- else
- return false;
+
+ nMutex.release();
+ return _res;
}
bool DatabaseMysql::RollbackTransaction()
@@ -412,13 +420,16 @@ bool DatabaseMysql::RollbackTransaction()
return _res;
}
+ nMutex.acquire();
tranThread = ACE_Based::Thread::current();
TransactionQueues::iterator i = m_tranQueues.find(tranThread);
if (i != m_tranQueues.end() && i->second != NULL)
{
delete i->second;
i->second = NULL;
+ m_tranQueues.erase(i);
}
+ nMutex.release();
return true;
}
diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h
index 43baddf..6d27f5a 100644
--- a/src/shared/Database/DatabaseMysql.h
+++ b/src/shared/Database/DatabaseMysql.h
@@ -65,7 +65,8 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database
// must be call before finish thread run
void ThreadEnd();
private:
- ACE_Thread_Mutex mMutex;
+ ACE_Thread_Mutex mMutex; // For thread safe operations between core and mySQL server
+ ACE_Thread_Mutex nMutex; // For thread safe operations on m_transQueues
ACE_Based::Thread * tranThread;
diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp
index e131100..115f296 100644
--- a/src/shared/Database/SqlDelayThread.cpp
+++ b/src/shared/Database/SqlDelayThread.cpp
@@ -30,21 +30,22 @@ void SqlDelayThread::run()
mysql_thread_init();
#endif
- const uint32 loopSleepms = 10;
+ //lets wait for next async task no more than 2 secs...
+ ACE_Time_Value _time(2);
+ const uint32 loopSleepms = 10;
const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms;
-
uint32 loopCounter = 0;
while (m_running)
{
// if the running state gets turned off while sleeping
// empty the queue before exiting
- ACE_Based::Thread::Sleep(loopSleepms);
- SqlOperation* s = NULL;
- while (m_sqlQueue.next(s))
+ ACE_Based::Thread::Sleep(loopSleepms);
+ SqlAsyncTask * s = dynamic_cast<SqlAsyncTask*> (m_sqlQueue.dequeue(/*&_time*/));
+ if(s)
{
- s->Execute(m_dbEngine);
+ s->call();
delete s;
}
if((loopCounter++) >= pingEveryLoop)
@@ -62,4 +63,11 @@ void SqlDelayThread::run()
void SqlDelayThread::Stop()
{
m_running = false;
+ m_sqlQueue.queue()->deactivate();
+}
+
+bool SqlDelayThread::Delay(SqlOperation* sql)
+{
+ int res = m_sqlQueue.enqueue(new SqlAsyncTask(m_dbEngine, sql));
+ return (res != -1);
}
diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h
index 0b3e633..0d21034 100644
--- a/src/shared/Database/SqlDelayThread.h
+++ b/src/shared/Database/SqlDelayThread.h
@@ -20,7 +20,7 @@
#define __SQLDELAYTHREAD_H
#include "ace/Thread_Mutex.h"
-#include "LockedQueue.h"
+#include "ace/Activation_Queue.h"
#include "Threading.h"
@@ -29,7 +29,7 @@ class SqlOperation;
class SqlDelayThread : public ACE_Based::Runnable
{
- typedef ACE_Based::LockedQueue<SqlOperation*, ACE_Thread_Mutex> SqlQueue;
+ typedef ACE_Activation_Queue SqlQueue;
private:
SqlQueue m_sqlQueue; ///< Queue of SQL statements
@@ -41,7 +41,7 @@ class SqlDelayThread : public ACE_Based::Runnable
SqlDelayThread(Database* db);
///< Put sql statement to delay queue
- bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; }
+ bool Delay(SqlOperation* sql);
virtual void Stop(); ///< Stop event
virtual void run(); ///< Main Thread loop
diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp
index 25933a3..04a25aa 100644
--- a/src/shared/Database/SqlOperations.cpp
+++ b/src/shared/Database/SqlOperations.cpp
@@ -31,29 +31,36 @@ void SqlStatement::Execute(Database *db)
void SqlTransaction::Execute(Database *db)
{
+ m_Mutex.acquire();
if(m_queue.empty())
+ {
+ m_Mutex.release();
return;
+ }
db->DirectExecute("START TRANSACTION");
while(!m_queue.empty())
{
char *sql = const_cast<char*>(m_queue.front());
- m_queue.pop();
if(!db->DirectExecute(sql))
{
delete [] sql;
+ m_queue.pop();
db->DirectExecute("ROLLBACK");
while(!m_queue.empty())
{
delete [] (const_cast<char*>(m_queue.front()));
m_queue.pop();
}
+ m_Mutex.release();
return;
}
delete [] sql;
+ m_queue.pop();
}
db->DirectExecute("COMMIT");
+ m_Mutex.release();
}
/// ---- ASYNC QUERIES ----
diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h
index fa9437c..4a9e831 100644
--- a/src/shared/Database/SqlOperations.h
+++ b/src/shared/Database/SqlOperations.h
@@ -22,6 +22,7 @@
#include "Common.h"
#include "ace/Thread_Mutex.h"
+#include "ace/Method_Request.h"
#include "LockedQueue.h"
#include <queue>
#include "Utilities/Callback.h"
@@ -55,9 +56,17 @@ class SqlTransaction : public SqlOperation
{
private:
std::queue<const char *> m_queue;
+ ACE_Thread_Mutex m_Mutex;
public:
SqlTransaction() {}
- void DelayExecute(const char *sql) { m_queue.push(mangos_strdup(sql)); }
+ void DelayExecute(const char *sql)
+ {
+ m_Mutex.acquire();
+ char* _sql = mangos_strdup(sql);
+ if (_sql)
+ m_queue.push(_sql);
+ m_Mutex.release();
+ }
void Execute(Database *db);
};
@@ -117,4 +126,30 @@ class SqlQueryHolderEx : public SqlOperation
: m_holder(holder), m_callback(callback), m_queue(queue) {}
void Execute(Database *db);
};
+
+class SqlAsyncTask : public ACE_Method_Request
+{
+ public:
+ SqlAsyncTask(Database * db, SqlOperation * op) : m_db(db), m_op(op){}
+ ~SqlAsyncTask()
+ {
+ if (!m_op)
+ return;
+
+ delete m_op;
+ m_op = NULL;
+ }
+ int call()
+ {
+ if(this == NULL || !m_db || !m_op)
+ return -1;
+
+ m_op->Execute(m_db);
+ return 0;
+ }
+ private:
+ Database * m_db;
+ SqlOperation * m_op;
+};
+
#endif //__SQLOPERATIONS_H
diff --git a/src/shared/LockedQueue.h b/src/shared/LockedQueue.h
index b3adb13..e31d389 100644
--- a/src/shared/LockedQueue.h
+++ b/src/shared/LockedQueue.h
@@ -68,7 +68,8 @@ namespace ACE_Based
//! Gets the next result in the queue, if any.
bool next(T& result)
{
- ACE_Guard<LockType> g(this->_lock);
+ //ACE_Guard<LockType> g(this->_lock);
+ ACE_GUARD_RETURN (LockType, g, this->_lock, false);
if (_queue.empty())
return false;
@@ -121,6 +122,20 @@ namespace ACE_Based
{
this->_lock.release();
}
+
+ ///! Calls pop_front of the queue
+ void pop_front()
+ {
+ ACE_GUARD (LockType, g, this->_lock);
+ _queue.pop_front();
+ }
+
+ ///! Checks if we're empty or not with locks held
+ bool empty()
+ {
+ ACE_GUARD_RETURN (LockType, g, this->_lock, false);
+ return _queue.empty();
+ }
};
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment