Last active
August 7, 2021 06:09
-
-
Save dasch/7e00caa3cc994c5149551eddeb9c6a29 to your computer and use it in GitHub Desktop.
Mock stream processing API in Elm
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Kafka exposing (..) | |
{-| One Kafka record, as carried by the stream that `readFrom` returns.

The record names its `topic`, `partition` and `offset`; both `key` and
`value` are optional (`Maybe String`).

-}
type alias KafkaRecord =
    { topic : String
    , partition : Int
    , offset : Int
    , key : Maybe String
    , value : Maybe String
    }
-- Signature only: this mock gives no implementation for `readFrom`.
-- Takes a String (callers pass values such as "searches"; presumably the
-- topic name, TODO confirm) and yields a stream of KafkaRecord values.
readFrom : String -> Stream KafkaRecord | |
{-| Turn a stream of Kafka records into a stream of their values.

Each record's `value` field is a `Maybe String`, so the mapped stream is
passed through `Stream.justValues` (`Stream (Maybe x) -> Stream x`) to end
up with plain strings.

The pipeline already supplies `stream` as the last argument of `Stream.map`;
it must not be passed a second time, since `Stream.map` takes exactly one
function and one stream.

-}
values : Stream KafkaRecord -> Stream String
values stream =
    stream
        |> Stream.map .value
        |> Stream.justValues
{-| Turn a stream of Kafka records into a stream of their keys.

Each record's `key` field is a `Maybe String`, so the mapped stream is
passed through `Stream.justValues` (`Stream (Maybe x) -> Stream x`) to end
up with plain strings.

The pipeline already supplies `stream` as the last argument of `Stream.map`;
it must not be passed a second time, since `Stream.map` takes exactly one
function and one stream.

-}
keys : Stream KafkaRecord -> Stream String
keys stream =
    stream
        |> Stream.map .key
        |> Stream.justValues
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module SearchClickThroughs exposing (main) | |
import Stream | |
import Kafka | |
{-| A search event: the `searchId` it is keyed by and the `query` string.
-}
type alias Search =
    { searchId : String
    , query : String
    }
{-| A click event: the `searchId` it is keyed by and the clicked `url`.
-}
type alias Click =
    { searchId : String
    , url : String
    }
{-| Combine a joined search/click pair into a `ClickThrough` record.

`searchId` and `query` are taken from the search. `Click`, as declared in
this module, only has the fields `searchId` and `url`, so accessing a
`resultId` field on it cannot type-check; the click's `url` is used as the
result identifier instead.

NOTE(review): `ClickThrough` is not declared in the visible source — confirm
that `url` is the intended value for its `resultId` field.

-}
toClickThrough : ( Search, Click ) -> ClickThrough
toClickThrough ( search, click ) =
    { searchId = search.searchId
    , query = search.query
    , resultId = click.url
    }
{-| Stream of searches read from the "searches" Kafka topic.

The record values are decoded with `decodeSearch` (inside a step labelled
"decode-search") and the result is keyed by each search's `searchId`.

-}
searches : Stream (String, Search)
searches =
    let
        rawValues =
            Kafka.values (Kafka.readFrom "searches")

        decoded =
            Stream.label "decode-search" (Stream.map decodeSearch) rawValues
    in
    Stream.keyBy .searchId decoded
{-| Stream of clicks read from the "clicks" Kafka topic.

The record values are decoded with `decodeClick` (inside a step labelled
"decode-click") and the result is keyed by each click's `searchId`.

-}
clicks : Stream (String, Click)
clicks =
    let
        rawValues =
            Kafka.values (Kafka.readFrom "clicks")

        decoded =
            Stream.label "decode-click" (Stream.map decodeClick) rawValues
    in
    Stream.keyBy .searchId decoded
{-| Join searches with clicks and write click-throughs to Kafka.

Steps, in order:

1.  `Stream.join` pairs `searches` and `clicks`, giving elements of the
    shape `(key, (search, click))`.
2.  A session window with a gap of `30 * minute` is applied.
3.  "remove-key" keeps only the second component of each `(key, pair)`
    tuple. Tuples are not records, so a `.field` accessor cannot be used
    here; `Tuple.second` is the tuple projection.
4.  Each pair becomes a `ClickThrough`, duplicates are removed, and the
    result is encoded with `encodeClickThrough`.
5.  "prepare-for-kafka" wraps each encoded value as `(Nothing, Just data)`
    before `Kafka.writeTo "click-throughs"`.

NOTE(review): `Window` and `minute` are neither imported nor defined in the
visible part of this module — confirm where they come from.

-}
main =
    Stream.join searches clicks
        |> Stream.window (Window.session { gap = 30 * minute })
        |> Stream.label "remove-key" (Stream.map Tuple.second)
        |> Stream.map toClickThrough
        |> Stream.removeDuplicates
        |> Stream.map encodeClickThrough
        |> Stream.label "prepare-for-kafka" (Stream.map (\data -> ( Nothing, Just data )))
        |> Kafka.writeTo "click-throughs"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- `exposing` requires an export list; expose everything, as the Kafka module does.
module Stream exposing (..)
{-| A stream element: a payload `value` of type `x` stored together with a
`timestamp` and a `window`.
-}
type alias Element x =
    { value : x
    , timestamp : Time
    , window : Window
    }
-- The declarations below are type signatures only: this mock provides no
-- implementations. The notes restate what the types say; anything beyond
-- that is inferred from the names and should be confirmed.


-- Element-wise transformations.

-- Takes a function `x -> y` and turns a `Stream x` into a `Stream y`.
map : (x -> y) -> Stream x -> Stream y

-- Like `map`, but the function returns `Maybe y`; presumably `Nothing`
-- results are dropped.
filterMap : (x -> Maybe y) -> Stream x -> Stream y

-- Takes a predicate and returns a stream of the same element type.
filter : (x -> Bool) -> Stream x -> Stream x

-- The function yields a `List y` per element; the result is a flat `Stream y`.
concatMap : (x -> List y) -> Stream x -> Stream y

-- Turns a stream of `Maybe x` into a stream of `x`.
justValues : Stream (Maybe x) -> Stream x


-- Keyed operations. Keyed streams are streams of `(k, x)` pairs.

-- Pairs each element with the key derived from it. The result is
-- `Stream (k, x)`: this is the shape `groupByKey`, `join` and `leftJoin`
-- take as input, and it matches callers annotated `Stream (String, Search)`
-- that end their pipeline with `Stream.keyBy .searchId`.
keyBy : (x -> k) -> Stream x -> Stream (k, x)

-- Turns `(k, x)` pairs into `(k, List x)` pairs.
groupByKey : Stream (k, x) -> Stream (k, List x)

-- Combines two streams sharing key type `k` into `(k, (x, y))` pairs.
join : Stream (k, x) -> Stream (k, y) -> Stream (k, (x, y))

-- As `join`, but the right-hand value is optional (`Maybe y`).
leftJoin : Stream (k, x) -> Stream (k, y) -> Stream (k, (x, Maybe y))

-- Takes a step function `s -> x -> s` and a grouped stream, giving `(k, s)`.
-- NOTE(review): no initial `s` is passed in — confirm how it is obtained.
reduceByKey : (s -> x -> s) -> Stream (k, List x) -> Stream (k, s)

-- Splits a stream of pairs into a pair of streams.
branch : Stream (x, y) -> (Stream x, Stream y)


-- Windowing, timestamps and de-duplication.

-- Takes a `Window` and returns a stream of the same element type.
window : Window -> Stream x -> Stream x

-- Takes a function from element to `Time`; element type is unchanged.
assignTimestampsWith : (x -> Time) -> Stream x -> Stream x

-- Returns a stream of the same element type.
removeDuplicates : Stream x -> Stream x

-- As `removeDuplicates`, with a projection `x -> y`; presumably duplicates
-- are judged on the projected value.
removeDuplicatesBy : (x -> y) -> Stream x -> Stream x


-- Utilities.

-- Takes a String and a stream transformation and returns a transformation
-- of the same type.
label : String -> (Stream x -> Stream y) -> Stream x -> Stream y

-- Builds a stream from a list.
fromList : List x -> Stream x

-- Turns a stream of any element type into a stream of `Int`.
countElements : Stream x -> Stream Int
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module UniqueDailyVisits exposing (main) | |
import Stream | |
import Kafka | |
import Duration | |
{-| Stream built from the values of the "page-views" Kafka topic.

NOTE(review): `Kafka.values` is declared as returning `Stream String`, while
this annotation says `Stream PageView`, and no decode step appears here
(compare the `Stream.map decodeSearch` step in `searches`) — confirm whether
one is missing.

-}
pageViews : Stream PageView
pageViews =
    Kafka.values (Kafka.readFrom "page-views")
{-| Keys page views by `sessionId`, removes duplicates, applies a fixed
window of size `Duration.day`, and counts the elements.
-}
main =
    let
        dailyWindow =
            Window.fixed { size = Duration.day }

        deduplicated =
            Stream.removeDuplicates (Stream.keyBy .sessionId pageViews)
    in
    Stream.countElements (Stream.window dailyWindow deduplicated)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
module Window exposing (Window, global, session, fixed, sliding) | |
{-| The kinds of window: a global one with no parameters, a session window
with a `gap`, a fixed window with a `size`, and a sliding window with a
`size` and a `period`. All parameters are `Duration` values.
-}
type Window
    = GlobalWindow
    | SessionWindow { gap : Duration }
    | FixedWindow { size : Duration }
    | SlidingWindow { size : Duration, period : Duration }
-- The module header exports these four functions but not the `Window`
-- variants, so they are the only way for other modules to build a `Window`.
-- A type annotation needs a definition to compile; each one is defined as
-- the variant whose payload type matches its signature exactly.


{-| Builds the `GlobalWindow` variant.
-}
global : Window
global =
    GlobalWindow


{-| Builds a `SessionWindow` from a record holding the `gap`.
-}
session : { gap : Duration } -> Window
session =
    SessionWindow


{-| Builds a `FixedWindow` from a record holding the `size`.
-}
fixed : { size : Duration } -> Window
fixed =
    FixedWindow


{-| Builds a `SlidingWindow` from a record holding the `size` and `period`.
-}
sliding : { size : Duration, period : Duration } -> Window
sliding =
    SlidingWindow
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment