@romiras
Created April 29, 2024 14:13
Drain a Sidekiq queue in Redis: print jobs with matching args to stdout, push all other jobs to a backup queue (Crystal lang)
# Run (the "--" separates compiler options from the program's own arguments):
# crystal run src/main.cr -- \
#   -redis-url='redis://localhost:6379' \
#   -namespace=sidekiq \
#   -queue-name=foo \
#   -class-name=Foo::BarProcessor \
#   -arg1=123 \
#   > filtered_jobs.jsonl
require "json"
require "redis"
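# Parse "-name=value" command-line arguments into a Hash
def parse_args
  arg_hash = Hash(String, String).new
  ARGV.each do |string|
    # Split on the first "=" only, so a value may itself contain "="
    parts = string.split("=", 2)
    next if parts.size < 2
    arg_name = parts[0].lchop('-') # Remove the leading "-"
    arg_value = parts[1]
    arg_hash[arg_name] = arg_value
  end
  arg_hash
end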
# Pop every job from the queue. Yield the payloads whose class and first
# argument match; push all other jobs to the "<queue_name>-bak" list.
def process_matching_jobs(redis : Redis, queue_name : String, class_name : String, arg1 : Int32, &)
  loop do
    job_payload = redis.lpop(queue_name)
    break unless job_payload
    parsed_payload = JSON.parse(job_payload.to_s)
    args = parsed_payload["args"]
    # to_i? returns nil for a non-integer first argument instead of raising,
    # so such jobs go to the backup list rather than aborting the drain
    if args.size > 0 && args[0].to_s.to_i? == arg1 && parsed_payload["class"] == class_name
      yield job_payload
    else
      redis.rpush(queue_name + "-bak", job_payload.to_s)
    end
  end
end
# Parse command-line arguments
args = parse_args
redis_url = args["redis-url"]? || "redis://localhost:6379/0"
namespace = args["namespace"]? || ""
arg1 = (args["arg1"]? || "0").to_i
queue_name = args["queue-name"]? || "jobs"
class_name = args["class-name"]? || "MyJobClass"
# Connect to Redis
redis = Redis.new(url: redis_url)
redis.ping
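# Build the Redis key of the queue; omit the prefix when no namespace is given
queue_name = namespace.empty? ? "queue:#{queue_name}" : "#{namespace}:queue:#{queue_name}"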
# Process matching jobs and output payloads directly
process_matching_jobs(redis, queue_name, class_name, arg1) do |payload|
  puts payload
end
redis.close