Skip to content

Instantly share code, notes, and snippets.

@GWRon
Last active November 3, 2022 10:16
Show Gist options
  • Save GWRon/5de1cbe0ddcb8ca511ad7e7ecbb0aeed to your computer and use it in GitHub Desktop.
Save GWRon/5de1cbe0ddcb8ca511ad7e7ecbb0aeed to your computer and use it in GitHub Desktop.
Test code for a command queue.
SuperStrict
Framework Brl.StandardIO
Import Brl.Threads
Import Brl.ObjectList
' Lifecycle states of a TCommand. A command starts as OPEN; TCommand.Run()
' switches it to RUNNING on entry and to FINISHED right before returning.
Enum ECommandStatus
OPEN ' not yet run
RUNNING ' currently running
FINISHED ' finished execution
End Enum
' Value returned by a command's run callback: an integer status plus an
' optional data object.
Struct SCommandResult
' status value chosen by whoever creates the result
Field status:Int
' optional object attached to the result (defaults to Null)
Field data:Object
' Creates a result from the given status and optional data object.
Method New(status:Int, data:Object = Null)
Self.status = status
Self.data = data
End Method
End Struct
' A unit of work consisting of an optional run callback, an optional
' result callback and an overridable CustomRun() hook.
' Run() executes them in that order and tracks progress in "status".
Type TCommand
	' lifecycle state, updated by Run()
	Field status:ECommandStatus = ECommandStatus.OPEN
	' argument handed to runCallback
	Field payload:Object
	' optional function doing the actual work
	Field runCallback:SCommandResult(payload:Object)
	' optional function informed about the result
	Field resultCallback(result:SCommandResult)
	' value returned by the last runCallback invocation
	Field result:SCommandResult
	' set by TCommandQueue.Run() and posted by TCommandQueue.Process()
	' once the command was executed
	Field semaphore:TSemaphore


	' Stores the callbacks and the payload; nothing is executed yet.
	Method New(runCallback:SCommandResult(payload:Object), payload:Object, resultCallback(result:SCommandResult) = Null)
		Self.payload = payload
		Self.runCallback = runCallback
		Self.resultCallback = resultCallback
	End Method


	'the customizable element any TCommand-extension could individualize
	Method CustomRun()
	End Method


	' Executes the command on the calling thread and returns the result
	' produced by runCallback (the previous/default result if no run
	' callback is set).
	Method Run:SCommandResult()
		Self.status = ECommandStatus.RUNNING

		'execute a callback if defined
		If Self.runCallback
			result = Self.runCallback(payload)
		EndIf

		'execute a customized method (individualized in extending types)
		CustomRun()

		'inform potentially interested callback (eg an "onDidSomething()")
		If Self.resultCallback
			Self.resultCallback(result)
		EndIf

		Self.status = ECommandStatus.FINISHED
		Return result
	End Method


	' Returns True once Run() completed.
	Method IsFinished:Int()
		Return Self.status = ECommandStatus.FINISHED
	End Method


	' Blocks until the command's semaphore is posted or the timeout expired.
	' timeOut: milliseconds to wait at most; values <= 0 (the default) wait
	'          without limit, matching the convention of TCommandQueue.Run().
	' Returns True if the command is finished when this method returns,
	' False otherwise (timeout expired, or no semaphore is assigned so
	' there is nothing to wait on).
	Method WaitTillFinished:Int(timeOut:Int = -1)
		If IsFinished() Then Return True

		If Self.semaphore
			' do not hand a non-positive timeout to TimedWait();
			' NOTE(review): its behaviour for such values is not visible
			' from this file, so use the untimed Wait() instead
			If timeOut > 0
				Self.semaphore.TimedWait(timeOut)
			Else
				Self.semaphore.Wait()
			EndIf
		EndIf

		' report the real state instead of an unconditional True
		Return IsFinished()
	End Method


	' Placeholder: serialization is not implemented yet.
	Method Serialize:String()
		'enum to String etc
	End Method
End Type
' Queue of TCommand objects. Commands are appended under a mutex by the
' Run()/RunDeferred() methods and executed by whoever calls Process().
Type TCommandQueue
	Private
	' commands enqueued since the last Process() call (guarded by listMutex)
	Field activeList:TObjectList = New TObjectList
	' list being worked through by Process(); swapped with activeList
	Field inactiveList:TObjectList = New TObjectList
	' protects access to / swapping of the lists
	Field listMutex:TMutex = CreateMutex()

	Public
	' Enqueues the command and blocks the caller until Process() posted the
	' command's semaphore - or, for timeOut > 0, until that many
	' milliseconds were waited. Returns the command's "result" field.
	Method Run:SCommandResult(c:TCommand, timeOut:Int = 0)
		LockMutex(listMutex)
		'TODO: have pool and reuse semaphores?
		c.semaphore = CreateSemaphore(0)
		activeList.AddLast(c)
		UnlockMutex(listMutex)

		'now wait until the queue processed it
		If timeOut <= 0
			c.semaphore.Wait()
		Else
			c.semaphore.TimedWait(timeOut)
		EndIf

		Return c.result
	End Method


	' Convenience overload: wraps the callbacks into a new TCommand and
	' runs it blocking (the result is not returned).
	Method Run(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null, timeOut:Int = 0)
		Run(New TCommand(runCallback, payLoad, resultCallback), timeOut)
	End Method


	'add a command but do not wait for execution and results
	Method RunDeferred(c:TCommand)
		LockMutex(listMutex)
		activeList.AddLast(c)
		UnlockMutex(listMutex)
	End Method


	' Convenience overload: wraps the callbacks into a new TCommand and
	' enqueues it without waiting.
	Method RunDeferred(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null)
		RunDeferred(New TCommand(runCallback, payLoad, resultCallback))
	EndMethod


	'processes all enqueued commands
	' Returns the number of commands which were enqueued when the call
	' started (0 if there was nothing to do).
	Method Process:Int()
		'this - or assigning a thread and comparing against it
		'which would allow to have command queues in specific child
		'threads too
		'If CurrentThread() <> MainThread()
		'	Throw "Only run TCommandQueue.Process from MainThread"
		'EndIf

		LockMutex(listMutex)
		Local pending:Int = activeList.Count() 'compacts array inside

		'nothing enqueued: release the lock and leave
		If pending = 0
			UnlockMutex(listMutex)
			Return pending
		EndIf

		'switch the lists so the filled one can be worked through while
		'other threads already enqueue into the (empty) other one
		Local filled:TObjectList = activeList
		activeList = inactiveList
		inactiveList = filled
		UnlockMutex(listMutex)

		'actually start processing the commands
		For Local cmd:TCommand = EachIn inactiveList
			cmd.Run()

			'we are done with the command, wake up a potential waiter
			If cmd.semaphore Then cmd.semaphore.Post()
		Next

		'done with all of them
		inactiveList.Clear()

		Return pending
	End Method
End Type
'-------------------
'sample usage
'-------------------
' Numbers collected by the sample's runCallback (one entry per executed command).
Global boughtLicences:Int[]
' Queue the sample's producer threads enqueue into; processed by the main loop below.
Global commandQueue:TCommandQueue = New TCommandQueue
' Run callback used by the sample commands: converts the payload (a String)
' to an Int, appends it to boughtLicences and returns a result whose
' status is True and whose data is Null.
Function runCallback:SCommandResult(payload:Object)
	Local licenceNumber:Int = Int(String(payload))
	boughtLicences :+ [licenceNumber]

	Return New SCommandResult(True, Null)
End Function
' Thread entry point of the sample producers.
' data: String holding the base number of the thread; the 20 commands
'       enqueued by the thread carry base+1 .. base+20 as payload.
Function ThreadFunc:Object( data:Object )
	Local lastAdded:Int = Int(String(data))

	' enqueue exactly 20 commands
	For Local round:Int = 1 To 20
		lastAdded :+ 1
		Local payload:String = String(lastAdded)

		' Make thread 5 wait each time it posts
		If lastAdded / 1000000 = 5
			commandQueue.Run(runCallback, payload)
		Else
			commandQueue.RunDeferred(runCallback, payload)
		EndIf
	Next
End Function
' Start five producer threads; thread i gets (i + 1) * 1000000 as base number.
Local threads:TThread[5]
For Local i:Int = 0 Until threads.length
	threads[i] = CreateThread(ThreadFunc, String((i + 1) * 1000000))
Next

' Process the queue for at least two seconds and until every producer
' thread returned. The elapsed time is computed by subtraction so the
' check keeps working when the MilliSecs() counter wraps around.
Local startTime:Int = MilliSecs()
Local producersBusy:Int
Repeat
	commandQueue.Process()

	producersBusy = False
	For Local producer:TThread = EachIn threads
		If ThreadRunning(producer) Then producersBusy = True
	Next
Until (Not producersBusy) And (MilliSecs() - startTime > 2000)

' a producer may have enqueued a command between the last Process() call
' and the moment it was seen as finished - drain the queue a final time
commandQueue.Process()

Assert(commandQueue.activeList.count() = 0)
Assert(commandQueue.inactiveList.count() = 0)
Print "boughtLicences.length = " + boughtLicences.length
For Local i:Int = EachIn boughtlicences
	Print i
Next
@davecamp
Copy link

davecamp commented Nov 3, 2022

SuperStrict
Framework Brl.StandardIO
Import Brl.Threads
Import Brl.ObjectList


' Value returned by a command's run callback: an integer status plus an
' optional data object.
Struct SCommandResult
	' status value chosen by whoever creates the result
	Field status:Int
	' optional object attached to the result (defaults to Null)
	Field data:Object
	
	' Creates a result from the given status and optional data object.
	Method New(status:Int, data:Object = Null)
		Self.status = status
		Self.data = data
	End Method
End Struct




' A unit of work consisting of an optional run callback, an optional
' result callback and an overridable CustomRun() hook.
Type TCommand
	' argument handed to runCallback
	Field payload:Object
	' set by TCommandQueue.RunCommandAndWait() and posted by
	' TCommandQueue.Process() once the command was executed
	Field semaphore:TSemaphore
	' optional function doing the actual work
	Field runCallback:SCommandResult(payload:Object)
	' optional function informed about the result
	Field resultCallback(result:SCommandResult)


	' Stores the callbacks and the payload; nothing is executed yet.
	Method New(runCallback:SCommandResult(payload:Object), payload:Object, resultCallback(result:SCommandResult) = Null)
		Self.runCallback = runCallback
		Self.resultCallback = resultCallback
		Self.payload = payload
	End Method


	'the customizable element any TCommand-extension could individualize
	Method CustomRun()
	End Method


	' Executes the command on the calling thread: run callback first,
	' then CustomRun(), then the result callback. Returns what the run
	' callback returned (a default SCommandResult if none is set).
	Method Run:SCommandResult()
		Local outcome:SCommandResult

		'execute a callback if defined
		If Self.runCallback Then outcome = Self.runCallback(payload)

		'execute a customized method (individualized in extending types)
		CustomRun()

		'inform potentially interested callback (eg an "onDidSomething()")
		If Self.resultCallback Then Self.resultCallback(outcome)

		Return outcome
	End Method


	' Waits on the command's semaphore (if one is assigned) for at most
	' timeOut milliseconds. Always returns True.
	' NOTE(review): the default of -1 is passed to TimedWait() unchanged;
	' how TimedWait() treats a negative value is not visible here - confirm.
	Method WaitTillFinished:Int(timeOut:Int = -1)
		Local waitOn:TSemaphore = Self.semaphore
		If waitOn Then waitOn.TimedWait(timeOut)

		Return True
	End Method


	' Placeholder: serialization is not implemented yet.
	Method Serialize:String()
		'enum to String etc
	End Method
End Type




' Queue of TCommand objects. Commands are appended under a mutex by
' RunCommand()/RunCommandAndWait() and executed by whoever calls Process().
Type TCommandQueue
	Private
	' commands enqueued since the last Process() call (guarded by listMutex)
	Field activeList:TObjectList = New TObjectList
	' list being worked through by Process(); swapped with activeList
	Field inactiveList:TObjectList = New TObjectList
	' protects access to / swapping of the lists
	Field listMutex:TMutex = CreateMutex()

	' Appends the command to the active list while holding the mutex.
	' If a semaphore is passed it is assigned to the command before the
	' command becomes visible to Process().
	Method Enqueue(c:TCommand, waitSemaphore:TSemaphore = Null)
		LockMutex(listMutex)
		If waitSemaphore Then c.semaphore = waitSemaphore
		activeList.AddLast(c)
		UnlockMutex(listMutex)
	End Method

	Public
	' Enqueues a new command built from the given callbacks and returns
	' immediately, without waiting for its execution.
	Method RunCommand(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null)
		Enqueue(New TCommand(runCallback, payLoad, resultCallback))
	End Method
	
	' Enqueues a new command built from the given callbacks and blocks the
	' caller until Process() executed it and posted the command's semaphore.
	Method RunCommandAndWait(runCallback:SCommandResult(payload:Object), payLoad:Object, resultCallback(result:SCommandResult) = Null)
		Local c:TCommand = New TCommand(runCallback, payLoad, resultCallback)
		Enqueue(c, CreateSemaphore(0))
		WaitSemaphore(c.semaphore)
	EndMethod


	'processes all enqueued commands
	' Returns the number of commands which were enqueued when the call
	' started (0 if there was nothing to do).
	Method Process:Int()
		LockMutex(listMutex)
		'switch active list so we can clear it right here ...
		'allowing other threads to enqueue new commands while
		Local queueSize:Int = activeList.Count() 'compacts array inside
		If queueSize = 0
			UnlockMutex(listMutex)
		Else
			Local tmp:TObjectList = inactiveList
			inactiveList = activeList
			activeList = tmp

			'now other threads could already add new commands
			UnlockMutex(listMutex)

			'actually start processing the commands
			For Local c:TCommand = EachIn inactiveList
				'the returned result is not needed here
				c.Run()

				'we are done with the command
				If c.semaphore
					c.semaphore.Post()
				EndIf
			Next

			'done with all of them
			inactiveList.Clear()
		EndIf

		Return queueSize
	End Method
End Type



'-------------------
'sample usage
'-------------------




' Numbers collected by the sample's runCallback (one entry per executed command).
Global boughtLicences:Int[]

' Queue the sample's producer threads enqueue into; processed by the main loop below.
Global commandQueue:TCommandQueue = New TCommandQueue

' Run callback used by the sample commands: converts the payload (a String)
' to an Int, appends it to boughtLicences and returns a result whose
' status is True and whose data is Null.
Function runCallback:SCommandResult(payload:Object)
	Local licenceNumber:Int = Int(String(payload))
	boughtLicences :+ [licenceNumber]

	Return New SCommandResult(True, Null)
End Function

' Thread entry point of the sample producers.
' data: String holding the base number of the thread; the 20 commands
'       enqueued by the thread carry base+1 .. base+20 as payload.
Function ThreadFunc:Object( data:Object )
	Local lastAdded:Int = Int(String(data))

	' enqueue exactly 20 commands
	For Local round:Int = 1 To 20
		lastAdded :+ 1
		Local payload:String = String(lastAdded)

		' Make thread 5 wait each time it posts
		If lastAdded / 1000000 = 5
			commandQueue.RunCommandAndWait(runCallback, payload)
		Else
			commandQueue.RunCommand(runCallback, payload)
		EndIf
	Next
End Function



' Start five producer threads; thread i gets (i + 1) * 1000000 as base number.
Local threads:TThread[5]

For Local i:Int = 0 Until threads.length
	threads[i] = CreateThread(ThreadFunc, String((i + 1) * 1000000))
Next

' Process the queue for at least two seconds and until every producer
' thread returned. The elapsed time is computed by subtraction so the
' check keeps working when the MilliSecs() counter wraps around.
Local startTime:Int = MilliSecs()
Local producersBusy:Int
Repeat
	commandQueue.Process()

	producersBusy = False
	For Local producer:TThread = EachIn threads
		If ThreadRunning(producer) Then producersBusy = True
	Next
Until (Not producersBusy) And (MilliSecs() - startTime > 2000)

' a producer may have enqueued a command between the last Process() call
' and the moment it was seen as finished - drain the queue a final time
commandQueue.Process()



Assert(commandQueue.activeList.count() = 0)
Assert(commandQueue.inactiveList.count() = 0)

Print "boughtLicences.length = " + boughtLicences.length
For Local i:Int = EachIn boughtlicences
	Print i
Next

@GWRon
Copy link
Author

GWRon commented Nov 3, 2022

Incorporated the fixes and changes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment