Created
April 9, 2022 00:17
-
-
Save digitalWestie/a0bf0c9245e477fd2c1878498ac6488c to your computer and use it in GitHub Desktop.
Experimenting with a message queue system with couchdb.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading, time | |
from cloudant.client import CouchDB | |
from cloudant.database import CouchDatabase | |
from cloudant.document import Document | |
client = CouchDB("username", "password", url="http://couch-main:5984", connect=True, auto_renew=True, timeout=4000) | |
db = CouchDatabase(client, "channel_queue") | |
def process_requets(myc): | |
print("Starting threaded func") | |
time.sleep(5) | |
doc = myc.db[myc.sender_id] | |
requests = doc.get("requests", []) | |
if not requests: | |
print("\tT: Nothing to do") | |
return None | |
try: | |
doc['requests'] = [] | |
doc.save() | |
except Exception as e: | |
print("\tT: Update conflict detected, skipping") | |
return None | |
# shoot out requests, if fail, retry with set | |
msg="" | |
for r in requests: | |
msg = msg + ", ".join(r.keys()) | |
print("\tT: Combined %s requests: %s" % (len(requests), msg)) | |
return True | |
class MyClass(): | |
def __init__(self, client): | |
self.db = CouchDatabase(client, "channel_queue") | |
self.sender_id = 'julia006' | |
def collect(self, request): | |
print("Collect request") | |
doc = self.db[self.sender_id] | |
doc.update_field(Document.list_field_append, "requests", request) | |
new_thread = threading.Thread(target=process_requets, args=([self])) | |
new_thread.start() | |
return None | |
c = MyClass(client) | |
# Try running and seeing what happens: | |
c.collect({"a": 500}); c.collect({"b": 400}); c.collect({"c": 300}); | |
c.collect({"d": 500}); time.sleep(1); c.collect({"e": 400}); time.sleep(1.5); c.collect({"f": 300}); | |
c.collect({"g": 500}); c.collect({"h": 400}); time.sleep(1.5); c.collect({"i": 300}); | |
c.collect({"j": 500}); c.collect({"k": 400}); time.sleep(2); c.collect({"l": 300}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment