Skip to content

Instantly share code, notes, and snippets.

@grtjn
Created July 26, 2022 16:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save grtjn/6989732197111d3e6ae55e8405d37fa9 to your computer and use it in GitHub Desktop.
Save grtjn/6989732197111d3e6ae55e8405d37fa9 to your computer and use it in GitHub Desktop.
Task helper lib
xquery version "1.0-ml";
module namespace tl = "http://marklogic.com/task-lib";
import module namespace tb="ns://blakeley.com/taskbot" at "/lib/taskbot/taskbot.xqm";
declare namespace eval="xdmp:eval" ;
declare option xdmp:mapping "false";
(:~
 : Starts queue processing for a task, but only when a lock on $taskid could
 : be obtained first.
 :
 : @param $taskid    value handed to xdmp:lock-for-update; also used as the
 :                   first argument of xdmp:trace and inside the messages
 : @param $iterator  forwarded unchanged to tl:process-queue
 : @param $processor forwarded unchanged to tl:process-queue
 : @param $timelimit forwarded unchanged to tl:process-queue
 : @return the result of tl:process-queue when the lock step yielded true();
 :         otherwise the "where" clause filters the FLWOR and nothing is
 :         returned from it
 :)
declare function tl:start-synchronized-task($taskid, $iterator, $processor, $timelimit) {
try {
(: The lock attempt is wrapped in xdmp:invoke-function, called without an options argument. :)
(: NOTE(review): if the invoked function runs as its own transaction, the lock taken below
   may already be released by the time tl:process-queue runs - confirm that the intended
   mutual exclusion actually holds. :)
let $acquiredLock := xdmp:invoke-function(function(){
try {
(: presumably a 1-second limit so that waiting on an already-held lock fails quickly - TODO confirm :)
let $_ := xdmp:set-request-time-limit(1)
let $_ := xdmp:lock-for-update($taskid)
return true()
} catch ($e) {
(: Every error raised in the try body ends up here and is traced as "still running";
   the result of xdmp:trace is what $acquiredLock receives in that case. :)
xdmp:trace($taskid, "Task " || $taskid || " still running")
}
})
(: Continue only when the lock attempt returned true(). :)
where $acquiredLock
return
(: Raise the request time limit to 3600 before the actual work starts. :)
let $_ := xdmp:set-request-time-limit(3600)
let $_ := xdmp:trace($taskid, "Starting task " || $taskid)
return tl:process-queue($taskid, $iterator, $processor, $timelimit)
} catch ($e) {
(: Errors are not re-thrown; only the text of the error's format-string element is logged, at level 'warning'. :)
xdmp:log($e/*:format-string/text(), 'warning')
}
};
(:~
 : Processes queue items one at a time until the iterator yields nothing or
 : the time limit has passed.
 :
 : On each round the iterator is applied to obtain the next item. When an item
 : is available and the limit is not exceeded, the processor is applied to
 : ($taskid, $next) through xdmp:invoke-function, and the function then calls
 : itself for the following round.
 :
 : @param $taskid    used as the xdmp:trace event and in trace messages; also
 :                   passed as first argument to the processor
 : @param $iterator  function applied with no arguments to fetch the next item
 : @param $processor function applied with ($taskid, $next)
 : @param $timelimit value compared against
 :                   current-dateTime() + xdmp:elapsed-time()
 : @return the result of the final xdmp:trace call (results of the processor
 :         invocations are discarded)
 :)
declare function tl:process-queue(
  $taskid,
  $iterator,
  $processor,
  $timelimit
) {
  let $next := xdmp:apply($iterator)
  return
    if (empty($next)) then
      xdmp:trace($taskid, "Task " || $taskid || ": nothing to process")
    else if (current-dateTime() + xdmp:elapsed-time() > $timelimit) then
      xdmp:trace($taskid, "Task " || $taskid || ": timelimit reached")
    else
      (: The options map must be the SECOND ARGUMENT of xdmp:invoke-function.
         Previously it sat inside the function body, where it was merely
         returned as an extra result item and never applied. :)
      let $_ := xdmp:invoke-function(
        function() {
          xdmp:apply($processor, $taskid, $next)
        },
        map:map()
          => map:with("isolation", "different-transaction")
          => map:with("update", "true")
      )
      return tl:process-queue($taskid, $iterator, $processor, $timelimit)
};
(:~
 : Returns the first item of collection "somecollection", or the empty
 : sequence when the collection yields nothing.
 :
 : The lookup is executed through xdmp:invoke-function rather than inline so
 : that every call provides fresh results.
 :)
declare function tl:get-next() {
  let $fetch-first := function() {
    fn:head(collection("somecollection"))
  }
  return xdmp:invoke-function($fetch-first)
};
(:~
 : Deletes every document whose JSON property "id" equals $id.
 :
 : The matching URIs are looked up with cts:uris and handed to
 : tb:list-segment-process with a segment size of 500. For each segment the
 : handler calls tb:maybe-fatal(), traces and deletes each URI (a missing
 : document is allowed), then calls xdmp:commit().
 :
 : @param $taskid used as the xdmp:trace event and in the messages
 : @param $id     value matched against the JSON property "id"
 : @return whatever tb:list-segment-process returns
 :)
declare function tl:process-trackId($taskid, $id) {
  let $label := "Task " || $taskid || ": Processing " || $id
  let $matching-uris := cts:uris('', 'eager', cts:json-property-value-query("id", $id))
  (: Handler invoked by taskbot for each segment of URIs. :)
  let $delete-segment := function($list as item()+, $opts as map:map?) {
    tb:maybe-fatal(),
    $list ! (
      xdmp:trace($taskid, "Task " || $taskid || ": Processing " || .),
      xdmp:document-delete(., map:entry("ifNotExists", "allow"))
    ),
    xdmp:commit()
  }
  let $_ := xdmp:trace($taskid, $label || "..")
  return tb:list-segment-process(
    $matching-uris,
    500,
    $label,
    $delete-segment,
    map:map(), (: no fn options :)
    $tb:OPTIONS-SYNC-UPDATE (: spawn, but wait for results :)
  )
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment