@giuliohome
Last active October 1, 2023 20:15
Async Workers with Condition Handling
import Control.Concurrent.STM
import Control.Monad
import Control.Concurrent

workerWaitForCondition :: TVar Bool -> IO ()
workerWaitForCondition condition = atomically $ do
  c <- readTVar condition
  unless c retry

workerMakeConditionTrue :: TVar Bool -> IO ()
workerMakeConditionTrue condition = do
  threadDelay 2000000 -- Simulate some work
  atomically $ do
    writeTVar condition True

main :: IO ()
main = do
  w2_condition <- newTVarIO False
  end_condition <- newEmptyMVar
  -- Create two worker tasks
  let task1 =
        putStrLn "Worker 1 is waiting for the w2 condition..." >>
        workerWaitForCondition w2_condition >>
        putStrLn "Worker 1: Condition is met!" >>
        putMVar end_condition "Forked thread has completed."
      task2 =
        putStrLn "Worker 2 is making the w2 condition true." >>
        workerMakeConditionTrue w2_condition >>
        putStrLn "Worker 2 has set the w2 condition true."
  -- Fork both tasks; each runs in its own lightweight thread
  threadId1 <- forkIO task1
  putStrLn ("Hello, task1 " ++ show threadId1 ++ "!")
  threadId2 <- forkIO task2
  putStrLn ("Hello, task2 " ++ show threadId2 ++ "!")
  putStrLn "waiting for the end of the workflow"
  -- Wait for task1 to complete by taking the MVar
  end_message <- takeMVar end_condition
  putStrLn end_message

import asyncio


async def worker_wait_for_condition():
    async with condition:
        print("Worker 1 is waiting for the condition...")
        await condition.wait()
        print("Worker 1: Condition is met!")


async def worker_make_condition_true():
    print("Worker 2 is making the condition true.")
    await asyncio.sleep(2)  # Simulate some work
    async with condition:
        condition.notify()
    print("Worker 2 has set the condition true.")


async def main():
    global condition  # Make the condition variable global
    condition = asyncio.Condition()  # Recreate the condition within the same event loop
    # Create two worker tasks
    task1 = asyncio.create_task(worker_wait_for_condition())
    task2 = asyncio.create_task(worker_make_condition_true())
    # Wait for both tasks to complete
    await asyncio.gather(task1, task2)


if __name__ == "__main__":
    asyncio.run(main())

//> using dep "org.typelevel::cats-effect::3.5.2"
import cats.effect._
import cats.syntax.all._
import scala.concurrent.duration._

object Main extends IOApp.Simple {

  def workerWaitForCondition(condition: Deferred[IO, Unit]): IO[Unit] =
    for {
      _ <- IO(println("Worker 1 is waiting for the condition..."))
      _ <- condition.get
      _ <- IO(println("Worker 1: Condition is met!"))
    } yield ()

  def workerMakeConditionTrue(condition: Deferred[IO, Unit]): IO[Unit] =
    for {
      _ <- IO(println("Worker 2 is making the condition true."))
      _ <- IO.sleep(2.seconds) // Simulate some work
      _ <- condition.complete(())
      _ <- IO(println("Worker 2 has set the condition true."))
    } yield ()

  def run: IO[Unit] =
    for {
      condition <- Deferred[IO, Unit]
      task1 = workerWaitForCondition(condition)
      task2 = workerMakeConditionTrue(condition)
      _ <- (task1, task2).parTupled
    } yield ()
}

@giuliohome
Author

giuliohome commented Oct 1, 2023

This Python snippet (like the Haskell and Scala Cats Effect versions) demonstrates two asynchronous workers: one waits for a condition to be met, and the other sets that condition after some simulated work. The Python version uses the asyncio library for concurrent, non-blocking execution; the Haskell version uses STM (a TVar, plus an MVar to signal completion), and the Scala version uses a Cats Effect Deferred.

Features:

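Two asynchronous workers: One worker waits for a specified condition, while the other worker makes that condition true after a simulated two-second delay.

Proper synchronization: The workers coordinate through a shared primitive (asyncio.Condition in Python, a TVar in Haskell, a Deferred in Scala) instead of polling. In the Python version, notify() only wakes tasks that are already waiting, so the code relies on Worker 1 reaching wait() before Worker 2 finishes its delay.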

Asynchronous waiting: Workers use await to asynchronously wait for conditions, allowing efficient and non-blocking execution.

Condition setting: The code simulates the process of making a condition true, which can be replaced with any real-world task or event.
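
The Python version signals with a bare notify(), which has no effect if no task is waiting yet. A minimal sketch of a variant that also keeps an explicit flag and waits with `Condition.wait_for`, so the order in which the workers start no longer matters. The names `state`, `log`, `wait_for_flag`, `set_flag`, and `run_workflow` are illustrative assumptions, not part of the gist:

```python
import asyncio


async def wait_for_flag(condition: asyncio.Condition, state: dict, log: list) -> None:
    async with condition:
        # wait_for checks the predicate before sleeping and after every wake-up,
        # so a notification sent before this task started waiting is not lost.
        await condition.wait_for(lambda: state["ready"])
    log.append("waiter: condition is met")


async def set_flag(condition: asyncio.Condition, state: dict, log: list) -> None:
    async with condition:
        state["ready"] = True  # Record the condition itself, not only the signal
        condition.notify_all()
    log.append("setter: condition set")


async def run_workflow() -> list:
    condition = asyncio.Condition()
    state = {"ready": False}  # Hypothetical shared flag guarded by the condition
    log = []
    # The setter deliberately runs to completion first; the waiter still
    # proceeds because it inspects the flag rather than the notification.
    await set_flag(condition, state, log)
    await asyncio.wait_for(wait_for_flag(condition, state, log), timeout=1)
    return log


if __name__ == "__main__":
    print(asyncio.run(run_workflow()))
```

This mirrors what the Haskell version already does: the TVar holds the Boolean, and the waiter re-checks it whenever it is woken.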

This snippet can serve as a starting point for concurrent applications in Python whose workflows depend on specific conditions being met asynchronously.
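
As one possible illustration of a workflow with more than one dependency, the sketch below chains three stages with `asyncio.Event`, a simpler primitive than `asyncio.Condition` that stays set once triggered. The stage names and the `stage` and `run_pipeline` helpers are hypothetical and do not appear in the gist:

```python
import asyncio


async def stage(name, wait_on, signal, log):
    # wait_on and signal are asyncio.Event objects, or None at the ends of the chain.
    if wait_on is not None:
        await wait_on.wait()  # Suspend until the previous stage has finished
    log.append(name)
    if signal is not None:
        signal.set()  # Release whichever stage depends on this one


async def run_pipeline():
    log = []
    a_done = asyncio.Event()
    b_done = asyncio.Event()
    # The stages are started in reverse order on purpose: the execution order
    # comes from the events, not from the order in which tasks are created.
    await asyncio.gather(
        stage("C", b_done, None, log),
        stage("B", a_done, b_done, log),
        stage("A", None, a_done, log),
    )
    return log


if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

Because an Event remembers that it was set, a stage that starts late still sees the signal, which makes it a reasonable default when a condition only ever goes from false to true.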
