import base/[basem, timem, jsonm, mathm, logm, docm]
import ext/[persistencem]
import std/os

export jsonm, timem # otherwise there will be a bug

# Defaults -----------------------------------------------------------------------------------------
let default_retry_timeout = 5.minutes
let default_max_retry_timeout = 4.hours

# Job ----------------------------------------------------------------------------------------------
type
  ShouldProcess* = tuple[process: bool, reason: string]
  LastProcessed* = tuple[timestamp: Time, version: int]

  Job* = ref object of RootObj
    id*: string
    last*: Fallible[LastProcessed]
    priority*: int

  Jobs* = ref Table[string, Job]

method should_process*(job: Job, jobs: Jobs): ShouldProcess {.base.} = throw "not implemented"
method process*(job: Job): LastProcessed {.base.} = throw "not implemented"
method after*(job: Job): void {.base.} = discard
# JobState -----------------------------------------------------------------------------------------
type
  HistoryItem = object
    duration: int
    crawler_version: int
    timestamp: Time
    case is_error: bool
    of false:
      discard
    of true:
      error: string

  JobState = object
    history: seq[HistoryItem] # reversed, [last, previous, ...]
    retry_at: Option[Time]
    total_duration: int

proc recent_errors_count(state: JobState): int =
  for item in state.history:
    if item.is_error: result.inc else: return

# proc `==`*(x, y: JobState): bool = %x == %y
# Crawler ------------------------------------------------------------------------------------------
type
  # Jobs = Table[string, JobWithState]
  JobStates = Table[string, JobState]
  JobErrors = Table[string, string]

  # J: Job
  Crawler* = ref object
    id: string
    version: int
    jobs: Jobs
    job_states: JobStates
    data_dir: string
    focus: HashSet[string]
    retry_timeout: TimeInterval
    max_retry_timeout: TimeInterval

# Log ----------------------------------------------------------------------------------------------
proc log(crawler_id: string): Log = Log.init(crawler_id)
proc log(crawler: Crawler): Log = log(crawler.id)
# init ---------------------------------------------------------------------------------------------
proc init*(
  _: type[Crawler],
  id: string,
  version: int,
  jobs: seq[Job],
  data_dir: string,
  focus: seq[string],
  retry_timeout = default_retry_timeout,
  max_retry_timeout = default_max_retry_timeout
): Crawler =
  # Loading job states
  let ids = jobs.pick(id).to_hash_set()
  assert ids.len == jobs.len, "there are jobs with same ids"
  var job_states = JobStates
    .read_from(fmt"{data_dir}/{id}-crawler.json", () => JobStates())
    .filter((_, id) => id in ids) # Removing states for old jobs

  # Cleaning retry when restarted
  for id, _ in job_states: job_states[id].retry_at = Time.none

  # Adding states for new jobs
  for job in jobs:
    if not (job.id in job_states): job_states[job.id] = JobState()

  Crawler(
    id: id,
    version: version,
    jobs: jobs.to_table((j) => j.id).to_ref,
    data_dir: data_dir,
    retry_timeout: retry_timeout,
    focus: focus.to_set,
    max_retry_timeout: max_retry_timeout,
    job_states: job_states,
  )
# get_errors -----------------------------------------------------------------------------
proc get_errors(states: JobStates): JobErrors =
  for id, state in states:
    if state.recent_errors_count > 1:
      result[id] = state.history[0].error

# save -----------------------------------------------------------------------------------
proc save(crawler: Crawler): void =
  crawler.job_states.write_to fmt"{crawler.data_dir}/{crawler.id}-crawler.json"
  crawler.job_states.get_errors.write_to fmt"{crawler.data_dir}/{crawler.id}-crawler-errors.json"
  crawler.log.debug "state saved"
# process_job --------------------------------------------------------------------------------------
proc process_job(crawler: var Crawler, id: string, reason: string): void =
  let log = crawler.log
  var job = crawler.jobs[id]
  var state = crawler.job_states[id]

  # Processing
  let tic = timer_sec()
  let history_size = 5
  try:
    log.with((id: job.id, reason: reason)).info "processing '{id}', {reason}"
    job.last = job.process().success

    # Processing after
    # log.with((id: job.id)).info "{id} after processing"
    job.after()

    let duration = tic()
    log.with((id: job.id, duration: duration)).info "processed '{id}' in {duration} sec"

    # Updating state
    state.history.prepend_capped(HistoryItem(
      duration: duration,
      crawler_version: crawler.version,
      timestamp: Time.now,
      is_error: false
    ), history_size)
    state.retry_at = Time.none
  except:
    let (duration, error) = (tic(), get_current_exception().message)
    state.history.prepend_capped(HistoryItem(
      duration: duration,
      crawler_version: crawler.version,
      timestamp: Time.now,
      is_error: true,
      error: error
    ), history_size)

    # Exponential backoff: the delay doubles with each consecutive error, capped at max_retry_timeout
    let retry_count = state.recent_errors_count
    let retry_at: Time = Time.now + min(
      crawler.retry_timeout.seconds * 2.pow(retry_count - 1),
      crawler.max_retry_timeout.seconds
    ).seconds
    state.retry_at = retry_at.some
    crawler.job_states[id] = state

    let log_data = log.with((id: job.id, duration: duration, retry_count: retry_count, error: error))
    if retry_count > 1:
      log_data.warn "can't process '{id}' after {duration} sec, {retry_count} time, '{error}'"
    else:
      log_data.info "can't process '{id}' after {duration} sec, {retry_count} time, will be retried, '{error}'"

  crawler.jobs[id] = job
  state.total_duration = state.history.pick(duration).sum
  crawler.job_states[id] = state
# run ------------------------------------------------------------------------------------
proc run*(crawler: var Crawler): void =
  let log = crawler.log
  log.with((version: crawler.version)).info "started v{version}"
  while true:
    # Building queue to process
    let now = Time.now
    var queue: seq[tuple[job: Job, state: JobState, reason: string]] = @[]
    if not crawler.focus.is_empty:
      for job in crawler.jobs.values:
        if job.id in crawler.focus:
          queue.add((job, crawler.job_states[job.id], "focus"))
    else:
      for job in crawler.jobs.values:
        let state = crawler.job_states[job.id]
        if state.retry_at.is_blank or state.retry_at.get < now:
          # Checking `should_process` even if `retry_at < now`, because it could be already processed,
          # but state hasn't been saved because crawler crashed.
          let (should_process, reason) = job.should_process(crawler.jobs)
          if should_process: queue.add((job, state, reason))

    # p queue.map((j) => j.job.id).join("\n")
    log
      .with((counts: queue.count_by((job) => $(job.job.type))))
      .info("queue {counts} jobs")

    # Sorting and batching
    let sorted = queue.sort_by((job) => (-job.job.priority, job.state.total_duration))
    let batch = sorted.take(5)

    # Processing
    for job in batch:
      crawler.process_job(job.job.id, job.reason)

    # Saving or sleeping if there's nothing to process
    # Would be better to save state every minute, instead of for every batch.
    if not batch.is_empty:
      crawler.save()
    else:
      log.info "all processed, waiting"
      sleep(5.minutes.seconds * 1000)
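
# Usage sketch ---------------------------------------------------------------------------
# A minimal, hypothetical example (not part of the original gist): `ExampleJob`, its logic and
# the "./tmp/example" dir are made up, shown only to illustrate how a Job subclass plugs into
# Crawler.init and run defined above.
when is_main_module:
  type ExampleJob = ref object of Job

  method should_process(job: ExampleJob, jobs: Jobs): ShouldProcess =
    (process: true, reason: "always")

  method process(job: ExampleJob): LastProcessed =
    # Fetch and persist the data for this job here
    (timestamp: Time.now, version: 1)

  var crawler = Crawler.init(
    id       = "example",
    version  = 1,
    jobs     = @[ExampleJob(id: "job1").Job],
    data_dir = "./tmp/example",
    focus    = newSeq[string]()
  )
  crawler.run()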
import { timer_sec } from 'base/base.ts'
import { Time, TimeInterval } from 'base/time.ts'
import { Log } from 'base/log.ts'
import { PersistentVariable } from 'base/persistent_variable.ts'
import * as fs from "base/fs.ts"

export type ShouldProcess = { process: boolean, reason: string }
export type LastProcessed = { timestamp: Time, version: number }

export interface Job {
  id: string
  last: E<LastProcessed>
  priority?: number // -1000..1000, default 0

  should_process(): ShouldProcess
  process(): Promise<LastProcessed>
  after?(): Promise<void>
}
const max_abs_job_priority = 1000

export interface HistoryItemError {
  duration: number
  crawler_version: number
  timestamp: Time
  is_error: true
  message: string
}

export interface HistoryItemSuccess {
  duration: number
  crawler_version: number
  timestamp: Time
  is_error: false
}

export type HistoryItem = HistoryItemError | HistoryItemSuccess

export interface JobState {
  history: HistoryItem[] // reversed, [last, previous, ...]
  retry_at?: Time
  total_duration: number
}

type JobStates = Hash<JobState>

function job_states_post_json(states: JobStates): JobStates {
  return states.apply((state) => {
    state.history.apply((item) => {
      item.timestamp = new Time(item.timestamp.to_s())
    })
    if (state.retry_at) state.retry_at = new Time(state.retry_at.to_s())
  })
}

function recent_errors(state: JobState): HistoryItemError[] {
  return state.history.filter_map((item) => item.is_error ? item : undefined)
}
type JobErrors = Hash<string>

export class Crawler {
  protected jobs: Hash<Job>
  protected job_states: JobStates
  protected job_states_storage: PersistentVariable<JobStates>
  protected focus: HSet<string>
  protected log: Log
  // protected readonly state_path: string

  protected constructor(
    public readonly id: string,
    public readonly version: number,
    jobs: Job[],
    protected readonly data_dir: string,
    focus: string[],
    protected readonly retry_timeout = 5..minutes(),
    protected readonly max_retry_timeout = 4..hours()
  ) {
    this.jobs = jobs.to_hash(({ id }) => id)
    this.jobs.each(({ priority }) =>
      assert((priority || 0).abs() <= max_abs_job_priority, `job priority should be in -1000..1000 range`)
    )
    this.job_states = new Hash()
    this.focus = focus.to_set()
    this.log = new Log(id)
    this.job_states_storage = new PersistentVariable<JobStates>(
      Hash, `${this.data_dir}/${this.id}-crawler.json`, () => new Hash(), job_states_post_json
    )
  }
  async load(): Promise<void> {
    // Loading job states
    const ids = this.jobs.keys()
    assert.equal(ids.size(), this.jobs.size(), 'there are jobs with same ids')
    this.job_states = (await this.job_states_storage.read())
      .filter((_, id) => ids.has(id))              // Removing states for old jobs
      .apply((state) => { delete state.retry_at }) // Cleaning retry after restart
    ids.each((id) => { // Adding states for new jobs
      if (!this.job_states.has(id)) this.job_states.set(id, { history: [], total_duration: 0 })
    })
  }

  protected get_errors(): JobErrors {
    return this.job_states.filter_map((state) => {
      const errors = recent_errors(state)
      return errors.is_empty() ? undefined : errors[0].message
    })
  }

  async save(): Promise<void> {
    await this.job_states_storage.write(this.job_states)
    fs.write_json(`${this.data_dir}/${this.id}-crawler-errors.json`, this.get_errors())
    this.log.debug('state saved')
  }
  protected async process_job(id: string, reason: string): Promise<void> {
    let job = this.jobs.get(id), state = this.job_states.get(id)
    const tic = timer_sec(), history_size = 5
    try {
      this.log.with({ id, reason }).info("processing '{id}', {reason}")
      job.last = (await job.process()).to_success()

      // Processing after
      if (job.after) await job.after()

      const duration = tic()
      this.log.with({ id, duration }).info("processed '{id}' in {duration} sec")

      // Updating state
      prepend_capped_m(state.history, {
        duration,
        crawler_version: this.version,
        timestamp: Time.now(),
        is_error: false
      }, history_size)
      delete state.retry_at
    } catch (e) {
      const duration = tic(), message = ensure_error(e).message
      prepend_capped_m(state.history, {
        duration,
        crawler_version: this.version,
        timestamp: Time.now(),
        is_error: true,
        message
      }, history_size)

      // Exponential backoff: the delay doubles with each consecutive error, capped at max_retry_timeout
      const retry_count = recent_errors(state).size()
      const delay_sec = [
        this.retry_timeout.seconds() * 2..pow(retry_count - 1),
        this.max_retry_timeout.seconds()
      ].min()
      state.retry_at = Time.now().plus(delay_sec.seconds())
      this.job_states.set(id, state)

      const l = this.log.with({ id, duration, retry_count, message })
      if (retry_count > 1) {
        l.warn("can't process '{id}' after {duration} sec, {retry_count} time, '{message}'")
      } else {
        l.info("can't process '{id}' after {duration} sec, {retry_count} time, will be retried, '{message}'")
      }
    }
    state.total_duration = state.history.map(({ duration }) => duration).sum()
  }
  async run(): Promise<void> {
    this.log.with({ version: this.version }).info("started v{version}")
    while (true) {
      // Building queue to process
      const now = Time.now()
      const queue: { job: Job, state: JobState, reason: string }[] = []
      if (!this.focus.is_empty()) {
        this.jobs
          .filter(({ id }) => this.focus.has(id))
          .each((job) => queue.add({ job, state: this.job_states.get(job.id), reason: 'focus' }))
      } else {
        this.jobs.each((job) => {
          const state = this.job_states.get(job.id)
          if (!state.retry_at || state.retry_at.compare(now) < 0) {
            // Checking `should_process` even if `retry_at < now`, because it could be already processed,
            // but state hasn't been saved because crawler crashed.
            const { process, reason } = job.should_process()
            if (process) queue.add({ job, state, reason })
          }
        })
      }
      this.log.with({ counts: queue.size() }).info("queue {counts} jobs")

      // Sorting and batching
      const sorted = queue.asc(({ job, state }) => -1000 * (job.priority || 0) + state.total_duration)
      const batch = sorted.take(5)

      // Processing
      for (const { job, reason } of batch) {
        await this.process_job(job.id, reason)
      }

      // Saving or sleeping if there's nothing to process
      // Would be better to save state every minute, instead of for every batch.
      if (!batch.is_empty()) await this.save()
      else {
        this.log.info('all processed, waiting')
        await sleep(5..minutes().seconds() * 1000)
      }
    }
  }
}
function prepend_capped_m<T>(list: T[], v: T, max: number): void {
  list.unshift(v)
  if (list.size() > max) list.pop()
}