Skip to content

Instantly share code, notes, and snippets.

@dasch
Last active August 7, 2021 06:09
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dasch/7e00caa3cc994c5149551eddeb9c6a29 to your computer and use it in GitHub Desktop.
Save dasch/7e00caa3cc994c5149551eddeb9c6a29 to your computer and use it in GitHub Desktop.
Mock stream processing API in Elm
module Kafka exposing (..)

{-| Kafka source helpers for the mock `Stream` API.

`readFrom` is declared by its signature only (no implementation in this
sketch), so this module is an API outline rather than compilable code.

NOTE(review): `Kafka.writeTo` is used by the SearchClickThroughs module but is
not declared in this module — add its signature once the sink's result type is
decided.

-}

import Stream exposing (Stream)


{-| A record read from a Kafka topic. `key` and `value` are both optional
(`Maybe String`).
-}
type alias KafkaRecord =
    { topic : String
    , partition : Int
    , offset : Int
    , key : Maybe String
    , value : Maybe String
    }


{-| Stream the records of the topic with the given name. Signature only.
-}
readFrom : String -> Stream KafkaRecord


{-| The values of the records, skipping records whose value is `Nothing`.
-}
values : Stream KafkaRecord -> Stream String
values stream =
    -- filterMap = map + justValues in one step; the stream is passed exactly once.
    Stream.filterMap .value stream


{-| The keys of the records, skipping records whose key is `Nothing`.
-}
keys : Stream KafkaRecord -> Stream String
keys stream =
    Stream.filterMap .key stream
module SearchClickThroughs exposing (main)

{-| Join the "searches" and "clicks" topics on `searchId` within 30-minute
session windows, and write the de-duplicated click-throughs to the
"click-throughs" topic.

NOTE(review): `decodeSearch`, `decodeClick` and `encodeClickThrough` are not
defined in this module. Their uses below imply `String -> Search`,
`String -> Click` and `ClickThrough -> String` — confirm.

NOTE(review): assumes the Duration module provides `minute` (the original used
an unresolved unqualified `minute`; `Duration.day` is used elsewhere).

-}

import Duration
import Kafka
import Stream exposing (Stream)
import Window


type alias Search =
    { searchId : String
    , query : String
    }


type alias Click =
    { searchId : String
    , url : String
    }


{-| A search together with the URL clicked for it.
-}
type alias ClickThrough =
    { searchId : String
    , query : String
    , url : String
    }


{-| Combine a search and a click that share a `searchId`.
-}
toClickThrough : ( Search, Click ) -> ClickThrough
toClickThrough ( search, click ) =
    { searchId = search.searchId
    , query = search.query

    -- `Click` carries the clicked `url`; it has no `resultId` field.
    , url = click.url
    }


{-| Decoded searches, keyed by their `searchId`.
-}
searches : Stream ( String, Search )
searches =
    Kafka.readFrom "searches"
        |> Kafka.values
        |> Stream.label "decode-search" (Stream.map decodeSearch)
        |> Stream.keyBy .searchId


{-| Decoded clicks, keyed by the `searchId` they belong to.
-}
clicks : Stream ( String, Click )
clicks =
    Kafka.readFrom "clicks"
        |> Kafka.values
        |> Stream.label "decode-click" (Stream.map decodeClick)
        |> Stream.keyBy .searchId


main =
    Stream.join searches clicks
        |> Stream.window (Window.session { gap = 30 * Duration.minute })
        -- Drop the join key; only the (Search, Click) pair is needed.
        |> Stream.label "remove-key" (Stream.map Tuple.second)
        |> Stream.map toClickThrough
        |> Stream.removeDuplicates
        |> Stream.map encodeClickThrough
        -- Presumably (key, value) for the sink; published without a key.
        |> Stream.label "prepare-for-kafka" (Stream.map (\data -> ( Nothing, Just data )))
        |> Kafka.writeTo "click-throughs"
module Stream exposing (..)

{-| Mock stream-processing API. Only type signatures are given; none of the
operations is implemented in this sketch.

NOTE(review): the `Stream` type itself is not declared in this module.

-}

-- NOTE(review): assumes Elm 0.17/0.18 core, where `Time` is a type alias;
-- under elm/time (0.19) this would be `Time.Posix` — confirm.

import Time exposing (Time)
import Window exposing (Window)


{-| A value together with its timestamp and window.
-}
type alias Element x =
    { value : x
    , timestamp : Time
    , window : Window
    }


{-| Transform every element.
-}
map : (x -> y) -> Stream x -> Stream y


{-| Transform elements, keeping only the `Just` results.
-}
filterMap : (x -> Maybe y) -> Stream x -> Stream y


{-| Keep the elements for which the predicate holds.
-}
filter : (x -> Bool) -> Stream x -> Stream x


{-| Replace each element with zero or more elements.
-}
concatMap : (x -> List y) -> Stream x -> Stream y


{-| Unwrap `Just` elements and drop `Nothing`s.
-}
justValues : Stream (Maybe x) -> Stream x


{-| Pair each element with a key computed from it, key first.
-}
keyBy : (x -> k) -> Stream x -> Stream ( k, x )


{-| Collect the elements sharing a key into a list.
-}
groupByKey : Stream ( k, x ) -> Stream ( k, List x )


{-| Pair elements of two keyed streams that share a key.
-}
join : Stream ( k, x ) -> Stream ( k, y ) -> Stream ( k, ( x, y ) )


{-| Like `join`, but the second side is optional.
-}
leftJoin : Stream ( k, x ) -> Stream ( k, y ) -> Stream ( k, ( x, Maybe y ) )


{-| Fold each key's list of elements into a single state.

NOTE(review): there is no initial-state argument, so an implementation has no
`s` to start folding from (e.g. an empty list). Consider
`(s -> x -> s) -> s -> Stream ( k, List x ) -> Stream ( k, s )`.

-}
reduceByKey : (s -> x -> s) -> Stream ( k, List x ) -> Stream ( k, s )


{-| Split a stream of pairs into a stream of firsts and a stream of seconds.
-}
branch : Stream ( x, y ) -> ( Stream x, Stream y )


{-| Apply a windowing strategy to the stream.
-}
window : Window -> Stream x -> Stream x


{-| Derive each element's timestamp from its value.
-}
assignTimestampsWith : (x -> Time) -> Stream x -> Stream x


{-| Drop elements equal to one already seen.
-}
removeDuplicates : Stream x -> Stream x


{-| Drop elements whose projection equals that of one already seen.
-}
removeDuplicatesBy : (x -> y) -> Stream x -> Stream x


{-| Attach a name to a processing stage (e.g. "decode-search").
-}
label : String -> (Stream x -> Stream y) -> Stream x -> Stream y


{-| Build a stream from a list.
-}
fromList : List x -> Stream x


{-| Count the elements (presumably per window) — confirm.
-}
countElements : Stream x -> Stream Int
module UniqueDailyVisits exposing (main)

{-| Count distinct sessions among page views, in one-day fixed windows.

NOTE(review): `PageView` and `decodePageView` are not defined in this module.
`PageView` is assumed to be a record with a `sessionId` field, and
`decodePageView` to be `String -> PageView` — confirm.

-}

import Duration
import Kafka
import Stream exposing (Stream)
import Window


{-| Page views decoded from the "page-views" topic.
-}
pageViews : Stream PageView
pageViews =
    Kafka.readFrom "page-views"
        |> Kafka.values
        -- `Kafka.values` yields raw strings; decode them to match the annotation.
        |> Stream.label "decode-page-view" (Stream.map decodePageView)


main =
    pageViews
        -- De-duplicate on the session itself. Keying and then calling
        -- `removeDuplicates` only dropped identical (sessionId, pageView)
        -- pairs, so every distinct view in a session was still counted.
        |> Stream.removeDuplicatesBy .sessionId
        |> Stream.window (Window.fixed { size = Duration.day })
        |> Stream.countElements
module Window exposing (Window, global, session, fixed, sliding)

{-| Windowing strategies for streams.

`Window` is exported without its constructors; build values with the
functions below.

-}

-- NOTE(review): assumes the Duration module exposes a `Duration` type — confirm.

import Duration exposing (Duration)


{-| The available windowing strategies.
-}
type Window
    = GlobalWindow
    | SessionWindow { gap : Duration }
    | FixedWindow { size : Duration }
    | SlidingWindow { size : Duration, period : Duration }


{-| The global window.
-}
global : Window
global =
    GlobalWindow


{-| Session windows with the given inactivity `gap`.
-}
session : { gap : Duration } -> Window
session =
    SessionWindow


{-| Fixed windows of the given `size`.
-}
fixed : { size : Duration } -> Window
fixed =
    FixedWindow


{-| Sliding windows of the given `size` and `period`.
-}
sliding : { size : Duration, period : Duration } -> Window
sliding =
    SlidingWindow
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment