@eliemichel
Created March 10, 2016 14:20
Qless server API
================
The first argument of every command is `now`, the current timestamp. It is not repeated for each method in this documentation because bindings usually prepend it to the argument list automatically. If you call the server directly in Lua, however, you must pass it yourself.
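For callers that do not go through a binding, prepending `now` could be sketched as follows. The helper name `with_now` is hypothetical, and the use of Unix time in seconds is an assumption, not something this document specifies.

```python
import time


def with_now(args, now=None):
    """Return a new argument list with the current timestamp prepended.

    Assumption: the server expects Unix time in seconds.
    """
    if now is None:
        now = time.time()
    return [now, *args]


# Arguments for `get(jid)` as the server would receive them.
print(with_now(["some-jid"], now=1457619600.0))
```

Passing `now` explicitly, as in the last line, keeps the helper deterministic in tests.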
The best way to learn more about the methods documented here is to read `api.lua`. The commands available in the API are the members of the `QlessAPI` table; they are easy to read and delegate to methods defined in other files. For instance, if the body of a command looks like `return Qless.job(jid):foobar(...)`, the function is most likely defined and documented in `job.lua`, because it operates on a job object. If a command does not go through such an object, look in `base.lua`.
# Config
## config.get([key])
Get a server configuration value, or the full configuration.
Note: The configuration is stored as a plain Redis hash.
#### Arguments
key: (optional) Configuration field to access.
#### Returns
The value corresponding to the key passed as argument, or the full JSON-encoded configuration if no key is provided.
## config.set(key, value)
Set a server configuration value.
#### Arguments
key: Configuration field to edit
value: New value for this field
#### Returns
nil
## config.unset(key)
Remove a field from the server configuration.
#### Arguments
key: Key to remove
#### Returns
nil
# Queues
## queues([name])
Get information about a queue, or about all queues.
This includes the name of the queue, plus some job counts.
#### Arguments
name: (optional) Name of the queue to get
#### Returns
The JSON-encoded queue, or a JSON-encoded list of all queues if no name was provided.
# Jobs
## get(jid)
Get job data for a given job.
#### Arguments
jid: Identifier for the job to get.
#### Returns
The JSON-encoded data of the job identified by the provided jid.
If the job is not present, then `nil` is returned.
## multiget([jid1, [jid2, [jid3, ...]]])
Get multiple jobs in a single call to the server.
You can specify as many arguments as you need jobs.
#### Arguments
jid1: (optional) Identifier of the first job to get
jid2: (optional) Identifier of the second job to get
jid3: (optional) Identifier of the third job to get
...
#### Returns
A JSON-encoded list of job data, in the same order as the jobs were requested.
Job IDs that do not exist are skipped, so the length of the returned list is less than or equal to the number of arguments. As a consequence, you cannot assume that the nth element of the returned list corresponds to the nth argument given.
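Because missing jobs are skipped, results are safer to match by jid than by position. A minimal sketch, assuming each job in the response carries its identifier under the `jid` key (the helper name `jobs_by_jid` is hypothetical):

```python
import json


def jobs_by_jid(requested_jids, response):
    """Map each requested jid to its job data, or None if it was skipped."""
    found = {job["jid"]: job for job in json.loads(response)}
    return {jid: found.get(jid) for jid in requested_jids}


# Job "b" does not exist, so the server returned only two entries.
response = json.dumps([{"jid": "a"}, {"jid": "c"}])
print(jobs_by_jid(["a", "b", "c"], response))
```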
## jobs(state, [queue], [offset, [count]])
Get the IDs of the jobs in a given state in a given queue. Completed jobs no longer belong to a queue, so when querying for 'complete' jobs you must not provide a queue name.
#### Arguments
state: State of jobs to return. Possible values are 'stalled', 'running', 'scheduled', 'depends', 'recurring' or 'complete'.
queue: (only if state is not 'complete') Queue whose jobs are listed.
offset: (optional) Offset in the list of job IDs returned. Default value is 0.
count: (optional) Maximum number of IDs returned. Set it to -1 to get the full list. Default value is 25.
#### Returns
A list of job IDs currently considered to be in the provided state in a particular queue.
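The `offset` and `count` arguments allow paging through long lists. The following is only a sketch: `call` is a hypothetical function that sends a command with its arguments to the server (prepending `now`) and returns the decoded list of jids.

```python
def iter_jids(call, state, queue=None, page_size=25):
    """Yield every jid in `state`, fetching at most `page_size` ids per call."""
    offset = 0
    while True:
        # 'complete' jobs are queried without a queue name.
        args = [state] if queue is None else [state, queue]
        page = call("jobs", *args, offset, page_size)
        if not page:
            return
        yield from page
        if len(page) < page_size:
            return  # A short page means the end of the list was reached.
        offset += len(page)
```

Jobs can change state between two calls, so a paged listing is a best-effort snapshot and not a consistent view.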
## complete(jid, worker, queue, data, ...)
Complete a job and optionally put it in another queue, either scheduled for later or waiting to be run immediately. It can also accept a list of other jids on which the job will depend in the next queue; the job stays in the 'depends' state until those jobs are done.
#### Arguments
jid: Identifier of the job to set to complete
worker: Worker on behalf of which this message is sent
queue: Queue in which the job is currently running
data: New data to attach to the job
The variable-length arguments may be pairs of the form:
('next' , queue) : The queue to advance it to next
('delay' , delay) : The delay for the next queue
('depends', '["jid1", "jid2", ...]') : JSON-encoded list of the jids the job depends on in the new queue
#### Returns
The new job state. One of 'scheduled', 'waiting', 'depends', or 'complete'.
An error is raised if the job does not exist, was not running in the selected queue or was owned by another worker.
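The variable-length pairs can be assembled programmatically. A minimal sketch, assuming that `data` is passed to the server as a JSON-encoded string (the 'depends' value is documented as JSON above; the encoding of `data` is an assumption). The helper name `complete_args` and its keyword parameters are hypothetical.

```python
import json


def complete_args(jid, worker, queue, data,
                  next_queue=None, delay=None, depends=None):
    """Build the argument list for `complete`, without the leading `now`."""
    args = [jid, worker, queue, json.dumps(data)]  # Assumption: JSON data.
    if next_queue is not None:
        args += ["next", next_queue]
    if delay is not None:
        args += ["delay", delay]
    if depends:
        args += ["depends", json.dumps(list(depends))]
    return args


print(complete_args("jid-1", "worker-1", "build", {"ok": True},
                    next_queue="deploy", delay=60))
```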
## failed([group, [offset, [count]]])
If no group is provided, this returns a JSON blob of the counts of the
various groups of failures known. If a group is provided, it will report up
to `count` from `offset` of the jobs affected by that issue.
#### Arguments
group: (optional) Category of error to return
offset: (optional) Index of the first job returned, within the list of failed jobs of the selected group. Default value is 0.
count: (optional) Number of detailed jobs returned. Default value is 25.
#### Returns
In both cases, the returned value is JSON-encoded.
If no `group` was provided:
```
{
'group1': 1,
'group2': 5,
...
}
```
Otherwise:
```
{
'total': 20,
'jobs': [
{
# All the normal keys for a job
'jid': ...,
'data': ...
# The message for this particular instance
'message': ...,
'group': ...,
}, ...
]
}
```
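As an illustration of the first form, the per-group counts can be ranked to find the most frequent failure groups. This is a sketch only; the helper name `largest_failure_groups` is hypothetical.

```python
import json


def largest_failure_groups(response, limit=3):
    """Return up to `limit` (group, count) pairs, most frequent first."""
    counts = json.loads(response)
    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return ranked[:limit]


response = json.dumps({"group1": 1, "group2": 5})
print(largest_failure_groups(response))
```

Each group name returned this way can then be passed back as the `group` argument to list the affected jobs.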
## fail(jid, worker, group, message, [data])
Mark the particular job as failed, with the provided group, and a more specific message. By `group`, we mean some phrase that might be one of several categorical modes of failure. The `message` is something more job-specific, like perhaps a traceback.
This method should NOT be used to note that a job has been dropped or has failed in a transient way. This method SHOULD be used to note that a job has something really wrong with it that must be remedied.
The motivation behind the `group` is so that similar errors can be grouped together. Optionally, updated data can be provided for the job. A job in any state can be marked as failed. If it has been given to a worker as a job, then its subsequent requests to heartbeat or complete that job will fail. Failed jobs are kept until they are canceled or completed.
#### Arguments
jid: Identifier of the job to set to failed
worker: Worker on behalf of which this message is sent
group: An arbitrary category of error. The choice of possible values is left to the end user.
message: A detailed error message, likely a traceback or some debug info.
data: (optional) New data to attach to the job
#### Returns
The id of the failed job if successful, or `False` on failure.
## retry(jid, queue, worker, [delay, [group, [message]]])
This script accepts jid, queue, worker and delay for retrying a job. This is similar in functionality to `put`, except that this counts against the retries a job has for a stage.
If the maximum number of retries is reached, the job is set to failed with the given `group` and `message`. A default group and message about exhausted retries are used if these were not provided.
#### Arguments
jid: Identifier of the job to retry
queue: Queue into which the job is put
worker: Worker on behalf of which this message is sent
delay: (optional) Delay before which the job must not be started. Schedules the job for later. Default value is 0 (start as soon as possible).
group: (optional) Category of error assigned to the job if the maximum number of attempts is exceeded by this call. Default is `failed-retries-<queue name>`.
message: (optional) Error message in case of maximum number of attempts exceeded. Default is `Job exhausted retries in queue "<old queue name>"`.
#### Returns
The number of retries remaining. If the allowed retries have been exhausted, then it automatically fails, and a negative number is returned.
Throws an exception if:
- the worker is not the worker with a lock on the job
- the job is not actually running
## depends(jid, 'on'|'off', jid1, [jid2, [...]])
Add or remove dependencies a job has.
If `'on'` is provided, the provided jids are added as dependencies. If `'off'` is provided, those jids are removed as dependencies. The special value `'all'` can be used as the first and only jid after `'off'` to remove all the dependencies.
#### Arguments
jid: Identifier of the job whose dependencies are added or removed
`'on'|'off'`: Whether to add or remove dependencies
jid1: Job to add or remove as a dependency. The special value `'all'` can be used if the mode is `'off'`.
jid2: (optional) Another job to add or remove as a dependency
...
#### Returns
If the job is not already in the 'depends' state, this call returns false. Otherwise, it returns true.
## heartbeat(jid, worker, [data])
Renew this worker's lock on this job.
#### Arguments
jid: Identifier of the job this heartbeat is about
worker: Worker on behalf of which this message is sent
data: (optional) Update of job's data
#### Returns
The timestamp at which the renewed lock will expire. The worker must send another heartbeat before this time; otherwise the job may be rescheduled.
Throws an exception if:
- the job has been given to another worker
- the job has been completed
- the job has been canceled
- the job is not running
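A worker can use the returned expiration time to decide when to send its next heartbeat. The sketch below waits for a fraction of the remaining lock time; the helper name and the `safety_fraction` parameter are hypothetical, and it assumes the returned timestamp uses the same unit as `now`.

```python
def next_heartbeat_delay(expires, now, safety_fraction=0.5):
    """Return how long to wait before the next heartbeat.

    Waiting only a fraction of the remaining time leaves a margin for
    network latency and scheduling delays.
    """
    remaining = max(0.0, expires - now)
    return remaining * safety_fraction


# A lock expiring 60 time units from now suggests waiting 30.
print(next_heartbeat_delay(expires=160.0, now=100.0))
```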
# Workers
## workers([name])
Provide data about all the workers, or if a specific worker is provided, then which jobs that worker is responsible for.
#### Arguments
name: (optional) Name of the worker to return.
#### Returns
If no worker name is provided, return a JSON-encoded list of available workers. Each entry contains the worker's name (of the form `<hostname>-<pid>`), the number of jobs it is responsible for and the number of its stalled jobs.
If a worker name is provided, the return value lists the jids of its running and stalled jobs:
```
{
'jobs': [
jid1,
jid2,
...
], 'stalled': [
jid1,
...
]
}
```
## track(['track'|'untrack', jid])
Get tracked jobs or (un)set tracking for a given job.
#### Arguments
`'track'|'untrack'`: (optional) Whether to enable or disable tracking for the provided job
jid: (only if the first argument is provided) Identifier of the job whose tracking is changed.
#### Returns
When adding a job, 1 if it is newly tracked, or 0 if it was already in the tracked list.
When removing, 1 if the job was tracked, 0 if not.
If no arguments were provided, the details of all currently tracked jobs, with expired jobs in a separate list:
```
{
'jobs': [
{
'jid': ...,
# All the other details you'd get from 'get'
}, {
...
}
], 'expired': [
# These are all the jids that are completed and whose data expired
'deadbeef',
...,
...,
]
}
```
## tag(command, ...)
The possible forms are:
```
tag('add'|'remove', jid, tag, [tag, ...])
tag('get', tag, [offset, [count]])
tag('top', [offset, [count]])
```
With 'add' or 'remove', accepts a jid followed by the list of tags to add to or remove from the job. With 'get', accepts a tag, plus an optional offset and count, and returns the jobs associated with that tag.
If 'add' or 'remove', the response is a list of the job's current tags, or False if the job doesn't exist. If 'get', the response is of the form:
```
{
total: ...,
jobs: [
jid,
...
]
}
```
If 'top' is supplied, it returns the most commonly-used tags
in a paginated fashion.
# Other commands
The following commands are not documented above. Their definitions, as members of the `QlessAPI` table, are:
```
QlessAPI.stats = function(now, queue, date)
  return cjson.encode(Qless.queue(queue):stats(now, date))
end

QlessAPI.priority = function(now, jid, priority)
  return Qless.job(jid):priority(priority)
end

-- Add logging to a particular jid
QlessAPI.log = function(now, jid, message, data)
  assert(jid, "Log(): Argument 'jid' missing")
  assert(message, "Log(): Argument 'message' missing")
  if data then
    data = assert(cjson.decode(data),
      "Log(): Argument 'data' not cjson: " .. tostring(data))
  end

  local job = Qless.job(jid)
  assert(job:exists(), 'Log(): Job ' .. jid .. ' does not exist')
  job:history(now, message, data)
end

QlessAPI.peek = function(now, queue, count)
  local jids = Qless.queue(queue):peek(now, count)
  local response = {}
  for i, jid in ipairs(jids) do
    table.insert(response, Qless.job(jid):data())
  end
  return cjson.encode(response)
end

QlessAPI.pop = function(now, queue, worker, count)
  local jids = Qless.queue(queue):pop(now, worker, count)
  local response = {}
  for i, jid in ipairs(jids) do
    table.insert(response, Qless.job(jid):data())
  end
  return cjson.encode(response)
end

QlessAPI.pause = function(now, ...)
  return QlessQueue.pause(now, unpack(arg))
end

QlessAPI.unpause = function(now, ...)
  return QlessQueue.unpause(unpack(arg))
end

QlessAPI.cancel = function(now, ...)
  return Qless.cancel(unpack(arg))
end

QlessAPI.timeout = function(now, ...)
  for _, jid in ipairs(arg) do
    Qless.job(jid):timeout(now)
  end
end

QlessAPI.put = function(now, me, queue, jid, klass, data, delay, ...)
  return Qless.queue(queue):put(now, me, jid, klass, data, delay, unpack(arg))
end

QlessAPI.requeue = function(now, me, queue, jid, ...)
  local job = Qless.job(jid)
  assert(job:exists(), 'Requeue(): Job ' .. jid .. ' does not exist')
  return QlessAPI.put(now, me, queue, jid, unpack(arg))
end

QlessAPI.unfail = function(now, queue, group, count)
  return Qless.queue(queue):unfail(now, group, count)
end

-- Recurring job stuff
QlessAPI.recur = function(now, queue, jid, klass, data, spec, ...)
  return Qless.queue(queue):recur(now, jid, klass, data, spec, unpack(arg))
end

QlessAPI.unrecur = function(now, jid)
  return Qless.recurring(jid):unrecur()
end

QlessAPI['recur.get'] = function(now, jid)
  local data = Qless.recurring(jid):data()
  if not data then
    return nil
  end
  return cjson.encode(data)
end

QlessAPI['recur.update'] = function(now, jid, ...)
  return Qless.recurring(jid):update(now, unpack(arg))
end

QlessAPI['recur.tag'] = function(now, jid, ...)
  return Qless.recurring(jid):tag(unpack(arg))
end

QlessAPI['recur.untag'] = function(now, jid, ...)
  return Qless.recurring(jid):untag(unpack(arg))
end

QlessAPI.length = function(now, queue)
  return Qless.queue(queue):length()
end

QlessAPI['worker.deregister'] = function(now, ...)
  return QlessWorker.deregister(unpack(arg))
end

QlessAPI['queue.forget'] = function(now, ...)
  QlessQueue.deregister(unpack(arg))
end
```