Skip to content

Instantly share code, notes, and snippets.

@otherwiseguy
Created May 18, 2021 21:24
Show Gist options
  • Save otherwiseguy/daa4c44614bf90bfc9a74cf25f480b85 to your computer and use it in GitHub Desktop.
Save otherwiseguy/daa4c44614bf90bfc9a74cf25f480b85 to your computer and use it in GitHub Desktop.
from multiprocessing.managers import BaseManager
from ovsdbapp.schema.ovn_northbound import commands
class QueueManager(BaseManager):
    """Manager subclass used by the client to reach the remotely served queue."""


# Client side registers the typeid by name only; no callable is supplied here.
QueueManager.register('get_queue')
m = QueueManager(address=('127.0.0.1', 50000), authkey=b'password')
m.connect()
queue = m.get_queue()
# The command is built with None as its first argument and handed to the
# queue proxy, which ships it to whoever consumes the served queue.
queue.put(commands.LsAddCommand(None, "test", may_exist=True))
import logging
from multiprocessing.managers import BaseManager
from queue import Queue
import os
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import vlog
from ovsdbapp.schema.ovn_northbound import impl_idl
# Configure the root logger at DEBUG level so everything the demo logs is shown.
logging.basicConfig(level=logging.DEBUG)
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)
# NOTE(review): presumably routes python-ovs vlog output through the Python
# `logging` module — confirm against ovsdbapp.backend.ovs_idl.vlog.
vlog.use_python_logger()
class MyIdl(connection.OvsdbIdl):
    """Script-local subclass of ``connection.OvsdbIdl`` that adds no overrides.

    It exists only as a named hook point; instances are created through the
    inherited ``from_server`` constructor.
    """
# Connection string for the northbound database: taken from the OVN_NB_DB
# environment variable, falling back to the local TCP endpoint.
nb_target = os.environ.get("OVN_NB_DB", "tcp:127.0.0.1:6641")
# The python-ovs Idl object, built from the server's OVN_Northbound database.
nb_idl = MyIdl.from_server(nb_target, 'OVN_Northbound')
# The ovsdbapp Connection object wrapping that Idl.
nb_connection = connection.Connection(idl=nb_idl, timeout=10)
# The OVN_Northbound API implementation object used to run commands.
api = impl_idl.OvnNbApiIdlImpl(nb_connection)
# Queue object that the manager below exposes under the 'get_queue' typeid.
queue = Queue()
class QueueManager(BaseManager):
    """Manager that serves the command queue to client processes."""


# NOTE(review): the callable is a lambda; process start methods that pickle
# the manager registry (spawn/forkserver) may reject it — confirm the target
# platform forks, or swap in a module-level function.
QueueManager.register('get_queue', callable=lambda: queue)
# SECURITY: this listens on all interfaces with a hard-coded authkey, and
# everything received is unpickled and then executed below. Only run it on a
# trusted network, or restrict the address and use a real secret.
m = QueueManager(address=('', 50000), authkey=b'password')
m.start()
# start() runs the server in a separate process, so the queue that clients
# fill lives there. Read it through a proxy instead of the local `queue`
# object (the original referenced an undefined name `q` here).
q = m.get_queue()
try:
    while True:
        cmd = q.get()
        # The sender presumably leaves the command without an API object (the
        # client in this gist passes None), so attach ours before running it.
        cmd.api = api
        cmd.execute()
finally:
    # BaseManager has no stop(); shutdown() stops the server process started
    # by start(). In a finally block it also runs on Ctrl-C or a command error.
    m.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment