Created
April 29, 2024 14:13
-
-
Save romiras/cff10a1940fd5f0d3a3d7129510a18c6 to your computer and use it in GitHub Desktop.
Drain a Sidekiq queue in Redis: print jobs with matching args to stdout and push all other jobs to another queue (Crystal lang)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Run: | |
# crystal src/main.cr \ | |
# -redis-url='redis://localhost:6379' \ | |
# -namespace=sidekiq \ | |
# -queue-name=foo \ | |
# -class-name=Foo::BarProcessor \ | |
# -arg1=123 \ | |
# > filtered_jobs.jsonl | |
require "json" | |
require "redis" | |
# Parses `-name=value` style arguments from ARGV into a name => value hash.
#
# * Arguments without an "=" are ignored.
# * Only the first "=" separates name from value, so a value may itself
#   contain "=" (e.g. a Redis URL with a password or query string).
# * All leading dashes are stripped from the name, so `-name=v` and
#   `--name=v` are equivalent.
# * Arguments whose name is empty after stripping (e.g. `=v`) are ignored.
#
# Returns a Hash(String, String); later duplicates overwrite earlier ones.
def parse_args
  arg_hash = Hash(String, String).new
  ARGV.each do |string|
    # Limit of 2 keeps everything after the first "=" intact in the value.
    parts = string.split("=", 2)
    next if parts.size < 2

    arg_name = parts[0].lstrip('-')
    next if arg_name.empty?

    arg_hash[arg_name] = parts[1]
  end
  arg_hash
end
# Drains the Redis list `queue_name` from the left until it is empty.
#
# Each popped payload is parsed as JSON. When its "class" equals `class_name`
# and the first element of its "args" array, read as an integer, equals
# `arg1`, the payload is yielded to the block. Every other payload is appended
# to the list "<queue_name>-bak".
#
# A payload is already removed from the source queue by the time it is
# inspected, so inspection must never raise: malformed JSON, a non-object
# payload, missing "class"/"args", a non-array "args" or a non-numeric first
# argument all count as "no match" and go to the backup list instead of
# aborting the run and losing the job.
def process_matching_jobs(redis : Redis, queue_name : String, class_name : String, arg1 : Int32)
  backup_queue = queue_name + "-bak"
  loop do
    job_payload = redis.lpop(queue_name)
    break unless job_payload

    if job_matches?(job_payload.to_s, class_name, arg1)
      yield job_payload
    else
      redis.rpush(backup_queue, job_payload.to_s)
    end
  end
end

# Returns true when `raw` is a JSON object whose "class" equals `class_name`
# and whose first "args" element parses as an integer equal to `arg1`.
# Never raises on malformed or unexpected payloads; returns false instead.
private def job_matches?(raw : String, class_name : String, arg1 : Int32) : Bool
  job = JSON.parse(raw).as_h?
  return false unless job
  return false unless job["class"]? == class_name

  first_arg = job["args"]?.try(&.as_a?).try(&.first?)
  return false unless first_arg

  # to_i? yields nil (instead of raising) for non-numeric values.
  first_arg.to_s.to_i? == arg1
rescue JSON::ParseException
  false
end
# Parse command-line arguments, falling back to defaults for missing ones.
args = parse_args
redis_url = args["redis-url"]? || "redis://localhost:6379/0"
namespace = args["namespace"]? || ""
# Fail with a readable message rather than a raw exception on a bad -arg1.
arg1 = (args["arg1"]? || "0").to_i? || abort("arg1 must be an integer")
queue_name = args["queue-name"]? || "jobs"
class_name = args["class-name"]? || "MyJobClass"

# Build the Redis key of the queue. With no namespace the key must not start
# with a stray ":" (":queue:<name>"), so the prefix is added only when set.
queue_key = namespace.empty? ? "queue:#{queue_name}" : "#{namespace}:queue:#{queue_name}"

# Connect to Redis
redis = Redis.new(url: redis_url)
begin
  redis.ping

  # Process matching jobs and output payloads directly
  process_matching_jobs(redis, queue_key, class_name, arg1) do |payload|
    puts payload
  end
ensure
  # Close the connection even when processing raises.
  redis.close
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment