Created
December 2, 2010 19:56
-
-
Save Emtec/725939 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| diff --git a/src/shared/Database/DatabaseMysql.cpp b/src/shared/Database/DatabaseMysql.cpp | |
| index 2b555d8..ca5a1f8 100644 | |
| --- a/src/shared/Database/DatabaseMysql.cpp | |
| +++ b/src/shared/Database/DatabaseMysql.cpp | |
| @@ -279,6 +279,7 @@ bool DatabaseMysql::Execute(const char *sql) | |
| // don't use queued execution if it has not been initialized | |
| if (!m_threadBody) return DirectExecute(sql); | |
| + nMutex.acquire(); | |
| tranThread = ACE_Based::Thread::current(); // owner of this transaction | |
| TransactionQueues::iterator i = m_tranQueues.find(tranThread); | |
| if (i != m_tranQueues.end() && i->second != NULL) | |
| @@ -291,6 +292,7 @@ bool DatabaseMysql::Execute(const char *sql) | |
| m_threadBody->Delay(new SqlStatement(sql)); | |
| } | |
| + nMutex.release(); | |
| return true; | |
| } | |
| @@ -356,6 +358,7 @@ bool DatabaseMysql::BeginTransaction() | |
| return true; // transaction started | |
| } | |
| + nMutex.acquire(); | |
| tranThread = ACE_Based::Thread::current(); // owner of this transaction | |
| TransactionQueues::iterator i = m_tranQueues.find(tranThread); | |
| if (i != m_tranQueues.end() && i->second != NULL) | |
| @@ -365,6 +368,7 @@ bool DatabaseMysql::BeginTransaction() | |
| m_tranQueues[tranThread] = new SqlTransaction(); | |
| + nMutex.release(); | |
| return true; | |
| } | |
| @@ -373,27 +377,31 @@ bool DatabaseMysql::CommitTransaction() | |
| if (!mMysql) | |
| return false; | |
| + bool _res = false; | |
| + | |
| // don't use queued execution if it has not been initialized | |
| if (!m_threadBody) | |
| { | |
| if (tranThread != ACE_Based::Thread::current()) | |
| return false; | |
| - bool _res = _TransactionCmd("COMMIT"); | |
| + _res = _TransactionCmd("COMMIT"); | |
| tranThread = NULL; | |
| mMutex.release(); | |
| return _res; | |
| } | |
| + nMutex.acquire(); | |
| tranThread = ACE_Based::Thread::current(); | |
| TransactionQueues::iterator i = m_tranQueues.find(tranThread); | |
| if (i != m_tranQueues.end() && i->second != NULL) | |
| { | |
| m_threadBody->Delay(i->second); | |
| - i->second = NULL; | |
| - return true; | |
| + m_tranQueues.erase(i); | |
| + _res = true; | |
| } | |
| - else | |
| - return false; | |
| + | |
| + nMutex.release(); | |
| + return _res; | |
| } | |
| bool DatabaseMysql::RollbackTransaction() | |
| @@ -412,13 +420,16 @@ bool DatabaseMysql::RollbackTransaction() | |
| return _res; | |
| } | |
| + nMutex.acquire(); | |
| tranThread = ACE_Based::Thread::current(); | |
| TransactionQueues::iterator i = m_tranQueues.find(tranThread); | |
| if (i != m_tranQueues.end() && i->second != NULL) | |
| { | |
| delete i->second; | |
| i->second = NULL; | |
| + m_tranQueues.erase(i); | |
| } | |
| + nMutex.release(); | |
| return true; | |
| } | |
| diff --git a/src/shared/Database/DatabaseMysql.h b/src/shared/Database/DatabaseMysql.h | |
| index 43baddf..6d27f5a 100644 | |
| --- a/src/shared/Database/DatabaseMysql.h | |
| +++ b/src/shared/Database/DatabaseMysql.h | |
| @@ -65,7 +65,8 @@ class MANGOS_DLL_SPEC DatabaseMysql : public Database | |
| // must be call before finish thread run | |
| void ThreadEnd(); | |
| private: | |
| - ACE_Thread_Mutex mMutex; | |
| + ACE_Thread_Mutex mMutex; // For thread-safe operations between core and MySQL server | |
| + ACE_Thread_Mutex nMutex; // For thread-safe operations on m_tranQueues | |
| ACE_Based::Thread * tranThread; | |
| diff --git a/src/shared/Database/SqlDelayThread.cpp b/src/shared/Database/SqlDelayThread.cpp | |
| index e131100..115f296 100644 | |
| --- a/src/shared/Database/SqlDelayThread.cpp | |
| +++ b/src/shared/Database/SqlDelayThread.cpp | |
| @@ -30,21 +30,22 @@ void SqlDelayThread::run() | |
| mysql_thread_init(); | |
| #endif | |
| - const uint32 loopSleepms = 10; | |
| + // Intended cap of 2 secs on waiting for the next async task (NOTE: _time is not yet passed to dequeue() below) | |
| + ACE_Time_Value _time(2); | |
| + const uint32 loopSleepms = 10; | |
| const uint32 pingEveryLoop = m_dbEngine->GetPingIntervall()/loopSleepms; | |
| - | |
| uint32 loopCounter = 0; | |
| while (m_running) | |
| { | |
| // if the running state gets turned off while sleeping | |
| // empty the queue before exiting | |
| - ACE_Based::Thread::Sleep(loopSleepms); | |
| - SqlOperation* s = NULL; | |
| - while (m_sqlQueue.next(s)) | |
| + ACE_Based::Thread::Sleep(loopSleepms); | |
| + SqlAsyncTask * s = dynamic_cast<SqlAsyncTask*> (m_sqlQueue.dequeue(/*&_time*/)); | |
| + if(s) | |
| { | |
| - s->Execute(m_dbEngine); | |
| + s->call(); | |
| delete s; | |
| } | |
| if((loopCounter++) >= pingEveryLoop) | |
| @@ -62,4 +63,11 @@ void SqlDelayThread::run() | |
| void SqlDelayThread::Stop() | |
| { | |
| m_running = false; | |
| + m_sqlQueue.queue()->deactivate(); | |
| +} | |
| + | |
| +bool SqlDelayThread::Delay(SqlOperation* sql) | |
| +{ | |
| + int res = m_sqlQueue.enqueue(new SqlAsyncTask(m_dbEngine, sql)); | |
| + return (res != -1); | |
| } | |
| diff --git a/src/shared/Database/SqlDelayThread.h b/src/shared/Database/SqlDelayThread.h | |
| index 0b3e633..0d21034 100644 | |
| --- a/src/shared/Database/SqlDelayThread.h | |
| +++ b/src/shared/Database/SqlDelayThread.h | |
| @@ -20,7 +20,7 @@ | |
| #define __SQLDELAYTHREAD_H | |
| #include "ace/Thread_Mutex.h" | |
| -#include "LockedQueue.h" | |
| +#include "ace/Activation_Queue.h" | |
| #include "Threading.h" | |
| @@ -29,7 +29,7 @@ class SqlOperation; | |
| class SqlDelayThread : public ACE_Based::Runnable | |
| { | |
| - typedef ACE_Based::LockedQueue<SqlOperation*, ACE_Thread_Mutex> SqlQueue; | |
| + typedef ACE_Activation_Queue SqlQueue; | |
| private: | |
| SqlQueue m_sqlQueue; ///< Queue of SQL statements | |
| @@ -41,7 +41,7 @@ class SqlDelayThread : public ACE_Based::Runnable | |
| SqlDelayThread(Database* db); | |
| ///< Put sql statement to delay queue | |
| - bool Delay(SqlOperation* sql) { m_sqlQueue.add(sql); return true; } | |
| + bool Delay(SqlOperation* sql); | |
| virtual void Stop(); ///< Stop event | |
| virtual void run(); ///< Main Thread loop | |
| diff --git a/src/shared/Database/SqlOperations.cpp b/src/shared/Database/SqlOperations.cpp | |
| index 25933a3..04a25aa 100644 | |
| --- a/src/shared/Database/SqlOperations.cpp | |
| +++ b/src/shared/Database/SqlOperations.cpp | |
| @@ -31,29 +31,36 @@ void SqlStatement::Execute(Database *db) | |
| void SqlTransaction::Execute(Database *db) | |
| { | |
| + m_Mutex.acquire(); | |
| if(m_queue.empty()) | |
| + { | |
| + m_Mutex.release(); | |
| return; | |
| + } | |
| db->DirectExecute("START TRANSACTION"); | |
| while(!m_queue.empty()) | |
| { | |
| char *sql = const_cast<char*>(m_queue.front()); | |
| - m_queue.pop(); | |
| if(!db->DirectExecute(sql)) | |
| { | |
| delete [] sql; | |
| + m_queue.pop(); | |
| db->DirectExecute("ROLLBACK"); | |
| while(!m_queue.empty()) | |
| { | |
| delete [] (const_cast<char*>(m_queue.front())); | |
| m_queue.pop(); | |
| } | |
| + m_Mutex.release(); | |
| return; | |
| } | |
| delete [] sql; | |
| + m_queue.pop(); | |
| } | |
| db->DirectExecute("COMMIT"); | |
| + m_Mutex.release(); | |
| } | |
| /// ---- ASYNC QUERIES ---- | |
| diff --git a/src/shared/Database/SqlOperations.h b/src/shared/Database/SqlOperations.h | |
| index fa9437c..4a9e831 100644 | |
| --- a/src/shared/Database/SqlOperations.h | |
| +++ b/src/shared/Database/SqlOperations.h | |
| @@ -22,6 +22,7 @@ | |
| #include "Common.h" | |
| #include "ace/Thread_Mutex.h" | |
| +#include "ace/Method_Request.h" | |
| #include "LockedQueue.h" | |
| #include <queue> | |
| #include "Utilities/Callback.h" | |
| @@ -55,9 +56,17 @@ class SqlTransaction : public SqlOperation | |
| { | |
| private: | |
| std::queue<const char *> m_queue; | |
| + ACE_Thread_Mutex m_Mutex; | |
| public: | |
| SqlTransaction() {} | |
| - void DelayExecute(const char *sql) { m_queue.push(mangos_strdup(sql)); } | |
| + void DelayExecute(const char *sql) | |
| + { | |
| + m_Mutex.acquire(); | |
| + char* _sql = mangos_strdup(sql); | |
| + if (_sql) | |
| + m_queue.push(_sql); | |
| + m_Mutex.release(); | |
| + } | |
| void Execute(Database *db); | |
| }; | |
| @@ -117,4 +126,30 @@ class SqlQueryHolderEx : public SqlOperation | |
| : m_holder(holder), m_callback(callback), m_queue(queue) {} | |
| void Execute(Database *db); | |
| }; | |
| + | |
| +class SqlAsyncTask : public ACE_Method_Request | |
| +{ | |
| + public: | |
| + SqlAsyncTask(Database * db, SqlOperation * op) : m_db(db), m_op(op){} | |
| + ~SqlAsyncTask() | |
| + { | |
| + if (!m_op) | |
| + return; | |
| + | |
| + delete m_op; | |
| + m_op = NULL; | |
| + } | |
| + int call() | |
| + { | |
| + if(this == NULL || !m_db || !m_op) | |
| + return -1; | |
| + | |
| + m_op->Execute(m_db); | |
| + return 0; | |
| + } | |
| + private: | |
| + Database * m_db; | |
| + SqlOperation * m_op; | |
| +}; | |
| + | |
| #endif //__SQLOPERATIONS_H | |
| diff --git a/src/shared/LockedQueue.h b/src/shared/LockedQueue.h | |
| index b3adb13..e31d389 100644 | |
| --- a/src/shared/LockedQueue.h | |
| +++ b/src/shared/LockedQueue.h | |
| @@ -68,7 +68,8 @@ namespace ACE_Based | |
| //! Gets the next result in the queue, if any. | |
| bool next(T& result) | |
| { | |
| - ACE_Guard<LockType> g(this->_lock); | |
| + //ACE_Guard<LockType> g(this->_lock); | |
| + ACE_GUARD_RETURN (LockType, g, this->_lock, false); | |
| if (_queue.empty()) | |
| return false; | |
| @@ -121,6 +122,20 @@ namespace ACE_Based | |
| { | |
| this->_lock.release(); | |
| } | |
| + | |
| + //! Calls pop_front of the queue | |
| + void pop_front() | |
| + { | |
| + ACE_GUARD (LockType, g, this->_lock); | |
| + _queue.pop_front(); | |
| + } | |
| + | |
| + //! Checks if we're empty or not with locks held | |
| + bool empty() | |
| + { | |
| + ACE_GUARD_RETURN (LockType, g, this->_lock, false); | |
| + return _queue.empty(); | |
| + } | |
| }; | |
| } | |
| #endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment