Skip to content

Instantly share code, notes, and snippets.

@gottlike

gottlike/Gemfile Secret

Last active June 18, 2024 11:33
Show Gist options
  • Save gottlike/e9cfed216ea7637c1c9a4ef031eb4c9e to your computer and use it in GitHub Desktop.
Save gottlike/e9cfed216ea7637c1c9a4ef031eb4c9e to your computer and use it in GitHub Desktop.
Polyphony Issue
# frozen_string_literal: true
# Load-test client: sends a batch of JSON requests over NATS from many
# threads (via the parallel gem) and prints the achieved throughput.
# Unbuffered stdout so progress and error lines appear as soon as they are written.
$stdout.sync = true
# bundler/setup must come first so the requires below resolve against the Gemfile.
require 'bundler/setup'
require 'json'
require 'nats/io/client'
require 'parallel'
# Local-testing defaults. Values already present in the environment win, so the
# script can be pointed at another cluster or subject without editing it.
ENV['NATS_CLUSTER'] ||= 'nats://localhost:4222'
ENV['QUEUE_NAME'] ||= 'queue.test'
# Fail fast when a setting is blank (e.g. exported as an empty string); a nil
# check alone could never fire once the defaults above are applied.
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', '').strip.empty?
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', '').strip.empty?
# Install the same handler for SIGINT and SIGTERM: log a timestamped notice,
# then leave the process immediately via exit! (no at_exit handlers are run).
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    timestamp = Time.now.strftime('%Y-%m-%d %H:%M:%S')
    puts "[#{timestamp}] - Received SIG#{signal_name}, shutting down"
    exit!
  end
end
# Read the connection settings prepared above.
nats_cluster = ENV['NATS_CLUSTER']
queue_name = ENV['QUEUE_NAME']

# Options handed to NATS::IO::Client#connect. NATS_CLUSTER may hold several
# comma-separated server URLs.
# NOTE(review): -1 presumably means "retry forever" — confirm against nats-pure docs.
cluster_opts = {
  servers: nats_cluster.split(','),
  reconnect_time_wait: 1,
  max_reconnect_attempts: -1,
  ping_interval: 10,
  dont_randomize_servers: true,
  connect_timeout: 2
}

nats = NATS::IO::Client.new

# Log every connection lifecycle event to stdout; registered before connecting.
nats.on_error { |err| puts err.message }
nats.on_close { puts 'NATS was closed' }
nats.on_disconnect { puts 'NATS was disconnected' }
nats.on_reconnect do
  puts "Reconnected to server at #{nats.connected_server}"
end

nats.connect(cluster_opts)
# Sends +data+ as a request on +queue_name+ and returns the reply body.
#
# @param nats [#request] client object used to issue the request
# @param queue_name [String] subject the request is sent to
# @param data [String] request payload
# @param timeout [Numeric] value forwarded as the request's +timeout:+ option
#   (defaults to 5, the value that used to be hard-coded)
# @return [String, nil] the reply's data converted with +to_s+, or nil when the
#   client returned no reply or the request raised NATS::Timeout
def send_request(nats, queue_name, data, timeout: 5)
  response = nats.request(queue_name, data, timeout: timeout)
  if response.nil?
    # No reply object came back: echo the payload so the lost request is identifiable.
    puts data
    return nil
  end

  response.data.to_s
rescue NATS::Timeout
  puts 'Request timed out'
  nil
end
# Build 5000 distinct ping payloads; each is serialised to JSON just before sending.
test_data = Array.new(5000) { |i| { id: i, msg: 'Ping!' } }

# A monotonic clock is unaffected by wall-clock adjustments during the run.
start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

# Use Parallel.map's return value rather than appending to a shared array from
# 2000 threads: results come back in input order and no cross-thread mutation
# is needed. Entries are nil for requests that failed or timed out.
responses = Parallel.map(test_data, in_threads: 2000) do |data|
  send_request(nats, queue_name, data.to_json)
end

# Sample the elapsed time once so the printed duration and rate agree.
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
succeeded = responses.compact.length

puts "Finished #{succeeded} requests in #{elapsed.round(4)}s, at rate of #{(succeeded / elapsed).round} rps"
# Reply to the final request in test_data (blank line if that request failed).
puts responses.last
nats.close
# frozen_string_literal: true
# Gemfile shared by the three scripts in this gist; each script loads it
# through require 'bundler/setup'.
source 'https://rubygems.org'
# NOTE(review): base64 is not required directly by any of the scripts;
# presumably needed by a dependency on newer Rubies — confirm before removing.
gem 'base64'
# JSON encoding/decoding of the request and reply payloads.
gem 'json'
# NATS client, loaded in the scripts with require 'nats/io/client'.
gem 'nats-pure'
# Thread fan-out used by the load-test client (Parallel.map).
gem 'parallel'
# Provides the spin fiber primitive used inside the worker subscriptions.
gem 'polyphony'
# frozen_string_literal: true
# Worker: subscribes to QUEUE_NAME, relays every message to the
# "<QUEUE_NAME>.sub-worker" subject and publishes the answer back to the requester.
# Unbuffered stdout so log lines appear as soon as they are written.
$stdout.sync = true
# bundler/setup must come first so the requires below resolve against the Gemfile.
require 'bundler/setup'
require 'json'
require 'nats/io/client'
# NOTE(review): nothing in this script's visible code references Parallel.
require 'parallel'
# Supplies the spin call used in the subscription handler.
require 'polyphony'
# Local-testing defaults. Values already present in the environment win, so the
# worker can be pointed at another cluster or subject without editing it.
ENV['NATS_CLUSTER'] ||= 'nats://localhost:4222'
ENV['QUEUE_NAME'] ||= 'queue.test'
# Fail fast when a setting is blank (e.g. exported as an empty string); a nil
# check alone could never fire once the defaults above are applied.
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', '').strip.empty?
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', '').strip.empty?
# Install the same handler for SIGINT and SIGTERM: log a timestamped notice,
# then leave the process immediately via exit! (no at_exit handlers are run).
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    timestamp = Time.now.strftime('%Y-%m-%d %H:%M:%S')
    puts "[#{timestamp}] - Received SIG#{signal_name}, shutting down"
    exit!
  end
end
# Read the connection settings prepared above.
nats_cluster = ENV['NATS_CLUSTER']
queue_name = ENV['QUEUE_NAME']

# Options handed to NATS::IO::Client#connect. NATS_CLUSTER may hold several
# comma-separated server URLs.
# NOTE(review): -1 presumably means "retry forever" — confirm against nats-pure docs.
cluster_opts = {
  servers: nats_cluster.split(','),
  reconnect_time_wait: 1,
  max_reconnect_attempts: -1,
  ping_interval: 10,
  dont_randomize_servers: true,
  connect_timeout: 2
}

nats = NATS::IO::Client.new

# Log every connection lifecycle event to stdout; registered before connecting.
nats.on_error { |err| puts err.message }
nats.on_close { puts 'NATS was closed' }
nats.on_disconnect { puts 'NATS was disconnected' }
nats.on_reconnect do
  puts "Reconnected to server at #{nats.connected_server}"
end

nats.connect(cluster_opts)
# Sends +data+ as a request on +queue_name+ and returns the reply body.
#
# @param nats [#request] client object used to issue the request
# @param queue_name [String] subject the request is sent to
# @param data [String] request payload
# @param timeout [Numeric] value forwarded as the request's +timeout:+ option
#   (defaults to 5, the value that used to be hard-coded)
# @return [String, nil] the reply's data converted with +to_s+, or nil when the
#   client returned no reply or the request raised NATS::Timeout
def send_request(nats, queue_name, data, timeout: 5)
  response = nats.request(queue_name, data, timeout: timeout)
  if response.nil?
    # No reply object came back: echo the payload so the lost request is identifiable.
    puts data
    return nil
  end

  response.data.to_s
rescue NATS::Timeout
  puts 'Request timed out'
  nil
end
# Subscribe on queue_name (also used as the queue-group name). Each message is
# handled in its own fiber started with spin: the JSON payload is forwarded to
# "<queue_name>.sub-worker" and the sub-worker's answer is published to the
# original reply subject.
nats.subscribe(queue_name, queue: queue_name) do |msg, reply, _subject|
  # raise('raise works here')
  spin do
    # raise('raise does not work here')
    # Per the author's note above, exceptions raised in this fiber are not
    # surfaced, so every failure is rescued and logged explicitly below.
    request = JSON.parse(msg, symbolize_names: true)
    raw_response = send_request(nats, "#{queue_name}.sub-worker", request.to_json)

    # send_request returns nil on timeout or missing reply (and has already
    # logged it); there is nothing to parse or relay in that case.
    unless raw_response.nil?
      response = JSON.parse(raw_response, symbolize_names: true)
      nats.publish(reply, response.to_json) if reply
    end
  rescue StandardError => e
    puts "Failed to handle message: #{e.class}: #{e.message}"
  end
end
# Block forever so the process stays alive to serve the subscription.
sleep
# frozen_string_literal: true
# Sub-worker: subscribes to QUEUE_NAME and answers every message with a
# "Pong!" reply carrying the request's id, after a 3-second sleep.
# Unbuffered stdout so log lines appear as soon as they are written.
$stdout.sync = true
# bundler/setup must come first so the requires below resolve against the Gemfile.
require 'bundler/setup'
require 'json'
require 'nats/io/client'
# NOTE(review): nothing in this script's visible code references Parallel.
require 'parallel'
# Supplies the spin call used in the subscription handler.
require 'polyphony'
# Local-testing defaults. Values already present in the environment win, so the
# sub-worker can be pointed at another cluster or subject without editing it.
# NOTE(review): the worker script sends to "<its QUEUE_NAME>.sub-worker", so an
# overridden QUEUE_NAME here has to follow that naming to keep receiving traffic.
ENV['NATS_CLUSTER'] ||= 'nats://localhost:4222'
ENV['QUEUE_NAME'] ||= 'queue.test.sub-worker'
# Fail fast when a setting is blank (e.g. exported as an empty string); a nil
# check alone could never fire once the defaults above are applied.
raise('Environment variable NATS_CLUSTER not defined') if ENV.fetch('NATS_CLUSTER', '').strip.empty?
raise('Environment variable QUEUE_NAME not defined') if ENV.fetch('QUEUE_NAME', '').strip.empty?
# Install the same handler for SIGINT and SIGTERM: log a timestamped notice,
# then leave the process immediately via exit! (no at_exit handlers are run).
%w[INT TERM].each do |signal_name|
  Signal.trap(signal_name) do
    timestamp = Time.now.strftime('%Y-%m-%d %H:%M:%S')
    puts "[#{timestamp}] - Received SIG#{signal_name}, shutting down"
    exit!
  end
end
# Read the connection settings prepared above.
nats_cluster = ENV['NATS_CLUSTER']
queue_name = ENV['QUEUE_NAME']

# Options handed to NATS::IO::Client#connect. NATS_CLUSTER may hold several
# comma-separated server URLs.
# NOTE(review): -1 presumably means "retry forever" — confirm against nats-pure docs.
cluster_opts = {
  servers: nats_cluster.split(','),
  reconnect_time_wait: 1,
  max_reconnect_attempts: -1,
  ping_interval: 10,
  dont_randomize_servers: true,
  connect_timeout: 2
}

nats = NATS::IO::Client.new

# Log every connection lifecycle event to stdout; registered before connecting.
nats.on_error { |err| puts err.message }
nats.on_close { puts 'NATS was closed' }
nats.on_disconnect { puts 'NATS was disconnected' }
nats.on_reconnect do
  puts "Reconnected to server at #{nats.connected_server}"
end

nats.connect(cluster_opts)
# Subscribe on queue_name (also used as the queue-group name). Each message is
# handled in its own fiber started with spin: after a 3-second sleep the
# handler replies with a "Pong!" carrying the request's id.
nats.subscribe(queue_name, queue: queue_name) do |msg, reply, _subject|
  # raise('raise works here')
  spin do
    # raise('raise does not work here')
    # Per the author's note above, exceptions raised in this fiber are not
    # surfaced, so failures (e.g. a malformed JSON payload) are rescued and
    # logged explicitly below.
    sleep 3
    request = JSON.parse(msg, symbolize_names: true)
    response = { id: request[:id], msg: 'Pong!' }
    nats.publish(reply, response.to_json) if reply
  rescue StandardError => e
    puts "Failed to handle message: #{e.class}: #{e.message}"
  end
end
# Block forever so the process stays alive to serve the subscription.
sleep
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment