@figpope
Last active August 29, 2015 14:23
"The Join"

To "restart the world", we'll run a batch job then a streaming job.

Batch

  • read all profiles from the system of record and create an RDD[(String, FCProfile)] containing all profiles, keyed by handle
  • read all interactions from the system of record, compute Map[handle: String, Map[ruleset_id: String, count: Int]] for each window
  • read all interactions from the system of record, "batch them" by timestamp, enumerate them
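
The two interaction steps above can be sketched with plain Scala collections standing in for RDDs. This is only an illustration under assumptions: the Interaction fields, the fixed batch width batchMs, and the names enumerateBatches and countByHandle are hypothetical and do not come from the real job.

```scala
// Hypothetical record shape; the real Interaction type is not shown in these notes.
final case class Interaction(handle: String, rulesetId: String, timestampMs: Long)

object BatchSketch {
  // Bucket interactions into fixed-width time batches and number them 0, 1, 2, ...
  // Assumes non-negative timestamps. Empty buckets are skipped, so indices count
  // only the buckets that actually contain interactions.
  def enumerateBatches(
      interactions: Seq[Interaction],
      batchMs: Long
  ): Seq[(Int, Seq[Interaction])] =
    interactions
      .groupBy(_.timestampMs / batchMs) // bucket key = timestamp / width
      .toSeq
      .sortBy(_._1)                     // oldest bucket first
      .map(_._2)
      .zipWithIndex
      .map { case (batch, i) => (i, batch) }

  // Count interactions per handle, then per ruleset within each handle.
  def countByHandle(interactions: Seq[Interaction]): Map[String, Map[String, Int]] =
    interactions
      .groupBy(_.handle)
      .map { case (handle, hs) =>
        handle -> hs.groupBy(_.rulesetId).map { case (r, rs) => r -> rs.size }
      }
}
```

With Spark, the same shape would presumably come from keying an RDD by bucket and by handle; the collection version just makes the intended data shapes concrete.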

Streaming

FullContact Profiles

for each profile:
	Add profile to state: Map[handle, Profile]

Interactions

for each enumerated batch of interactions (i, RDD[Interaction]):
	batch_{-1} = read in the batch that just fell out of the window (batch number i - "num batches in window")

	mapped_batch_{-1}, mapped_batch_t : RDD[(handle, (view_id, count))] = match rules with interactions
	
	state: Map[String, Map[String, Int]] = (handle, (view_id, count))

	state --= mapped_batch_{-1}   (subtract the counts of the expired batch)
	state ++= mapped_batch_t      (add the counts of the new batch)
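
The window update above (subtract the expired batch, add the new one) could look roughly like the sketch below. It assumes the state is an immutable nested map and that counts which reach zero should be dropped; the names WindowState, merge and slide are made up for illustration, and a real streaming job would keep this state inside Spark rather than in a local Map.

```scala
object WindowState {
  type Counts = Map[String, Map[String, Int]] // handle -> (view_id -> count)

  // Fold `delta` into `state`, multiplying every count by `sign`
  // (1 to add, -1 to subtract). Entries whose count drops to zero or below
  // are removed so the state does not grow without bound.
  private def merge(state: Counts, delta: Counts, sign: Int): Counts =
    delta.foldLeft(state) { case (acc, (handle, views)) =>
      val updated = views.foldLeft(acc.getOrElse(handle, Map.empty[String, Int])) {
        case (m, (viewId, n)) =>
          val total = m.getOrElse(viewId, 0) + sign * n
          if (total > 0) m.updated(viewId, total) else m - viewId
      }
      if (updated.isEmpty) acc - handle else acc.updated(handle, updated)
    }

  // One window step: drop the expired batch's counts, then add the new batch's counts.
  def slide(state: Counts, expired: Counts, incoming: Counts): Counts =
    merge(merge(state, expired, -1), incoming, 1)
}
```

Subtracting before adding keeps the intermediate state no larger than necessary, although the final result is the same in either order as long as every expired count was previously added.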

Scoring

for each window of interactions:
	profiles_in_window : RDD[(String, ((String, Int), Profile))] = state join profiles
		=> (handle, ((view_id, count), profile))
	profiles_by_view_id : RDD[(String, List[Profile])] = profiles_in_window => (view_id, profile) group
		=> (view_id, list_of_profiles)
	scored_profiles : RDD[(String, List[(Profile, Int)])] = profiles_in_window map scoring_func | sort
		=> (view_id, sorted_list_of_profiles)

	write scored_profiles to the DynamoDB saver queue
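
As a rough illustration of the join, group and sort steps in the scoring pass, here is a sketch on plain Scala maps. The Profile fields, the signature of the score function, and the highest-score-first ordering are all assumptions; the notes do not say what scoring_func takes or how the list is ordered. The write to the saver queue is left out.

```scala
// Hypothetical stand-in for the profile type used in the notes.
final case class Profile(handle: String, name: String)

object ScoringSketch {
  def scoreWindow(
      state: Map[String, Map[String, Int]], // handle -> (view_id -> count)
      profiles: Map[String, Profile],       // handle -> profile
      score: (Profile, Int) => Int          // hypothetical scoring function
  ): Map[String, List[(Profile, Int)]] = {
    // Inner join on handle: handles with no known profile are dropped.
    val joined: Seq[(String, (Profile, Int))] = for {
      (handle, views) <- state.toSeq
      profile         <- profiles.get(handle).toSeq
      (viewId, count) <- views.toSeq
    } yield (viewId, (profile, score(profile, count)))

    // Group by view_id, then sort each group by score, highest first (assumed order).
    joined
      .groupBy(_._1)
      .map { case (viewId, rows) =>
        viewId -> rows.map(_._2).sortBy { case (_, s) => -s }.toList
      }
  }
}
```

Scoring here happens before grouping so that the per-handle count is still available to the scoring function; whether the real scoring_func needs the count is not stated in the notes.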