Skip to content

Instantly share code, notes, and snippets.

@nandoquintana
Created April 3, 2019 08:40
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nandoquintana/574792bd76711767407c23a5b3183dfb to your computer and use it in GitHub Desktop.
Save nandoquintana/574792bd76711767407c23a5b3183dfb to your computer and use it in GitHub Desktop.
Enqueue messages into Sidekiq with Python, directly via Redis
#!/usr/bin/python3
import os
import redis
import json
from os.path import join, dirname
from dotenv import load_dotenv
from random import choice
from datetime import datetime
# Enqueue a single job for a Sidekiq worker by writing straight to the Redis
# structures this script targets: the "queues" set and the "queue:<name>" list.

# Populate os.environ from a local .env file (python-dotenv) before reading it.
load_dotenv()

# Connection settings. Unset or empty variables fall back to redis-py's own
# defaults (localhost:6379, db 0); previously a missing port reached the
# client as None and failed inside redis-py with an opaque TypeError.
host = os.environ.get("REDIS_PROXIES_HOST") or "localhost"
port = int(os.environ.get("REDIS_PROXIES_PORT") or 6379)
password = os.environ.get("REDIS_PROXIES_PASSWORD")
db = int(os.environ.get("REDIS_PROXIES_DB") or 0)
sidekiq_queues = redis.Redis(host=host, port=port, db=db, password=password)

# What to enqueue: target queue, Ruby worker class name, and its one argument.
queue = 'my_queue'
job = 'MyJob'
arguments = 123456789

# Take the clock reading once so both timestamps of this job are identical.
enqueue_time = datetime.now().timestamp()

value = {
    "class": job,
    "queue": queue,
    "args": [arguments],
    # Must be a real boolean (JSON true). The string 'true' only worked
    # because any string is truthy on the Ruby side -- so would 'false'.
    "retry": True,
    # 24 hex characters from the OS CSPRNG rather than random.choice, so job
    # ids are unpredictable and collision-resistant.
    "jid": os.urandom(12).hex(),
    "created_at": enqueue_time,
    "enqueued_at": enqueue_time,
}

# Register the queue name in the "queues" set, then push the JSON-encoded job
# onto the head of that queue's list.
sidekiq_queues.sadd("queues", queue)
sidekiq_queues.lpush(f"queue:{queue}", json.dumps(value))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment