Skip to content

Instantly share code, notes, and snippets.

@johnallen3d
Last active December 9, 2015 23:58
Show Gist options
  • Save johnallen3d/4347636 to your computer and use it in GitHub Desktop.
Save johnallen3d/4347636 to your computer and use it in GitHub Desktop.
Our Ruote storage has collected some cruft over time. We have a fair number of abandoned (leftover) items in storage that can be killed. In addition, because our workers are running on Heroku, they are not being cleaned up properly.
# **** grab a reference to the redis instance ****
redis = RuoteKit.dashboard.storage.redis; nil

# Deletes every key matching the given glob pattern and returns how many keys
# were removed. When nothing matches, the DEL call is skipped: a DEL without
# any key is rejected as a command error (at least by older redis-rb
# releases), which would abort the rest of a pasted session.
purge = lambda { |pattern|
  keys = redis.keys(pattern)
  keys.empty? ? 0 : redis.del(keys)
}; nil

# **** delete all errors ****
purge.call('*errors*')
# **** delete all expressions and workitems from march 2013 ****
# Change the 201303 fragment in both patterns to target another month.
purge.call('expressions*201303*')
purge.call('workitems*201303*')
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# I believe that the code below here is an old and inefficient
# approach. The few lines above, with the date fragment in the
# 'expressions*' and 'workitems*' key patterns adjusted to the
# month you want to purge, should do the trick.
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# **** reset worker info data ****
# Read the raw 'variables/workers' document from redis, replace its "workers"
# entry with an empty hash, write the document back through the storage and
# finally query the engine's worker info.
workers_doc = Rufus::Json.decode(
  Workflow.engine.storage.redis.get('variables/workers')
); nil
workers_doc.store("workers", {})
Workflow.engine.storage.put(workers_doc)
Workflow.engine.worker_info
# **** respark recent processes stalled on validate ****
# Pages through the processes in descending order and resparks every process
# that was launched within the last 3 days and whose first position entry
# names "validate". The wfid of each resparked process is printed.
#
# The page size is used for both the skip step and the limit; stepping by
# 1000 while fetching only 100 would leave 900 processes per page unexamined.
page_size = 1000
cutoff = 3.days.ago
(0..30000).step(page_size).each { |skip_count|
  Workflow.engine.processes(descending: true, limit: page_size, skip: skip_count).each { |process|
    begin
      next unless DateTime.parse(process.launched_time) > cutoff
      next unless process.position[0][1] == "validate"
      p process.wfid
      Workflow.engine.respark(process.wfid)
    rescue => e
      # Best effort: one bad process must not stop the sweep, but say what failed.
      p "error: #{e.class}: #{e.message}"
      next
    end
  }
}; nil
# **** replay errors ****
# Best-effort replay of up to 1000 stored errors; an error that fails to
# replay is skipped without any output.
Workflow.engine.errors(limit: 1000).each do |err|
  begin
    # next unless err.fields['workflow'] == '2013_all-in_email'
    Workflow.replay_at_error(err)
  rescue
    next
  end
end; nil
# **** identify leftovers and delete ****
# A leftover is a workitem or error document whose fei wfid is not among the
# wfids that still have expressions in storage. Documents without a usable
# 'fei' hash are treated as leftovers too. The actual deletion stays commented
# out so `left` can be inspected first.
require 'set'

wfids = Workflow.engine.storage.expression_wfids({}); nil
wis = Workflow.engine.storage.get_many('workitems').compact; nil
ers = Workflow.engine.storage.get_many('errors').compact; nil
# scs = Workflow.engine.storage.get_many('schedules').compact;nil
# Set lookup keeps the filter linear instead of scanning the wfid list once per document.
live_wfids = Set.new(wfids); nil
left = (wis + ers).reject { |doc|
  fei = doc['fei']
  fei.is_a?(Hash) && live_wfids.include?(fei['wfid'])
}; nil
# left.each { |doc| Workflow.engine.storage.delete(doc) }
# **** kill old workitems ****
# Removes every process launched more than 3 months ago and prints
# [wfid, launched_time] for each removal. The process count is printed first.
p Workflow.engine.processes(count: true); nil

page_size = 1000
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# Adjust the age threshold here.
cutoff = 3.months.ago
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

# Pass 1: only collect. Removing processes while paging by `skip` shifts the
# remaining ones into offsets that were already visited, so they would be
# silently skipped.
stale = []
(0..20000).step(page_size).each { |skip_count|
  Workflow.engine.processes(descending: false, limit: page_size, skip: skip_count).each { |process|
    begin
      next unless DateTime.parse(process.launched_time) < cutoff
      stale << [process.wfid, process.launched_time]
    rescue => e
      # Keep going, but make unreadable processes visible instead of hiding them.
      p "skipped: #{e.class}: #{e.message}"
    end
  }
}; nil

# Pass 2: remove what was collected.
stale.each { |wfid, launch|
  begin
    Workflow.engine.remove_process(wfid)
    p [wfid, launch]
  rescue => e
    p "remove failed for #{wfid}: #{e.class}: #{e.message}"
  end
}; nil
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment