Skip to content

Instantly share code, notes, and snippets.

@moteus
Last active April 13, 2018 07:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moteus/645497f1133ed683a85f6c6c083f1279 to your computer and use it in GitHub Desktop.
Save moteus/645497f1133ed683a85f6c6c083f1279 to your computer and use it in GitHub Desktop.
Run multiple curl requests from coroutines simultaneously
local cURL = require "cURL.safe"
local json = require "cjson.safe"
-------------------------------------------------------------------
local MultiRequests = {} do
MultiRequests.__index = MultiRequests
function MultiRequests.new(...)
local self = setmetatable({}, MultiRequests)
return self:__init(...)
end
function MultiRequests:__init()
self._workers = {}
self._multi = cURL.multi()
self._responses = {}
self._handels = {}
self._remain = 0
self._error_handlers = {}
return self
end
function MultiRequests:add_worker(fn, errf)
local co = coroutine.create(fn)
table.insert(self._workers, co)
self._error_handlers[co] = errf
return self
end
local function append_request(self, easy, co)
local ok, err = self._multi:add_handle(easy)
if not ok then
return nil, err
end
self._remain = self._remain + 1
local response = {_co = co, content = {}}
self._responses[easy] = response
easy:setopt_writefunction(table.insert, response.content)
return self
end
local function remove_worker(self, co)
for i, worker in ipairs(self._workers) do
if worker == co then
table.remove(self._workers, i)
self._error_handlers[worker] = nil
break
end
end
end
local function proceed_next(self, co, response)
local ok, easy = coroutine.resume(co, response)
if ok and easy then
return append_request(self, easy, co)
end
local errf = self._error_handlers[co]
remove_worker(self, co)
if easy and not ok then
if errf then errf(easy) end
end
return ok, easy
end
local function proceed_response(self, easy, response)
local co = response._co
self._responses[easy] = nil
self._remain = self._remain - 1
response._co = nil
response.url = easy:getinfo_effective_url()
response.code = easy:getinfo_response_code()
response.content = table.concat(response.content)
proceed_next(self, co, response)
end
function MultiRequests:send_request(request)
local easy = table.remove(self._handels) or cURL.easy()
easy:reset()
local body
if request.body then
if type(request.body) == 'string' then
body = request.body
else
local err
body, err = json.encode(request.body)
if not body then
return nil, err
end
end
end
local headers = request.headers
if headers and #headers == 0 then
headers = nil
end
local ok, err = easy:setopt{
url = request.url,
timeout = request.timeout,
followlocation = request.followlocation,
post = request.post,
httpheader = headers,
postfields = body,
}
if not ok then
return nil, err
end
local response = coroutine.yield(easy)
table.insert(self._handels, easy)
if response.error then
return nil, response.error
end
return response
end
function MultiRequests:run()
while #self._workers > 0 do
for i = #self._workers, 1, -1 do
local co = self._workers[i]
proceed_next(self, co, self)
end
while self._remain > 0 do
local last = self._multi:perform() -- do some work
if last < self._remain then -- we have done some tasks
while true do -- proceed results/errors
local easy, ok, err = self._multi:info_read(true) -- get result and remove handle
if easy == 0 then break end -- no more data avaliable for now
local response = self._responses[easy]
response.error = err
proceed_response(self, easy, response)
end
end
self._multi:wait() -- wait while libcurl do io select
end
end
end
end
-------------------------------------------------------------------
local mrequest = MultiRequests.new()
local urls = {
"http://httpbin.org/get?key=1",
"http://httpbin.org/get?key=2",
"http://httpbin.org/get?key=3",
"http://httpbin.org/get?key=4",
}
for i = 1, 2 do
mrequest:add_worker(function(requester)
for j, url in ipairs(urls) do
local response, err = requester:send_request{url = url}
local prefix = string.format("[%d/%d]", i, j)
if response then
print(prefix, response.code, response.content)
else
print(prefix, err)
end
end
end, function(err)
print('[ERROR]', err)
end)
end
mrequest:run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment