Skip to content

Instantly share code, notes, and snippets.

@jbpotonnier
Last active August 29, 2015 13:56
Show Gist options
  • Save jbpotonnier/9180033 to your computer and use it in GitHub Desktop.
Save jbpotonnier/9180033 to your computer and use it in GitHub Desktop.
Using semaphore to allocate resources.
"""
Run `ipython -i client.py`, then interact with surfer_queue and agent_pool from the command line.
"""
from connector import ConnectorManager, Agent
# Declare, on the client side, the accessors used below to fetch the shared
# objects from the manager (no callable is supplied here).
ConnectorManager.register('get_surfer_queue')
ConnectorManager.register('get_agent_pool')

if __name__ == '__main__':
    # Connect to the manager and leave `surfer_queue` / `agent_pool` bound at
    # module level so they can be used from the interactive prompt.
    address = ('localhost', 8888)
    manager = ConnectorManager(address, 'password')
    manager.connect()
    print('client connected to ConnectorManager on {}:{}'.format(*address))

    surfer_queue = manager.get_surfer_queue()
    agent_pool = manager.get_agent_pool()
import Control.Applicative ((<$>), (<*>))
import Control.Concurrent (forkIO)
import Control.Monad (forever)
import Data.List (sort)
import System.IO (Handle, hClose, hGetLine, hIsEOF, hSetBuffering, BufferMode (..))
import Text.Read (readMaybe)

import Control.Concurrent.STM (STM)
import Control.Concurrent.STM.SSem (SSem)
import qualified Control.Concurrent.STM.SSem as SSem
import Control.Concurrent.STM.TChan (TChan, newTChan, writeTChan, readTChan)
import Control.Concurrent.STM.TVar (TVar, newTVar, writeTVar, readTVar)
import Control.Monad.STM (atomically)
import Network (withSocketsDo, accept, listenOn, PortID(PortNumber))
-- | Name of a surfer, as received in an @enqueue@ command.
type Surfer = String
-- | Name of an agent, as received in an @add_agent@ command.
type Agent = String
-- | Shared agent state: a list of @(slot count, agent)@ pairs and a semaphore.
-- 'addAgent' signals the semaphore once per slot it adds; 'acquireAgent'
-- waits on it and decrements the chosen agent's slot count.
data AgentPool = AgentPool (TVar [(Int, Agent)]) SSem
-- | Write a surfer to the waiting channel, then report it on stdout.
enqueue :: TChan Surfer -> String -> IO ()
enqueue surferChan surfer =
  atomically (writeTChan surferChan surfer)
    >> putStrLn (concat ["surfer ", surfer, " enqueued"])
-- | Register an agent with the given number of slots and log it on stdout.
--
-- In a single transaction the agent is prepended to the pool's list and the
-- semaphore is signalled @nbSlots@ times.
addAgent :: AgentPool -> Agent -> Int -> IO ()
addAgent (AgentPool agentsTVar slots) agent nbSlots = do
  atomically registerAgent
  putStrLn ("agent " ++ agent ++ " added")
  where
    registerAgent = do
      known <- readTVar agentsTVar
      writeTVar agentsTVar ((nbSlots, agent) : known)
      SSem.signalN slots nbSlots
-- | Take one slot from the pool and return the name of the agent it belongs to.
--
-- Waits on the slot semaphore, then picks the greatest @(slot count, name)@
-- pair, i.e. the agent with the most slots left (ties go to the greater
-- name because pairs compare lexicographically). The chosen agent is
-- re-inserted with one slot fewer; every other entry carrying the same name
-- is dropped by the filter.
acquireAgent :: AgentPool -> IO String
acquireAgent (AgentPool agentsTVar slots) =
  atomically $ do
    SSem.wait slots
    agents <- readTVar agentsTVar
    -- 'maximum' selects the same element as sorting, reversing and taking the
    -- head, without the O(n log n) sort. Like the original expression it is
    -- partial on an empty list; this relies on the semaphore only ever being
    -- signalled together with an agent insertion (as 'addAgent' does).
    let (nbSlots, name) = maximum agents
        others = filter ((/= name) . snd) agents
    writeTVar agentsTVar ((nbSlots - 1, name) : others)
    return name
-- | Serve one client connection, one whitespace-separated command per line.
--
-- Recognised commands:
--
-- * @enqueue SURFER@ - queue a surfer via 'enqueue'.
-- * @add_agent NAME SLOTS@ - register an agent via 'addAgent'; @SLOTS@ must
--   parse as a non-negative 'Int', otherwise the line is ignored.
-- * @bye@ - close the handle and stop.
--
-- Any other line is ignored. The handle is also closed when the client
-- disconnects (end of input) instead of letting 'hGetLine' throw and leak it.
handleRequest :: AgentPool -> TChan Surfer -> Handle -> IO ()
handleRequest agentPool surferChan h = do
  -- The buffering mode only needs to be set once per connection.
  hSetBuffering h NoBuffering
  loop
  where
    loop = do
      eof <- hIsEOF h
      if eof
        then hClose h
        else hGetLine h >>= dispatch . words

    dispatch ["enqueue", surfer] = enqueue surferChan surfer >> loop
    dispatch ["add_agent", name, rawSlots]
      -- 'readMaybe' instead of the partial 'read': the count comes straight
      -- from the network, so a malformed or negative value falls through to
      -- the catch-all clause below rather than raising in this thread.
      | Just nbSlots <- readMaybe rawSlots
      , nbSlots >= 0 =
          addAgent agentPool name nbSlots >> loop
    dispatch ["bye"] = hClose h
    dispatch _ = loop
-- | Pair the next surfer read from the channel with an agent obtained from
-- 'acquireAgent', and log the pairing on stdout.
connect :: AgentPool -> TChan Surfer -> IO ()
connect agentPool surferChan =
  atomically (readTChan surferChan) >>= \surfer ->
    acquireAgent agentPool >>= \agent ->
      putStrLn (concat ["connected ", agent, " and ", surfer])
-- | Listen on port 5002 and run 'handleRequest' in a new thread for every
-- accepted connection. Loops forever.
serve :: AgentPool -> TChan Surfer -> IO ()
serve agentPool surferChan =
  withSocketsDo $ listenOn (PortNumber 5002) >>= acceptLoop
  where
    acceptLoop sock = forever $ do
      (h, _, _) <- accept sock
      forkIO (handleRequest agentPool surferChan h)
-- | Build a pool holding an empty agent list and a semaphore created with 0.
newAgentPool :: STM AgentPool
newAgentPool = do
  agents <- newTVar []
  slots <- SSem.new 0
  return (AgentPool agents slots)
-- | Create the shared channel and pool, fork the thread that repeatedly runs
-- 'connect', then run the socket server in the main thread.
main :: IO ()
main = do
  surferChan <- atomically newTChan
  agentPool <- atomically newAgentPool
  let matchmaker = forever (connect agentPool surferChan)
  _ <- forkIO matchmaker
  serve agentPool surferChan
from multiprocessing.managers import BaseManager
from multiprocessing import Process, Queue, Lock, Semaphore
from collections import namedtuple, defaultdict
# An agent record: `name` plus `nb_max_slots`, which AgentPool.add_agent uses
# as the number of semaphore slots to release for this agent.
Agent = namedtuple('Agent', 'name nb_max_slots')
# Pairs surfers with agents. `slots` is a semaphore created at 0 and released
# once per agent slot; `connect` acquires it before asking the collection for
# an agent. `lock` is taken around accesses to `agent_collection` and
# `conversations`.
# NOTE(review): this copy of the file has lost its indentation, so the exact
# extent of each `with self.lock:` body cannot be read off here - confirm
# against the original before relying on the locking scope.
class AgentPool(object):
def __init__(self):
# Per-agent slot bookkeeping is delegated to AgentCollection.
self.agent_collection = AgentCollection()
# Maps a surfer to the agent assigned to it by connect().
self.conversations = {}
self.lock = Lock()
# Starts at zero: nothing can be acquired until add_agent releases slots.
self.slots = Semaphore(0)
def add_agent(self, agent):
# Adds the agent to the collection, then releases the semaphore
# agent.nb_max_slots times, printing a line for each step.
with self.lock:
self.agent_collection.add(agent)
# format(**locals()) fills {agent} from the local variable of that name.
print '{agent} added'.format(**locals())
for _ in xrange(agent.nb_max_slots):
self.slots.release()
print '1 slot for {agent} added'.format(**locals())
def remove_agent(self, agent):
# TODO: not implemented - currently a no-op.
pass
def connect(self, surfer):
# Acquires one slot from the semaphore, then takes an agent from the
# collection and records it as this surfer's conversation.
self.slots.acquire()
print 'an agent is available'
with self.lock:
agent = self.agent_collection.acquire()
# An existing entry for the same surfer is overwritten here.
self.conversations[surfer] = agent
print '{agent} and {surfer} connected'.format(**locals())
def end_chat(self, surfer):
# Looks up the surfer's agent (KeyError if the surfer has no conversation),
# gives the slot back to the collection, forgets the conversation and
# releases the semaphore once.
with self.lock:
agent = self.conversations[surfer]
self.agent_collection.release(agent)
del self.conversations[surfer]
self.slots.release()
print 'chat between {agent} and {surfer} ended'.format(**locals())
class AgentCollection(object):
    """Tracks how many slots each agent currently has available.

    ``agents_by_nb_slots`` buckets agents by their current slot count and
    ``nb_slot_by_agent`` is the reverse lookup. Buckets that become empty are
    removed, so the keys of ``agents_by_nb_slots`` are exactly the slot
    counts that some agent currently has.
    """

    def __init__(self):
        self.agents_by_nb_slots = defaultdict(list)
        self.nb_slot_by_agent = {}

    def add(self, agent):
        """Register ``agent`` with ``agent.nb_max_slots`` slots available."""
        self.agents_by_nb_slots[agent.nb_max_slots].append(agent)
        self.nb_slot_by_agent[agent] = agent.nb_max_slots

    def remove(self, agent):
        # TODO: not implemented - currently a no-op.
        pass

    def _discard_bucket_if_empty(self, nb_slots):
        """Delete the bucket for ``nb_slots`` once it holds no agent."""
        if not self.agents_by_nb_slots[nb_slots]:
            del self.agents_by_nb_slots[nb_slots]

    def acquire(self):
        """Take one slot from the agent with the most slots and return it.

        Within a bucket the agent that entered it first is chosen. Raises
        ``ValueError`` when the collection holds no agent at all.
        """
        # Iterating the dict yields its keys on both Python 2 and 3.
        max_free_slots = max(self.agents_by_nb_slots)
        agent = self.agents_by_nb_slots[max_free_slots].pop(0)
        self._discard_bucket_if_empty(max_free_slots)
        self.agents_by_nb_slots[max_free_slots - 1].append(agent)
        self.nb_slot_by_agent[agent] -= 1
        return agent

    def release(self, agent):
        """Give one slot back to ``agent``.

        Raises ``KeyError`` if the agent was never added.
        """
        nb_slots = self.nb_slot_by_agent[agent]
        self.agents_by_nb_slots[nb_slots].remove(agent)
        # Prune here as well, so release() no longer leaves empty lists
        # behind in the defaultdict the way acquire() already avoided.
        self._discard_bucket_if_empty(nb_slots)
        self.agents_by_nb_slots[nb_slots + 1].append(agent)
        self.nb_slot_by_agent[agent] += 1
def connect(surfer_queue, agent_pool):
    """Forever take the next surfer off the queue and hand it to the pool.

    Never returns: each iteration calls ``surfer_queue.get()`` and passes the
    result to ``agent_pool.connect``.
    """
    print('connect loop started')
    while True:
        agent_pool.connect(surfer_queue.get())
class ConnectorManager(BaseManager):
    """BaseManager subclass on which the surfer queue and agent pool accessors are registered."""
if __name__ == '__main__':
    # Create the shared objects and register accessors that return them.
    surfer_queue = Queue()
    agent_pool = AgentPool()
    ConnectorManager.register('get_surfer_queue', lambda: surfer_queue)
    ConnectorManager.register('get_agent_pool', lambda: agent_pool)

    address = ('localhost', 8888)
    manager = ConnectorManager(address, 'password')
    manager.start()
    print('ConnectorManager started on {}:{}'.format(*address))

    # Run the matching loop in its own process; it is handed the queue
    # directly and the pool as obtained from the manager. join() then blocks
    # on that process.
    connector = Process(
        target=connect,
        args=(surfer_queue, manager.get_agent_pool()),
    )
    connector.start()
    connector.join()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment