Skip to content

Instantly share code, notes, and snippets.

@GuillaumeDerval
Created January 21, 2020 13:16
Show Gist options
  • Save GuillaumeDerval/8ed9f40580c8cbce9514ffef60eef33e to your computer and use it in GitHub Desktop.
Save GuillaumeDerval/8ed9f40580c8cbce9514ffef60eef33e to your computer and use it in GitHub Desktop.
Performance test for INGInious
import asyncio
import logging
import time
from multiprocessing import Process
from zmq.asyncio import ZMQEventLoop, Context
from inginious.agent import Agent
from inginious.backend.backend import Backend
from inginious.client.client import Client
from inginious.common.messages import BackendKillJob, BackendNewJob
# Number of fake agent processes spawned by the main script.
N_AGENTS = 2
# Number of client processes spawned by the main script.
N_CLIENTS = 20
# Number of jobs each client submits; a client stops its loop once it has
# received this many results.
N_JOBS_PER_CLIENT = 1000
# Address passed to the Backend as its frontend-side endpoint and to every Client.
BACKEND_FRONTEND_ADDR = "tcp://127.0.0.1:2000"
# Address passed to the Backend as its agent-side endpoint and to every FakeAgent.
BACKEND_AGENT_ADDR = "tcp://127.0.0.1:2001"
def start_logger(debug=False):
    """Configure and return the ``"inginious"`` logger.

    Both the logger and its console handler are set to ``DEBUG`` when
    *debug* is true and to ``WARNING`` otherwise.

    The function is safe to call several times in the same process: the
    console handler installed by the first call is reused (and its level
    refreshed) instead of a new one being stacked, so a record is never
    printed more than once.

    :param debug: enable verbose (``DEBUG``) output when true.
    :return: the configured ``logging.Logger``.
    """
    handler_name = "inginious-perf-console"
    level = logging.DEBUG if debug else logging.WARNING

    logger = logging.getLogger("inginious")
    logger.setLevel(level)

    # Reuse the handler attached by a previous call, if any; adding a second
    # StreamHandler would duplicate every log line.
    console = next(
        (h for h in logger.handlers if h.get_name() == handler_name),
        None,
    )
    if console is None:
        console = logging.StreamHandler()
        console.set_name(handler_name)
        logger.addHandler(console)

    console.setLevel(level)
    console.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    return logger
def start_backend():
    """Run an INGInious ``Backend`` in the current process until it stops.

    Installs a fresh ``ZMQEventLoop`` as the asyncio event loop, creates a
    ZMQ ``Context``, builds a ``Backend`` from ``BACKEND_AGENT_ADDR`` and
    ``BACKEND_FRONTEND_ADDR`` and blocks on ``backend.run()``.

    Errors raised while running are logged.  ``KeyboardInterrupt`` and
    ``SystemExit`` are deliberately not caught so that they still terminate
    the process; the loop and the ZMQ context are released in every case.
    """
    logger = start_logger()
    loop = ZMQEventLoop()
    asyncio.set_event_loop(loop)
    context = Context()

    # Create backend
    backend = Backend(context, BACKEND_AGENT_ADDR, BACKEND_FRONTEND_ADDR)

    # Run!
    try:
        loop.run_until_complete(backend.run())
    except Exception:
        # Only genuine errors are reported here; interpreter-exit signals
        # propagate once the cleanup below has run.
        logger.exception("Closing due to exception")
    finally:
        logger.info("Closing loop")
        loop.close()
        logger.info("Waiting for ZMQ to send remaining messages to backend (can take 1 sec)")
        context.destroy(1000)  # give zeromq 1 sec to send remaining messages
        logger.info("Done")
class FakeFS:
    """Stand-in filesystem object handed to ``FakeAgent``.

    It performs no I/O: every subfolder is the object itself and every
    existence check succeeds.
    """

    def from_subfolder(self, x):
        """Return this same object, ignoring the requested subfolder *x*."""
        return self

    def exists(self):
        """Always report that the path exists."""
        return True
class FakeAgent(Agent):
    """Agent stub that answers every job immediately with a fixed result.

    No grading is performed: each new job is reported back as ``"ok"`` with
    the text ``"Yeah"``, and kill requests are ignored.
    """

    def __init__(self, context, backend_addr, friendly_name, environments):
        """Initialise the base ``Agent`` and remember the environment names.

        :param context: ZMQ context forwarded to ``Agent``.
        :param backend_addr: backend address forwarded to ``Agent``.
        :param friendly_name: agent name forwarded to ``Agent``.
        :param environments: iterable of environment names to advertise.
        """
        # The literal 1 and the FakeFS instance are the 4th and 5th arguments
        # expected by Agent.__init__ (presumably concurrency and task
        # filesystem -- confirm against inginious.agent.Agent).
        super().__init__(context, backend_addr, friendly_name, 1, FakeFS())
        self._envs = environments

    @property
    def environments(self):
        """Describe each advertised environment as a dict keyed by its name."""
        described = {}
        for env_name in self._envs:
            described[env_name] = {
                "id": env_name,
                "created": 12345678,
                "ports": [],
                "type": "x",
            }
        return described

    async def new_job(self, message: BackendNewJob):
        """Report the job as successfully finished without running anything."""
        await self.send_job_result(message.job_id, "ok", "Yeah")

    async def kill_job(self, message: BackendKillJob):
        """Ignore kill requests; jobs complete as soon as they arrive."""
def start_agent(name, environments):
    """Run a ``FakeAgent`` in the current process until it stops.

    Installs a fresh ``ZMQEventLoop`` as the asyncio event loop, creates a
    ZMQ ``Context``, builds a ``FakeAgent`` pointed at ``BACKEND_AGENT_ADDR``
    and blocks on ``agent.run()``.

    Errors raised while running are logged.  ``KeyboardInterrupt`` and
    ``SystemExit`` are deliberately not caught so that they still terminate
    the process; the loop and the ZMQ context are released in every case.

    :param name: friendly name given to the agent.
    :param environments: iterable of environment names the agent advertises.
    """
    logger = start_logger()
    loop = ZMQEventLoop()
    asyncio.set_event_loop(loop)
    context = Context()

    agent = FakeAgent(context, BACKEND_AGENT_ADDR, name, environments)

    try:
        loop.run_until_complete(agent.run())
    except Exception:
        # Only genuine errors are reported here; interpreter-exit signals
        # propagate once the cleanup below has run.
        logger.exception("Closing due to exception")
    finally:
        logger.info("Closing loop")
        loop.close()
        logger.info("Waiting for ZMQ to send remaining messages to backend (can take 1 sec)")
        context.destroy(1000)  # give zeromq 1 sec to send remaining messages
        logger.info("Done")
class FakeTask:
    """Task stand-in passed to ``Client.new_job``.

    Only the environment id is configurable; every other accessor returns a
    fixed value.
    """

    def __init__(self, env):
        """Store *env*, the environment id reported by this task."""
        self.env = env

    def get_course_id(self):
        """Return the fixed course id."""
        return "course"

    def get_id(self):
        """Return the fixed task id."""
        return "task"

    def get_environment_id(self):
        """Return the environment id given at construction."""
        return self.env

    def get_environment_type(self):
        """Return the fixed environment type."""
        return "x"

    def get_environment_parameters(self):
        """Return a new, empty parameter dict."""
        return {}
def start_client():
    """Submit ``N_JOBS_PER_CLIENT`` jobs to the backend and wait for them all.

    Installs a fresh ``ZMQEventLoop``, creates a ``Client`` connected to
    ``BACKEND_FRONTEND_ADDR``, queues every job up front and then runs the
    loop until the result callback has fired ``N_JOBS_PER_CLIENT`` times.
    """
    start_logger()  # called for its side effect: configures "inginious" logging

    loop = ZMQEventLoop()
    asyncio.set_event_loop(loop)
    context = Context()

    client = Client(context, BACKEND_FRONTEND_ADDR)
    client.start()

    # Number of job results received so far.
    completed = 0

    def callback(*args, **kwargs):
        """Check one job result and stop the loop after the last one."""
        nonlocal completed
        # The first positional argument is the result; anything other than
        # the FakeAgent's fixed answer is printed and treated as a failure.
        if args[0] != ("ok", "Yeah"):
            print(args)
            assert False
        completed += 1
        if completed == N_JOBS_PER_CLIENT:
            loop.stop()

    for _ in range(N_JOBS_PER_CLIENT):
        client.new_job(0, FakeTask("a"), {}, callback)

    # Schedule the client's start coroutine as a task once the loop is
    # running; passing the bare coroutine function to call_soon would only
    # create the coroutine without running it.
    loop.call_soon_threadsafe(asyncio.ensure_future, client.client_start())
    loop.run_forever()
if __name__ == '__main__':
    # Bring the backend up first and give it a second to start.
    backend_proc = Process(target=start_backend)
    backend_proc.start()
    time.sleep(1)

    # Launch every client; they queue their jobs before any agent exists.
    client_procs = []
    for _ in range(N_CLIENTS):
        proc = Process(target=start_client)
        proc.start()
        client_procs.append(proc)
    time.sleep(1)

    # The measured interval starts right before the agents are launched.
    started_at = time.time()

    agent_procs = []
    for idx in range(N_AGENTS):
        agent_name = "hello " + str(idx)
        agent_envs = ["a", "b" + str(idx)]
        proc = Process(target=start_agent, args=(agent_name, agent_envs))
        proc.start()
        agent_procs.append(proc)

    # The run is over once every client has received all of its results.
    for proc in client_procs:
        proc.join()
    elapsed = time.time() - started_at

    # Backend and agents never stop on their own: kill them, backend first.
    for proc in (backend_proc, *agent_procs):
        proc.kill()
        proc.join()

    print("DONE", elapsed)
@GuillaumeDerval
Copy link
Author

Note: you will need to comment out the following lines

        environment = task.get_environment_id()
        if environment not in self._available_environments:
            self._logger.warning("Env %s not available for task %s/%s", environment, task.get_course_id(), task.get_id())
            ssh_callback(None, None, None)  # ssh_callback must be called once
            callback(("crash", "Environment not available."), 0.0, {}, {}, "", {}, None, "", "")
            return

        environment_type = task.get_environment_type()
        if self._available_environments[environment] != environment_type:
            self._logger.warning("Env %s does not have the expected type %s, but rather %s, in task %s/%s",
                                 environment, environment_type, self._available_environments[environment],
                                 task.get_course_id(), task.get_id())
            ssh_callback(None, None, None)  # ssh_callback must be called once
            callback(("crash", "Environment {}-{} not available.".format(environment_type, environment)), 0.0, {}, {}, "", {}, None, "", "")
            return

in client.py for this to work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment