Skip to content

Instantly share code, notes, and snippets.

@ajayhn
Created February 10, 2015 00:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ajayhn/ca842987638131f0073a to your computer and use it in GitHub Desktop.
Save ajayhn/ca842987638131f0073a to your computer and use it in GitHub Desktop.
diff --git a/cfgm_common/vnc_kombu.py b/cfgm_common/vnc_kombu.py
index fbf637e..d8d5fd5 100644
--- a/cfgm_common/vnc_kombu.py
+++ b/cfgm_common/vnc_kombu.py
@@ -6,6 +6,7 @@ import amqp.exceptions
import kombu
import gevent
import time
+import json
from pysandesh.connection_info import ConnectionState
from pysandesh.gen_py.process_info.ttypes import ConnectionStatus, \
@@ -40,7 +41,8 @@ class VncKombuClient(object):
msg = "Unable to delete the old amqp Q: %s" % str(e)
self._logger(msg, level=SandeshLevel.SYS_ERR)
- self._obj_update_q = self._conn.SimpleQueue(self._update_queue_obj)
+ # used for publishing. separate channel/fd than for consuming
+ self._obj_update_exchange = self._update_exchange_obj(self._conn.channel())
old_subscribe_greenlet = self._subscribe_greenlet
self._subscribe_greenlet = gevent.spawn(self._dbe_oper_subscribe)
@@ -68,10 +70,10 @@ class VncKombuClient(object):
self._subscribe_cb = subscribe_cb
self._logger = logger
- obj_upd_exchange = kombu.Exchange('vnc_config.object-update', 'fanout',
- durable=False)
+ self._update_exchange_obj = kombu.Exchange('vnc_config.object-update', 'fanout',
+ durable=False)
- self._update_queue_obj = kombu.Queue(q_name, obj_upd_exchange)
+ self._update_queue_obj = kombu.Queue(q_name, self._update_exchange_obj)
self._subscribe_greenlet = None
self.connect(True)
# end __init__
@@ -99,7 +101,8 @@ class VncKombuClient(object):
trace = None
try:
- self._subscribe_cb(message.payload)
+ cb_data = json.loads(message.payload)
+ self._subscribe_cb(cb_data)
except Exception as e:
msg = "Subscribe callback had error: %s" % str(e)
self._logger(msg, level=SandeshLevel.SYS_WARN)
diff --git a/vnc_cfg_api_server/vnc_cfg_ifmap.py b/vnc_cfg_api_server/vnc_cfg_ifmap.py
index 10bbd7d..623ebef 100644
--- a/vnc_cfg_api_server/vnc_cfg_ifmap.py
+++ b/vnc_cfg_api_server/vnc_cfg_ifmap.py
@@ -913,7 +913,8 @@ class VncServerKombuClient(VncKombuClient):
message = self._publish_queue.get()
while True:
try:
- self._obj_update_q.put(message, serializer='json')
+ bound_msg = self._obj_update_exchange.Message(json.dumps(message))
+ self._obj_update_exchange.publish(bound_msg, routing_key='')
break
except Exception as e:
log_str = "Disconnected from rabbitmq. Reinitializing connection: %s" % str(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment