@motebaya
Last active March 24, 2024 14:33
diving deeper and experimenting with concurrency and parallelism in Ruby for I/O bound operations.
#!/usr/bin/ruby
# copyright - 2024.03.12 motebaya
require "async"
require "async/barrier"
require "async/http/internet"
require "benchmark"
require "httparty"
require "concurrent"
require "parallel"
# based on https://www.linkedin.com/advice/0/whats-difference-between-concurrent-parallel-programming
# - async -> concurrency != parallelism -> same event loop -> single thread.
#   best for I/O bound operations.
# - sync -> parallelism (threads/processes) -> no shared event loop -> each runs independently.
#   best for CPU bound operations.
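# a minimal sketch (hypothetical, not part of the benchmark below) of why
# concurrency helps I/O bound work: three 0.2s sleeps overlap across threads
# and take ~0.2s in total instead of ~0.6s sequentially.
def concurrency_sketch
  elapsed = Benchmark.realtime do
    3.times.map { Thread.new { sleep 0.2 } }.each(&:join)
  end
  puts format(" 3 overlapped sleeps took: %.2fs", elapsed)
end
concurrency_sketch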
def get_http(index, client, queue)
  request = client.get("https://httpbin.org/delay/1.6")
  queue.push(" request ok: #{index}")
  request&.close # without closing each request we always get a drain warning.
end
# w_pipe (write pipe) is used for IPC with forked processes.
# https://github.com/jnunemaker/httparty
def sync_http(index, w_pipe = nil)
  HTTParty.get("https://httpbin.org/delay/1.6")
  if w_pipe.nil?
    " sync request ok: #{index}"
  else
    w_pipe.puts(" sync fork request ok: #{index}")
  end
end
# at first glance there seems to be no difference between `async-http` (async)
# and HTTParty (sync).
# https://socketry.github.io/async/guides/asynchronous-tasks/index.html
# https://socketry.github.io/async-http/#multiple-requests
# using `HTTParty`
# - the functions are called asynchronously, but each HTTP request uses a blocking
#   API. the timings still come out similar, most likely because the async gem
#   installs a fiber scheduler (Ruby 3+), so even HTTParty's socket I/O yields to
#   other tasks while waiting.
# using `async-http`
# - each HTTP request is non-blocking by design and can overlap in execution with
#   other tasks. and even though the requests are made async, the barrier ensures
#   the program waits for all requests to complete before the next step.
# the difference isn't dramatic here, but overall it seems more efficient to work
# with async end to end.
def async_test
  Async do
    results = []
    client = Async::HTTP::Internet.new
    barrier = Async::Barrier.new
    3.times do |i|
      barrier.async do
        # get_http pushes its message into `results` directly (passed as the queue),
        # so we don't push its return value as well.
        get_http(i + 1, client, results)
      end
    end
    barrier.wait
    puts results.join
  end
end
# based on the module example:
# - https://github.com/socketry/async-http?tab=readme-ov-file#multiple-requests
# we can make multiple requests using a barrier.
# it is similar to `Promise.all` in javascript: it waits for all tasks and
# ensures they are all finished before continuing.
# - https://thoughtbot.com/blog/my-adventure-with-async-ruby
#
# but, based on this article, we can also use `async` directly without a barrier.
# both work and run at the same time, so where is the difference?
# - https://brunosutic.com/blog/async-ruby#async-http
#
# well, the difference when using `Async` directly is:
# - the program doesn't wait for all tasks to complete before proceeding to the next step.
# - each task is scheduled to run asynchronously, and the program continues to the
#   next step without waiting for the completion of all tasks.
def async_another_test
  Async do # |task|
    results = []
    barrier = Async::Barrier.new
    3.times do |i|
      barrier.async do # or `task.async do` to skip the barrier
        results << sync_http(i + 1)
      end
    end
    barrier.wait
    # a `puts "hello world"` placed here prints immediately when tasks are started
    # with `task.async` and there is no `barrier.wait`; with the barrier, it only
    # prints once all tasks are completed.
    puts results.join
  end
end
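# hypothetical sketch of the difference described above: child tasks started with
# `task.async` begin running immediately and the parent continues as soon as they
# block; only `barrier.wait` (or the end of the top-level Async block) actually
# waits for their completion.
def no_barrier_sketch
  Async do |task|
    2.times do |i|
      task.async do
        sleep 0.1
        puts " task #{i} done"
      end
    end
    puts " reached before tasks finish"
  end
end
no_barrier_sketch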
# a normal function call in a loop, no concurrency/parallelism.
def sync_normal_test
  results = []
  3.times do |i|
    results << sync_http(i + 1)
  end
  puts results.join
end
# https://docs.ruby-lang.org/en/3.0/Thread.html
def sync_thread_test
  threads = []
  3.times do |i|
    threads << Thread.new do
      sync_http(i + 1)
    end
  end
  puts threads.map(&:value)
end
# `fork` comes from the stdlib. based on the docs, this method creates a child
# process that runs truly in parallel; each process has its own PID (process
# identifier) and its own memory space. the last part is the tricky one: since
# the processes are independent, IPC (inter process communication) is required
# to share results/data between them. it's not as lightweight as `Thread`, which
# shares its memory space with the other threads and is faster to spawn.
# https://docs.ruby-lang.org/en/3.0/Process.html#method-c-fork
# https://docs.ruby-lang.org/en/3.0/Process.html#method-c-waitall
def sync_fork_test
  r_pipe, w_pipe = IO.pipe
  3.times do |i|
    fork do
      r_pipe.close
      sync_http(i + 1, w_pipe)
      exit!
    end
  end
  w_pipe.close
  results = []
  until r_pipe.eof?
    results << r_pipe.gets.chomp
  end
  Process.waitall
  puts results
end
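# hypothetical sketch: instead of plain text lines, a forked child can send a
# structured Ruby object back through the pipe with Marshal.
def fork_marshal_sketch
  r, w = IO.pipe
  pid = fork do
    r.close
    w.write(Marshal.dump({ pid: Process.pid, answer: 42 }))
    w.close
    exit!
  end
  w.close
  data = Marshal.load(r.read)
  r.close
  Process.wait(pid)
  puts " child #{data[:pid]} sent: #{data[:answer]}"
end
fork_marshal_sketch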
# https://github.com/ruby-concurrency/concurrent-ruby
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future.html
def sync_concurrent_test
  results = []
  3.times do |i|
    results << Concurrent::Promise.execute do
      sync_http(i + 1)
    end
  end
  puts results.map(&:value)
end
# another method: the docs say `Future` is deprecated and replaced by `Promises`.
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises.html
def sync_http_promise(index)
  Concurrent::Promises.future do
    HTTParty.get("https://httpbin.org/delay/1.6")
    " sync request ok: #{index}"
  end
end
def sync_concurrent_promise_test
  tasks = []
  3.times do |i|
    tasks << sync_http_promise(i + 1)
  end
  puts Concurrent::Promises.zip(*tasks).value
end
# similar to python's `concurrent.futures.ThreadPoolExecutor` perhaps, and more
# customizable: we can set min/max threads too.
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html
def sync_concurrent_threadpool_test
  executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 5,
    max_threads: 5,
    max_queue: 10,
    fallback_policy: :caller_runs
  )
  # Concurrent::Array because pushing to a plain Array from multiple threads is
  # not guaranteed to be thread safe outside MRI.
  results = Concurrent::Array.new
  3.times do |i|
    executor.post do
      results << sync_http(i + 1)
    end
  end
  executor.shutdown
  executor.wait_for_termination
  puts results
end
# simple API for parallelism based on threads (or processes).
# `Parallel.map` already returns the mapped results, so there is no need to push
# into a separate array.
# https://github.com/grosser/parallel
def sync_parallel_test
  results = Parallel.map(1..3, in_threads: 3) do |i|
    sync_http(i)
  end
  puts results
end
# scan this file for all functions ending in "test" and benchmark each one in
# its own thread.
threads = []
File.foreach(File.expand_path(__FILE__)) do |func|
  reg = /^def\s+([^\s(]+)/.match(func)
  next if reg.nil? || !reg[1].end_with?("test")
  threads << Thread.new do
    puts " Running: #{reg[1].upcase}"
    benchmark = Benchmark.measure { send(reg[1]) }
    " Benchmark function: #{reg[1].upcase} finished at: #{benchmark.real}\n"
  end
end
puts threads.map(&:value).join
# async_test
# async_another_test
# sync_normal_test
# sync_thread_test
# sync_fork_test
# sync_concurrent_test
# sync_concurrent_promise_test
# sync_concurrent_threadpool_test
# sync_parallel_test
motebaya commented Mar 17, 2024

 Benchmark function: ASYNC_TEST finished at: 3.2570847679999133
 Benchmark function: ASYNC_ANOTHER_TEST finished at: 3.1932133710006383
 Benchmark function: SYNC_NORMAL_TEST finished at: 8.593838957999651
 Benchmark function: SYNC_THREAD_TEST finished at: 3.0380510249997315
 Benchmark function: SYNC_FORK_TEST finished at: 3.1174781950003307
 Benchmark function: SYNC_CONCURRENT_TEST finished at: 2.9335770309999134
 Benchmark function: SYNC_CONCURRENT_PROMISE_TEST finished at: 2.9874192970000877
 Benchmark function: SYNC_CONCURRENT_THREADPOOL_TEST finished at: 3.011546085000191
 Benchmark function: SYNC_PARALLEL_TEST finished at: 2.997427269000582

average concurrent time: ~3 seconds.
having a powerful device to run a thread pool across multiple cores is great, but for I/O bound work like HTTP requests it's useless if the internet connection and the server are slow.
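For contrast with the I/O bound numbers above, here's a quick hypothetical sketch of why these thread-based approaches don't help CPU bound work on MRI: the GVL lets only one thread execute Ruby code at a time, so two CPU bound computations in threads take about as long as running them back to back.

```ruby
require "benchmark"

# naive recursive fibonacci, purely CPU bound
def fib(n)
  n < 2 ? n : fib(n - 1) + fib(n - 2)
end

seq = Benchmark.realtime { 2.times { fib(25) } }
par = Benchmark.realtime { 2.times.map { Thread.new { fib(25) } }.each(&:join) }
puts format("sequential: %.2fs, threads: %.2fs", seq, par)
```

On MRI both timings come out roughly equal; only `fork` (or `Parallel` with `in_processes:`) sidesteps the GVL for CPU bound work.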
