To "restart the world" (rebuild all state from the system of record), we'll run a batch job followed by a streaming job.
- read all profiles from the system of record and create an RDD[(String, FCProfile)] containing all profiles
- read all interactions from the system of record and compute a Map[handle: String, Map[ruleset_id: String, count: Int]] for each window
- read all interactions from the system of record, group them into batches by timestamp, and enumerate the batches
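The batching step above can be sketched as follows. This is a minimal Python sketch with plain lists standing in for Spark RDDs; the interaction tuple shape and the `batch_width_s` parameter are hypothetical choices for illustration.

```python
from collections import defaultdict

def batch_by_timestamp(interactions, batch_width_s=60):
    """Group (timestamp, handle, payload) interactions into fixed-width
    batches keyed by floor(timestamp / width), then enumerate the batches
    in timestamp order, yielding (i, batch) pairs."""
    buckets = defaultdict(list)
    for interaction in interactions:
        ts = interaction[0]
        buckets[ts // batch_width_s].append(interaction)
    return list(enumerate(buckets[k] for k in sorted(buckets)))
```

The enumeration index `i` is what lets the streaming loop below find the batch that falls out of the window (`i - "num batches in window"`).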
for each profile:
    add the profile to the profile state: Map[handle, Profile]
for each enumerated batch of interactions (i, RDD[Interaction]):
    batch_out = read in the batch that fell out of the window (batch i - "num batches in window")
    mapped_batch_out, mapped_batch_i : RDD[(handle, (view_id, count))] = match rules against the interactions
    state : Map[String, Map[String, Int]] => (handle, (view_id, count))
    state --= mapped_batch_out (decrement each (handle, view_id) count)
    state ++= mapped_batch_i (increment each (handle, view_id) count)
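The `state --=` / `state ++=` steps above are count maintenance, not set difference: counts from the batch leaving the window are subtracted and counts from the new batch are added. A sketch with plain dicts (the helper name and zero-pruning behavior are assumptions, not part of the original design):

```python
def apply_batch(state, mapped_batch, sign):
    """state: {handle: {view_id: count}}.
    mapped_batch: [(handle, (view_id, count)), ...].
    sign=+1 applies a batch entering the window; sign=-1 removes
    the batch that just fell out of it."""
    for handle, (view_id, count) in mapped_batch:
        views = state.setdefault(handle, {})
        views[view_id] = views.get(view_id, 0) + sign * count
        if views[view_id] == 0:   # prune counters that return to zero
            del views[view_id]
        if not views:             # prune handles with no live counters
            del state[handle]
    return state
```

One window tick is then `apply_batch(state, mapped_batch_out, -1)` followed by `apply_batch(state, mapped_batch_in, +1)`.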
for each window of interactions:
    profiles_in_window : RDD[(String, ((String, Int), Profile))] = state join profiles
        => (handle, ((view_id, count), profile))
    profiles_by_view_id : RDD[(String, List[Profile])] = profiles_in_window map to (view_id, profile) | group by key
        => (view_id, list_of_profiles)
    scored_profiles : RDD[(String, List[(Profile, Int)])] = profiles_by_view_id map scoring_func | sort by score
        => (view_id, sorted_list_of_scored_profiles)
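The join / group / score pipeline for one window can be sketched with comprehensions standing in for the RDD stages. `scoring_func` here is a hypothetical stand-in for whatever per-profile scorer the rulesets define, and dicts stand in for the keyed RDDs:

```python
def score_window(state, profiles, scoring_func):
    """state: {handle: {view_id: count}}, profiles: {handle: profile}.
    Returns {view_id: [(profile, score), ...]} sorted by descending score."""
    # state join profiles => (view_id, (profile, count))
    joined = [
        (view_id, (profiles[handle], count))
        for handle, views in state.items() if handle in profiles
        for view_id, count in views.items()
    ]
    # group by view_id, score each profile
    by_view = {}
    for view_id, (profile, count) in joined:
        by_view.setdefault(view_id, []).append((profile, scoring_func(profile, count)))
    # sort each view's profiles by score, highest first
    for view_id in by_view:
        by_view[view_id].sort(key=lambda ps: ps[1], reverse=True)
    return by_view
```

Handles that appear in state but not in the profiles RDD are silently dropped here, matching inner-join semantics.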
    write scored_profiles to the DynamoDB saver queue