Skip to content

Instantly share code, notes, and snippets.

@digitalWestie
Created April 9, 2022 00:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save digitalWestie/a0bf0c9245e477fd2c1878498ac6488c to your computer and use it in GitHub Desktop.
Save digitalWestie/a0bf0c9245e477fd2c1878498ac6488c to your computer and use it in GitHub Desktop.
Experimenting with a message queue system with couchdb.
import threading, time
from cloudant.client import CouchDB
from cloudant.database import CouchDatabase
from cloudant.document import Document
client = CouchDB("username", "password", url="http://couch-main:5984", connect=True, auto_renew=True, timeout=4000)
db = CouchDatabase(client, "channel_queue")
def process_requets(myc):
print("Starting threaded func")
time.sleep(5)
doc = myc.db[myc.sender_id]
requests = doc.get("requests", [])
if not requests:
print("\tT: Nothing to do")
return None
try:
doc['requests'] = []
doc.save()
except Exception as e:
print("\tT: Update conflict detected, skipping")
return None
# shoot out requests, if fail, retry with set
msg=""
for r in requests:
msg = msg + ", ".join(r.keys())
print("\tT: Combined %s requests: %s" % (len(requests), msg))
return True
class MyClass():
def __init__(self, client):
self.db = CouchDatabase(client, "channel_queue")
self.sender_id = 'julia006'
def collect(self, request):
print("Collect request")
doc = self.db[self.sender_id]
doc.update_field(Document.list_field_append, "requests", request)
new_thread = threading.Thread(target=process_requets, args=([self]))
new_thread.start()
return None
c = MyClass(client)
# Try running and seeing what happens:
c.collect({"a": 500}); c.collect({"b": 400}); c.collect({"c": 300});
c.collect({"d": 500}); time.sleep(1); c.collect({"e": 400}); time.sleep(1.5); c.collect({"f": 300});
c.collect({"g": 500}); c.collect({"h": 400}); time.sleep(1.5); c.collect({"i": 300});
c.collect({"j": 500}); c.collect({"k": 400}); time.sleep(2); c.collect({"l": 300});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment