https://dev.to/amplifr/monitoring-puma-web-server-with-prometheus-and-grafana-5b5o
https://puma.io/puma/Puma/DSL.html#activate_control_app-instance_method
activate_control_app 'unix:///var/run/pumactl.sock', { auth_token: '12345' }
activate_control_app 'unix:///var/run/pumactl.sock'
activate_control_app 'unix:///var/run/pumactl.sock', { no_token: true }
activate_control_app('tcp://127.0.0.1:9000', auth_token: 'top_secret')
# or without any params, just activate_control_app
# GET /stats?token=top_secret
{
  "workers"=>2,
  "phase"=>0,
  "booted_workers"=>2,
  "old_workers"=>0,
  "worker_status"=>[
    {
      "pid"=>13,
      "index"=>0,
      "phase"=>0,
      "booted"=>true,
      "last_checkin"=>"2019-03-31T13:04:28Z",
      "last_status"=>{
        "backlog"=>0, "running"=>5, "pool_capacity"=>5, "max_threads"=>5
      }
    },
    {
      "pid"=>17,
      "index"=>1,
      "phase"=>0,
      "booted"=>true,
      "last_checkin"=>"2019-03-31T13:04:28Z",
      "last_status"=>{
        "backlog"=>0, "running"=>5, "pool_capacity"=>5, "max_threads"=>5
      }
    }
  ]
}
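For export to Prometheus (e.g. via yabeda), the per-worker numbers in this payload are usually summed into process-wide gauges. A minimal sketch in plain Ruby, using the field names from the sample above (fetching `/stats` from the control app is left out):

```ruby
# Sum per-worker gauges from a Puma /stats payload (clustered mode).
def puma_gauges(stats)
  statuses = stats.fetch('worker_status', []).map { |w| w.fetch('last_status', {}) }
  {
    booted_workers: stats['booted_workers'],
    backlog:        statuses.sum { |s| s['backlog'].to_i },
    running:        statuses.sum { |s| s['running'].to_i },
    pool_capacity:  statuses.sum { |s| s['pool_capacity'].to_i },
    max_threads:    statuses.sum { |s| s['max_threads'].to_i }
  }
end

sample = {
  'workers' => 2, 'booted_workers' => 2,
  'worker_status' => [
    { 'last_status' => { 'backlog' => 0, 'running' => 5, 'pool_capacity' => 5, 'max_threads' => 5 } },
    { 'last_status' => { 'backlog' => 0, 'running' => 5, 'pool_capacity' => 5, 'max_threads' => 5 } }
  ]
}

puma_gauges(sample)
# => {booted_workers: 2, backlog: 0, running: 10, pool_capacity: 10, max_threads: 10}
```

A backlog that stays above zero means requests are waiting for a free thread; a pool_capacity trending toward zero means you are running out of headroom.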
https://github.com/yabeda-rb/yabeda
require 'socket'

LIVENESS_PORT = 8080

Sidekiq.configure_server do |config|
  config.on(:startup) do
    Sidekiq.logger.info "Starting liveness server on #{LIVENESS_PORT}"
    Thread.start do
      server = TCPServer.new('localhost', LIVENESS_PORT)
      loop do
        Thread.start(server.accept) do |socket|
          socket.gets # Read the first line of the request (the Request-Line)
          # Assign outside the redis block so the result stays in scope
          redis_response = ::Sidekiq.redis { |r| r.ping }
          if redis_response != 'PONG'
            status = '503 Service Unavailable'
            response = "Sidekiq is not ready: Sidekiq.redis.ping returned #{redis_response.inspect} instead of PONG\n"
            Sidekiq.logger.error response
          else
            status = '200 OK'
            response = "Live!\n"
          end
          socket.print "HTTP/1.1 #{status}\r\n" \
                       "Content-Type: text/plain\r\n" \
                       "Content-Length: #{response.bytesize}\r\n" \
                       "Connection: close\r\n"
          socket.print "\r\n" # blank line separates the header from the body, as required by the protocol
          socket.print response
          socket.close
        end
      end
    end
  end
end
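With this TCP server in place, a Kubernetes livenessProbe can hit the port directly. A sketch, with the timings illustrative rather than prescriptive; note that the server above binds to localhost, so for an httpGet probe (which arrives over the pod IP) you would bind to 0.0.0.0 instead, or use an exec probe that curls localhost:

```yaml
livenessProbe:
  httpGet:
    path: /
    port: 8080
  initialDelaySeconds: 30  # give Sidekiq time to boot
  periodSeconds: 10
  timeoutSeconds: 2
  failureThreshold: 3
```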
https://github.com/sidekiq/sidekiq/wiki/Scaling-Sidekiq
class ChargeOutstandingInvoicesJob
  include Sidekiq::Job

  def perform
    array_of_job_args = Invoice.
      where(charged_at: nil). # Get all uncharged invoices
      pluck(:id).             # Get only their ids
      zip                     # turn each element into a single-element array

    # batch size is 1000 by default
    ChargeJob.perform_bulk(array_of_job_args)
  end
end
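The `zip` step is what turns the flat list of ids into the array-of-argument-arrays shape that `perform_bulk` expects, one positional-argument list per job:

```ruby
ids = [101, 102, 103]

# Array#zip with no arguments wraps each element in its own array.
args = ids.zip
# => [[101], [102], [103]]
```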
Name your queues based on priority or urgency. Some teams name their queues using domain-specific terms that are no help at all when it comes to planning queue priority or latency requirements. “Urgent”, “default”, and “low” are much easier to work with. You might take it a step further and embrace Gusto’s approach of latency-based queue names such as “within_30_seconds”, “within_5_minutes”, etc. This approach makes it very clear which queues have priority and when queue latency is unacceptable.
web: bundle exec rails s
worker: bundle exec sidekiq -q within_30_sec -q within_5_min -q within_5_hours
worker_high_mem: RAILS_MAX_THREADS=1 bundle exec sidekiq -q high_mem
- Implementing a HorizontalPodAutoscaler is likely best done per queue, based on that queue's latency. When the latency begins to spike, you may choose to scale up that queue's k8s deployment. This assumes each queue has latency thresholds for "okay" and "we need to scale up or we will miss our latency target."
- Scaling up or down within k8s is not an instant operation. There is a delay between k8s initiating the scale-up and the Sidekiq process actually starting. Sometimes a scale-up finishes only to find the queue has already been empty for a few seconds (or minutes).
- Sidekiq at high levels of concurrency can be dangerous to other systems, like your database or third parties. In the event of a scale-up event, you may start processing jobs faster at the expense of the health of your database. Be careful with write-heavy workloads and more than ~10 k8s pods consuming from a queue.
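A latency check along these lines can be built on `Sidekiq::Queue#latency` (seconds since the oldest job in the queue was enqueued). A sketch of the decision logic only; the queue names and thresholds are made up for illustration, and the pure decision function is separated out so it can be exercised without Redis:

```ruby
# Illustrative latency targets per queue (seconds), mirroring
# the latency-based queue names above.
LATENCY_TARGETS = {
  'within_30_seconds' => 30,
  'within_5_minutes'  => 300
}.freeze

# Pure decision: scale up once observed latency exceeds the target.
def scale_up?(queue_name, latency_seconds)
  latency_seconds > LATENCY_TARGETS.fetch(queue_name)
end

# In a real exporter or autoscaler hook the latency would come from
# Sidekiq's API:
#   latency = Sidekiq::Queue.new('within_30_seconds').latency
scale_up?('within_30_seconds', 45.0)   # => true
scale_up?('within_5_minutes', 120.0)   # => false
```

Exporting this latency as a Prometheus metric and driving the HPA from it (rather than from CPU) is the usual shape of the solution.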
It's recommended to set the Sidekiq timeout to a value less than the k8s terminationGracePeriodSeconds. If using the default Sidekiq timeout of 25 seconds, set terminationGracePeriodSeconds to 30 seconds (the k8s default).
SIGTERM
Do not run Sidekiq with a shell wrapper like this: command: ["/bin/sh","-c", "bundle exec sidekiq -t 100 -e production --config config/sidekiq_custom.yml"] — the shell becomes PID 1 and may not forward SIGTERM to Sidekiq, so the process can be killed without a graceful shutdown.
https://github.com/sidekiq/sidekiq/wiki/Kubernetes#health-checks
apiVersion: apps/v1
kind: Deployment
metadata:
  name: sidekiq-deployment
spec:
  ...
  template:
    ...
    spec:
      terminationGracePeriodSeconds: 120 # wait a bit longer to shut it down
      containers:
        - name: sidekiq
          ...
          command: ["bundle", "exec", "sidekiq"]
          args: ["-t", "100", "-e", "production", "-C", "config/custom_sidekiq_config.yml"]
          ...
Then you don't need this:

# NOT NEEDED!
lifecycle:
  preStop:
    exec:
      # SIGTERM triggers a quick exit; gracefully terminate with TSTP instead
      command:
        - /bin/sh
        - -c
        - kill -TSTP $(ps aux | grep sidekiq | grep busy | awk '{ print $2 }')
For Sidekiq processes, we need a different approach. Instead of using a web request, you can use a combination of Sidekiq lifecycle hooks and a file-based readinessProbe.
Keep in mind that Sidekiq will begin processing jobs from its Redis instance as soon as it is able to. It will not wait for a signal from k8s or another system to enable it to start processing jobs. Contrast this with k8s pods serving web traffic, where k8s might not route requests to them until a readiness probe has passed.
# config/initializers/sidekiq.rb
SIDEKIQ_WILL_PROCESS_JOBS_FILE = Rails.root.join('tmp/sidekiq_process_has_started_and_will_begin_processing_jobs').freeze

Sidekiq.configure_server do |config|
  # We touch and destroy files in the Sidekiq lifecycle to provide a
  # signal to Kubernetes that we are ready to process jobs or not.
  #
  # Doing this gives us a better sense of when a process is actually
  # alive and healthy, rather than just beginning the boot process.
  config.on(:startup) do
    FileUtils.touch(SIDEKIQ_WILL_PROCESS_JOBS_FILE)
  end

  config.on(:shutdown) do
    FileUtils.rm_f(SIDEKIQ_WILL_PROCESS_JOBS_FILE)
  end
end
readinessProbe:
  failureThreshold: 10
  exec:
    command:
      - cat
      - /var/www/tmp/sidekiq_process_has_started_and_will_begin_processing_jobs
  initialDelaySeconds: 10
  periodSeconds: 2
  successThreshold: 2
  timeoutSeconds: 1
Sidekiq's one-job-per-thread model, combined with Ruby's Global Virtual Machine Lock (GVL), assumes that Sidekiq workloads are not CPU-bound, so observed CPU utilization is an inappropriate metric for autoscaling.
kubectl run sh --restart=Never --rm -it --image=ubuntu --limits='memory=123Mi' -- sh
livenessProbe:
  exec:
    command:
      - kubemem
      - --failure
      - "95" # Fail at 95% memory
      - --warning
      - "85" # Warn at 85% or greater memory usage
      - --logfile
      - /proc/1/fd/1 # Write to PID 1's output, allowing logging
  initialDelaySeconds: 5
  periodSeconds: 5
BLPOP
BLPOP is a blocking list pop primitive. It is the blocking version of LPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the head of the first list that is non-empty, with the given keys being checked in the order that they are given.
BRPOP
BRPOP is a blocking list pop primitive. It is the blocking version of RPOP because it blocks the connection when there are no elements to pop from any of the given lists. An element is popped from the tail of the first list that is non-empty, with the given keys being checked in the order that they are given.
BRPOPLPUSH
BRPOPLPUSH is the blocking variant of RPOPLPUSH. When source contains elements, this command behaves exactly like RPOPLPUSH. When used inside a MULTI/EXEC block, this command behaves exactly like RPOPLPUSH. When source is empty, Redis will block the connection until another client pushes to it or until timeout is reached. A timeout of zero can be used to block indefinitely.
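The head-vs-tail and key-order semantics above can be modeled in plain Ruby. This is only a model of the documented behavior (not a Redis client, and without the blocking part, which requires a server):

```ruby
# Model of BLPOP key-order semantics: keys are checked in the order
# given, and the element comes from the first non-empty list.
def lpop_first_nonempty(lists, *keys)
  key = keys.find { |k| !lists.fetch(k, []).empty? }
  key && [key, lists[key].shift] # BLPOP: pop from the head
end

# Same key-order rule, but popping from the tail, as BRPOP does.
def rpop_first_nonempty(lists, *keys)
  key = keys.find { |k| !lists.fetch(k, []).empty? }
  key && [key, lists[key].pop]   # BRPOP: pop from the tail
end

lists = { 'a' => [], 'b' => [1, 2, 3] }
lpop_first_nonempty(lists, 'a', 'b')  # => ["b", 1]
rpop_first_nonempty(lists, 'a', 'b')  # => ["b", 3]
```

Sidekiq relies on these blocking primitives to fetch jobs: a worker parks on the queue keys (in priority order) and wakes as soon as any of them receives a job.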
kubectl get pods sh -o yaml | grep uid
class WorkerPool
  attr_reader :ractors

  def initialize
    @ractors = 10.times.map { spawn_worker }
  end

  def spawn_worker
    Ractor.new do
      Ractor.yield(:ready) # signal availability before the first job
      loop { Ractor.yield Job.run(Ractor.receive) }
    end
  end

  def self.run(parameters)
    # Ractor.select blocks until any worker yields, i.e. is free for work
    ractor, _ignored_result =
      Ractor.select(*(@instance ||= new).ractors)
    ractor << parameters # Ractor#<< (send) copies the parameters into the worker
  end
end

class Job
  def self.process(*args)
    WorkerPool.run({ class: self, args: args })
  end

  def self.run(hash)
    case hash
    in { class: klass, args: args } # Ruby pattern matching (2.7+)
      klass.new.process(*args)
    end
  end
end

class PrintJob < Job
  def process(message)
    puts message
  end
end

PrintJob.process('Hello World!')