Last active
November 3, 2022 10:16
-
-
Save GWRon/5de1cbe0ddcb8ca511ad7e7ecbb0aeed to your computer and use it in GitHub Desktop.
Test code for a command queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SuperStrict | |
Framework Brl.StandardIO | |
Import Brl.Threads | |
Import Brl.ObjectList | |
' Lifecycle states a TCommand moves through.
Enum ECommandStatus
	OPEN     ' initial state: the command has not been run yet
	RUNNING  ' set when TCommand.Run() starts executing
	FINISHED ' set once TCommand.Run() has completed
End Enum
' Value returned by a command's run callback: a numeric status plus an
' optional result object.
Struct SCommandResult
	' status value chosen by the run callback
	Field status:Int
	' optional result object; Null when none was supplied
	Field data:Object

	' Creates a result from a status and an optional data object.
	Method New(status:Int, data:Object = Null)
		Self.data = data
		Self.status = status
	End Method
End Struct
' A single unit of work for a TCommandQueue.
' The work is defined by an optional run callback and/or by overriding
' CustomRun() in an extending type. An optional result callback is invoked
' with the result once the work has been done.
Type TCommand
	' current lifecycle state; OPEN until Run() is invoked
	Field status:ECommandStatus = ECommandStatus.OPEN
	' opaque argument handed to runCallback
	Field payload:Object
	' optional function doing the actual work; its return value becomes "result"
	Field runCallback:SCommandResult(payload:Object)
	' optional function informed about the result after the work was done
	Field resultCallback(result:SCommandResult)
	' result of the last runCallback invocation (default value if none was run)
	Field result:SCommandResult
	' assigned by TCommandQueue.Run(); posted by TCommandQueue.Process()
	' after the command was run. Stays Null for deferred commands.
	Field semaphore:TSemaphore


	' Creates a command.
	' runCallback:    function to execute in Run() (may be Null)
	' payload:        object passed to runCallback
	' resultCallback: function receiving the result afterwards (may be Null)
	Method New(runCallback:SCommandResult(payload:Object), payload:Object, resultCallback(result:SCommandResult) = Null)
		Self.payload = payload
		Self.runCallback = runCallback
		Self.resultCallback = resultCallback
	End Method


	'the customizable element any TCommand-extension could individualize
	Method CustomRun()
	End Method


	' Executes the command: run callback first, then CustomRun(), then the
	' result callback. The status is RUNNING while this happens and
	' FINISHED afterwards.
	' Returns the stored result.
	Method Run:SCommandResult()
		Self.status = ECommandStatus.RUNNING

		'execute a callback if defined
		If Self.runCallback
			Self.result = Self.runCallback(Self.payload)
		EndIf

		'execute a customized method (individualized in extending types)
		CustomRun()

		'inform potentially interested callback (eg an "onDidSomething()")
		If Self.resultCallback
			Self.resultCallback(Self.result)
		EndIf

		Self.status = ECommandStatus.FINISHED

		Return Self.result
	End Method


	' Returns True once Run() has completed.
	Method IsFinished:Int()
		Return Self.status = ECommandStatus.FINISHED
	End Method


	' Waits for the command to be finished.
	' timeOut: negative (default) waits without a time limit; otherwise it
	'          is passed to TSemaphore.TimedWait() (presumably milliseconds
	'          - confirm against Brl.Threads).
	' Returns True if the command is finished when this method returns,
	' False otherwise (timed out, or no semaphore is assigned so there is
	' nothing to wait on).
	' NOTE(review): the semaphore is only posted once per processed command,
	' so this looks suitable for a single waiter only - confirm before
	' combining it with a blocking TCommandQueue.Run() on the same command.
	Method WaitTillFinished:Int(timeOut:Int = -1)
		If IsFinished() Then Return True

		If Self.semaphore
			If timeOut < 0
				'no limit requested: do not hand a negative value to TimedWait()
				Self.semaphore.Wait()
			Else
				Self.semaphore.TimedWait(timeOut)
			EndIf
		EndIf

		'report the real state instead of an unconditional "True"
		Return IsFinished()
	End Method


	' Placeholder for a textual representation of the command.
	' Not implemented yet; returns an empty string.
	Method Serialize:String()
		'enum to String etc
		Return ""
	End Method
End Type
' Queue of TCommand objects. Commands are appended to "activeList" under
' "listMutex"; Process() swaps the two lists under the same mutex and then
' runs the collected commands outside of the lock.
Type TCommandQueue
	Private
	' list new commands are appended to
	Field activeList:TObjectList = New TObjectList
	' list Process() iterates over after swapping it with activeList
	Field inactiveList:TObjectList = New TObjectList
	' guards appends to activeList and the list swap
	Field listMutex:TMutex = CreateMutex()

	Public

	' Enqueues the command and blocks until Process() has run it.
	' timeOut: values > 0 are handed to TSemaphore.TimedWait(), anything
	'          else waits without a time limit.
	' Returns the command's result field as it is when the wait ends.
	Method Run:SCommandResult(c:TCommand, timeOut:Int = 0)
		LockMutex(listMutex)
		'TODO: have pool and reuse semaphores?
		c.semaphore = CreateSemaphore(0)
		activeList.AddLast(c)
		UnlockMutex(listMutex)

		'block until Process() posts the semaphore (or the time is up)
		If timeOut <= 0
			c.semaphore.Wait()
		Else
			c.semaphore.TimedWait(timeOut)
		EndIf

		Return c.result
	End Method


	' Convenience overload: wraps the callbacks into a new TCommand and
	' runs it blocking. The result is not returned to the caller.
	Method Run(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null, timeOut:Int = 0)
		Run(New TCommand(runCallback, payLoad, resultCallback), timeOut)
	End Method


	' Enqueues the command without waiting for its execution or result.
	Method RunDeferred(c:TCommand)
		LockMutex(listMutex)
		activeList.AddLast(c)
		UnlockMutex(listMutex)
	End Method


	' Convenience overload: wraps the callbacks into a new TCommand and
	' enqueues it without waiting.
	Method RunDeferred(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null)
		RunDeferred(New TCommand(runCallback, payLoad, resultCallback))
	End Method


	' Runs all commands enqueued so far and returns how many were queued
	' when processing started.
	Method Process:Int()
		'A guard like the following (or comparing against an assigned
		'thread, which would also allow command queues in specific child
		'threads) could restrict who is allowed to process:
		'If CurrentThread() <> MainThread()
		'	Throw "Only run TCommandQueue.Process from MainThread"
		'EndIf

		LockMutex(listMutex)

		'Count() also compacts the list's internal array (original author's note)
		Local queueSize:Int = activeList.Count()

		'nothing enqueued: release the lock and leave
		If queueSize = 0
			UnlockMutex(listMutex)
			Return queueSize
		EndIf

		'swap the lists so the filled one can be processed without holding
		'the lock while other threads keep enqueuing into the emptied one
		Local swapped:TObjectList = inactiveList
		inactiveList = activeList
		activeList = swapped

		'from here on other threads can add new commands again
		UnlockMutex(listMutex)

		'run each collected command and wake up whoever waits for it
		For Local cmd:TCommand = EachIn inactiveList
			cmd.Run()

			If cmd.semaphore Then cmd.semaphore.Post()
		Next

		'all collected commands are done
		inactiveList.Clear()

		Return queueSize
	End Method
End Type
'-------------------
'sample usage
'-------------------

' queue shared by the worker threads (enqueue) and the main code (Process)
Global commandQueue:TCommandQueue = New TCommandQueue
' values collected by the sample run callback
Global boughtLicences:Int[]
' Sample run callback for the commands created below.
' Converts the payload (a String holding a number) to an Int, appends it
' to the global boughtLicences array and returns a result with status True
' and no data.
Function runCallback:SCommandResult(payload:Object)
	Local licenceValue:Int = Int(String(payload))
	boughtLicences :+ [licenceValue]

	Return New SCommandResult(True, Null)
End Function
' Worker thread entry point.
' data: String holding the base number of this worker; the worker enqueues
'       20 commands with payloads base+1 .. base+20.
' Workers whose numbers fall into the 5000000 range enqueue blocking
' (waiting until the queue processed the command), all others enqueue
' deferred.
Function ThreadFunc:Object( data:Object )
	Local base:Int = Int(String(data))

	For Local offset:Int = 1 To 20
		Local value:Int = base + offset

		' Make thread 5 wait each time it posts
		Local blocking:Int = (value / 1000000 = 5)
		If blocking
			commandQueue.Run(runCallback, String(value))
		Else
			commandQueue.RunDeferred(runCallback, String(value))
		EndIf
	Next

	Return Null
End Function
' Start five workers; worker i is seeded with (i + 1) * 1000000.
Local threads:TThread[5]
For Local i:Int = 0 Until threads.length
	threads[i] = CreateThread(ThreadFunc, String((i + 1) * 1000000))
Next

' Process the queue from this thread until no worker is running anymore.
' Blocking workers only continue once their command got processed here,
' so processing has to go on while any worker is alive. The 2000 ms limit
' stays as a safety cap; elapsed time is computed as a difference so the
' check still works if MilliSecs() wraps around.
Local startedAt:Int = MilliSecs()
Local workerRunning:Int
Repeat
	commandQueue.Process()

	workerRunning = False
	For Local worker:TThread = EachIn threads
		If ThreadRunning(worker)
			workerRunning = True
			Exit
		EndIf
	Next
Until Not workerRunning Or MilliSecs() - startedAt > 2000

' Workers may have enqueued commands between the last Process() call and
' their exit: drain once more before checking the lists.
commandQueue.Process()

Assert(commandQueue.activeList.count() = 0)
Assert(commandQueue.inactiveList.count() = 0)

Print "boughtLicences.length = " + boughtLicences.length
For Local i:Int = EachIn boughtlicences
	Print i
Next
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Incorporated the fixes and changes