import base/[basem, timem, jsonm, mathm, logm, docm]
import ext/[persistencem]
import std/os

export jsonm, timem # otherwise there will be a bug

# Defaults -------------------------------------------------------------------------------------------
let default_retry_timeout = 5.minutes
let default_max_retry_timeout = 4.hours

# Job ------------------------------------------------------------------------------------------------
type
  ShouldProcess* = tuple[process: bool, reason: string]
  LastProcessed* = tuple[timestamp: Time, version: int]

  Job* = ref object of RootObj
    id*: string
    last*: Fallible[LastProcessed]
    priority*: int

  Jobs* = ref Table[string, Job]

method should_process*(job: Job, jobs: Jobs): ShouldProcess {.base.} = throw "not implemented"

method process*(job: Job): LastProcessed {.base.} = throw "not implemented"

method after*(job: Job): void {.base.} = discard

# JobState -------------------------------------------------------------------------------------------
type
  HistoryItem = object
    duration: int
    crawler_version: int
    timestamp: Time
    case is_error: bool
    of false:
      discard
    of true:
      error: string

  JobState = object
    history: seq[HistoryItem] # reversed, [last, previous, ...]
    retry_at: Option[Time]
    total_duration: int
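
# Length of the uninterrupted run of errors at the head of `history` (newest first);
# a success resets the count to zero.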
proc recent_errors_count(state: JobState): int =
  for item in state.history:
    if item.is_error: result.inc else: return

# proc `==`*(x, y: JobState): bool = %x == %y

# Crawler ----------------------------------------------------------------------------------------------
type
  # Jobs = Table[string, JobWithState]
  JobStates = Table[string, JobState]
  JobErrors = Table[string, string]

  # J: Job
  Crawler* = ref object
    id: string
    version: int
    jobs: Jobs
    job_states: JobStates
    data_dir: string
    focus: HashSet[string]
    retry_timeout: TimeInterval
    max_retry_timeout: TimeInterval

# Log ------------------------------------------------------------------------------------------------
proc log(crawler_id: string): Log = Log.init(crawler_id)
proc log(crawler: Crawler): Log = log(crawler.id)

# init -----------------------------------------------------------------------------------------------
proc init*(
  _: type[Crawler],
  id: string,
  version: int,
  jobs: seq[Job],
  data_dir: string,
  focus: seq[string],
  retry_timeout = default_retry_timeout,
  max_retry_timeout = default_max_retry_timeout
): Crawler =
  # Loading job states
  let ids = jobs.pick(id).to_hash_set()
  assert ids.len == jobs.len, "there are jobs with same ids"
  var job_states = JobStates
    .read_from(fmt"{data_dir}/{id}-crawler.json", () => JobStates())
    .filter((_, id) => id in ids) # Removing states for old jobs

  # Cleaning retry when restarted
  for id, _ in job_states: job_states[id].retry_at = Time.none

  # Adding states for new jobs
  for job in jobs:
    if job.id notin job_states: job_states[job.id] = JobState()

  Crawler(
    id: id,
    version: version,
    jobs: jobs.to_table((j) => j.id).to_ref,
    data_dir: data_dir,
    retry_timeout: retry_timeout,
    focus: focus.to_set,
    max_retry_timeout: max_retry_timeout,
    job_states: job_states,
  )

# get_errors -----------------------------------------------------------------------------------------
proc get_errors(states: JobStates): JobErrors =
  for id, state in states:
    if state.recent_errors_count > 1:
      result[id] = state.history[0].error

# save -----------------------------------------------------------------------------------------------
proc save(crawler: Crawler): void =
  crawler.job_states.write_to fmt"{crawler.data_dir}/{crawler.id}-crawler.json"
  crawler.job_states.get_errors.write_to fmt"{crawler.data_dir}/{crawler.id}-crawler-errors.json"
  crawler.log.debug "state saved"

# process_job ----------------------------------------------------------------------------------------
proc process_job(crawler: var Crawler, id: string, reason: string): void =
  let log = crawler.log
  var job = crawler.jobs[id]
  var state = crawler.job_states[id]

  # Processing
  let tic = timer_sec()
  let history_size = 5
  try:
    log.with((id: job.id, reason: reason)).info "processing '{id}', {reason}"
    job.last = job.process().success

    # Processing after
    # log.with((id: job.id)).info "{id} after processing"
    job.after()

    let duration = tic()
    log.with((id: job.id, duration: duration)).info "processed '{id}' in {duration} sec"

    # Updating state
    state.history.prepend_capped(HistoryItem(
      duration: duration,
      crawler_version: crawler.version,
      timestamp: Time.now,
      is_error: false
    ), history_size)
    state.retry_at = Time.none
  except:
    let (duration, error) = (tic(), get_current_exception().message)
    state.history.prepend_capped(HistoryItem(
      duration: duration,
      crawler_version: crawler.version,
      timestamp: Time.now,
      is_error: true,
      error: error
    ), history_size)

    let retry_count = state.recent_errors_count
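    # Exponential backoff: retry_timeout * 2^(retry_count - 1), capped at max_retry_timeout
    # (with the defaults: 5m, 10m, 20m, ... up to 4h)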
    let retry_at: Time = Time.now + min(
      crawler.retry_timeout.seconds * 2.pow(retry_count - 1),
      crawler.max_retry_timeout.seconds
    ).seconds
    state.retry_at = retry_at.some
    crawler.job_states[id] = state

    let log_data = log.with((id: job.id, duration: duration, retry_count: retry_count, error: error))
    if retry_count > 1:
      log_data.warn "can't process '{id}' after {duration} sec, {retry_count} time, '{error}'"
    else:
      log_data.info "can't process '{id}' after {duration} sec, {retry_count} time, will be retried, '{error}'"

  crawler.jobs[id] = job
  state.total_duration = state.history.pick(duration).sum
  crawler.job_states[id] = state

# run ------------------------------------------------------------------------------------------------
proc run*(crawler: var Crawler): void =
  let log = crawler.log
  log.with((version: crawler.version)).info "started v{version}"
  while true:
    # Building queue to process
    let now = Time.now
    var queue: seq[tuple[job: Job, state: JobState, reason: string]] = @[]
    if not crawler.focus.is_empty:
      for job in crawler.jobs.values:
        if job.id in crawler.focus:
          queue.add((job, crawler.job_states[job.id], "focus"))
    else:
      for job in crawler.jobs.values:
        let state = crawler.job_states[job.id]
        if state.retry_at.is_blank or state.retry_at.get < now:
          # Checking `should_process` even when the retry is due, because the job could already have
          # been processed but its state not saved if the crawler crashed.
          let (should_process, reason) = job.should_process(crawler.jobs)
          if should_process: queue.add((job, state, reason))
    # p queue.map((j) => j.job.id).join("\n")
    log
      .with((counts: queue.count_by((job) => $(job.job.type))))
      .info("queue {counts} jobs")

    # Sorting and batching
    let sorted = queue.sort_by((job) => (-job.job.priority, job.state.total_duration))
    let batch = sorted.take(5)

    # Processing
    for job in batch:
      crawler.process_job(job.job.id, job.reason)

    # Saving, or sleeping if there's nothing to process.
    # It would be better to save state every minute instead of after every batch.
    if not batch.is_empty:
      crawler.save()
    else:
      log.info "all processed, waiting"
      sleep(5.minutes.seconds * 1000)
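
The same crawler in TypeScript: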

import { timer_sec } from 'base/base.ts'
import { Time, TimeInterval } from 'base/time.ts'
import { Log } from 'base/log.ts'
import { PersistentVariable } from 'base/persistent_variable.ts'
import * as fs from 'base/fs.ts'
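
// Assumed (not shown in this gist): `E` (fallible result), `Hash`, `HSet`, `assert`,
// `ensure_error`, `sleep` and the `Number`/`Array` extensions used below
// (`5..minutes()`, `.to_hash()`, `.take()`, `.min()`, ...) come from the same base library.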

export type ShouldProcess = { process: boolean, reason: string }
export type LastProcessed = { timestamp: Time, version: number }

export interface Job {
  id: string
  last: E<LastProcessed>
  priority?: number // -1000..1000, default 0

  should_process(): ShouldProcess
  process(): Promise<LastProcessed>
  after?(): Promise<void>
}

const max_abs_job_priority = 1000

export interface HistoryItemError {
  duration: number
  crawler_version: number
  timestamp: Time
  is_error: true
  message: string
}

export interface HistoryItemSuccess {
  duration: number
  crawler_version: number
  timestamp: Time
  is_error: false
}

export type HistoryItem = HistoryItemError | HistoryItemSuccess

export interface JobState {
  history: HistoryItem[] // reversed, [last, previous, ...]
  retry_at?: Time
  total_duration: number
}

type JobStates = Hash<JobState>

// Restores proper `Time` instances after JSON deserialization
function job_states_post_json(states: JobStates): JobStates {
  return states.apply((state) => {
    state.history.apply((item) => {
      item.timestamp = new Time(item.timestamp.to_s())
    })
    if (state.retry_at) state.retry_at = new Time(state.retry_at.to_s())
  })
}

// Only the consecutive errors at the head of history (newest first), matching the Nim version;
// a success resets the run, so recovered jobs don't keep reporting stale errors
function recent_errors(state: JobState): HistoryItemError[] {
  const i = state.history.findIndex((item) => !item.is_error)
  return (i < 0 ? state.history : state.history.slice(0, i)) as HistoryItemError[]
}

type JobErrors = Hash<string>

export class Crawler {
  protected jobs: Hash<Job>
  protected job_states: JobStates
  protected job_states_storage: PersistentVariable<JobStates>
  protected focus: HSet<string>
  protected log: Log
  // protected readonly state_path: string

  protected constructor(
    public readonly id: string,
    public readonly version: number,
    jobs: Job[],
    protected readonly data_dir: string,
    focus: string[],
    protected readonly retry_timeout = 5..minutes(),
    protected readonly max_retry_timeout = 4..hours()
  ) {
    this.jobs = jobs.to_hash(({ id }) => id)
    // `to_hash` collapses duplicate ids silently, so check against the original list size
    assert.equal(this.jobs.size(), jobs.size(), 'there are jobs with same ids')
    this.jobs.each(({ priority }) =>
      assert((priority || 0).abs() <= max_abs_job_priority, `job priority should be in -1000..1000 range`)
    )
    this.job_states = new Hash()
    this.focus = focus.to_set()
    this.log = new Log(id)
    this.job_states_storage = new PersistentVariable<JobStates>(
      Hash, `${this.data_dir}/${this.id}-crawler.json`, () => new Hash(), job_states_post_json
    )
  }

  async load(): Promise<void> {
    // Loading job states
    const ids = this.jobs.keys()
    this.job_states = (await this.job_states_storage.read())
      .filter((_, id) => ids.has(id)) // Removing states for old jobs
      .apply((state) => { delete state.retry_at }) // Cleaning retry after restart
    ids.each((id) => { // Adding states for new jobs
      if (!this.job_states.has(id)) this.job_states.set(id, { history: [], total_duration: 0 })
    })
  }

  protected get_errors(): JobErrors {
    return this.job_states.filter_map((state) => {
      const errors = recent_errors(state)
      return errors.is_empty() ? undefined : errors[0].message
    })
  }

  async save(): Promise<void> {
    await this.job_states_storage.write(this.job_states)
    fs.write_json(`${this.data_dir}/${this.id}-crawler-errors.json`, this.get_errors())
    this.log.debug('state saved')
  }

  protected async process_job(id: string, reason: string): Promise<void> {
    const job = this.jobs.get(id), state = this.job_states.get(id)
    const tic = timer_sec(), history_size = 5
    try {
      this.log.with({ id, reason }).info("processing '{id}', {reason}")
      job.last = (await job.process()).to_success()

      // Processing after
      if (job.after) await job.after()

      const duration = tic()
      this.log.with({ id, duration }).info("processed '{id}' in {duration} sec")

      // Updating state
      prepend_capped_m(state.history, {
        duration,
        crawler_version: this.version,
        timestamp: Time.now(),
        is_error: false
      }, history_size)
      delete state.retry_at
    } catch (e) {
      const duration = tic(), message = ensure_error(e).message
      prepend_capped_m(state.history, {
        duration,
        crawler_version: this.version,
        timestamp: Time.now(),
        is_error: true,
        message
      }, history_size)

      const retry_count = recent_errors(state).size()
      // Exponential backoff, capped at max_retry_timeout
      const delay_sec = [
        this.retry_timeout.seconds() * 2..pow(retry_count - 1),
        this.max_retry_timeout.seconds()
      ].min()
      state.retry_at = Time.now().plus(delay_sec.seconds())
      this.job_states.set(id, state)

      const l = this.log.with({ id, duration, retry_count, message })
      if (retry_count > 1) {
        l.warn("can't process '{id}' after {duration} sec, {retry_count} time, '{message}'")
      } else {
        l.info("can't process '{id}' after {duration} sec, {retry_count} time, will be retried, '{message}'")
      }
    }
    state.total_duration = state.history.map(({ duration }) => duration).sum()
  }

  async run(): Promise<void> {
    this.log.with({ version: this.version }).info("started v{version}")
    while (true) {
      // Building queue to process
      const now = Time.now()
      const queue: { job: Job, state: JobState, reason: string }[] = []
      if (!this.focus.is_empty()) {
        this.jobs
          .filter(({ id }) => this.focus.has(id))
          .each((job) => queue.add({ job, state: this.job_states.get(job.id), reason: 'focus' }))
      } else {
        this.jobs.each((job) => {
          const state = this.job_states.get(job.id)
          if (!state.retry_at || state.retry_at.compare(now) < 0) {
            // Checking `should_process` even when the retry is due, because the job could already
            // have been processed but its state not saved if the crawler crashed.
            const { process, reason } = job.should_process()
            if (process) queue.add({ job, state, reason })
          }
        })
      }
      this.log.with({ counts: queue.size() }).info("queue {counts} jobs")

      // Sorting and batching: higher priority first, then jobs with less accumulated processing time
      const sorted = queue.asc(({ job, state }) => -1000 * (job.priority || 0) + state.total_duration)
      const batch = sorted.take(5)

      // Processing
      for (const { job, reason } of batch) {
        await this.process_job(job.id, reason)
      }

      // Saving, or sleeping if there's nothing to process.
      // It would be better to save state every minute instead of after every batch.
      if (!batch.is_empty()) await this.save()
      else {
        this.log.info('all processed, waiting')
        await sleep(5..minutes().seconds() * 1000)
      }
    }
  }
}

// Prepends `v` to `list` in place, keeping at most `max` items
function prepend_capped_m<T>(list: T[], v: T, max: number): void {
  list.unshift(v)
  if (list.size() > max) list.pop()
}
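
A minimal usage sketch for the TypeScript version. `PageJob`, its ids and the data dir are hypothetical; since the `Crawler` constructor is protected, it is reached through a thin subclass:

// A hypothetical job; everything besides the `Job`/`Crawler` API above is made up
class PageJob implements Job {
  last!: E<LastProcessed> // assigned by the crawler after the first successful `process`
  priority = 0

  constructor(public readonly id: string) {}

  should_process(): ShouldProcess {
    // A real job would inspect `this.last` here, e.g. re-process if older than a day
    return { process: true, reason: 'always' }
  }

  async process(): Promise<LastProcessed> {
    // Fetch and store the page here
    return { timestamp: Time.now(), version: 1 }
  }
}

class PageCrawler extends Crawler {
  static async start(): Promise<void> {
    const jobs = ['a.com', 'b.com'].map((id) => new PageJob(id))
    const crawler = new PageCrawler('pages', 1, jobs, './tmp/crawler', [])
    await crawler.load() // restore job states from disk
    await crawler.run()  // loops forever, processing and saving in batches
  }
}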