Skip to content

Instantly share code, notes, and snippets.

@lenadroid
Last active January 12, 2018 20:06
Show Gist options
  • Save lenadroid/8d53e35a2742392f6f5a75449765476c to your computer and use it in GitHub Desktop.
Save lenadroid/8d53e35a2742392f6f5a75449765476c to your computer and use it in GitHub Desktop.
Example F# Job template for Kubernetes
FROM fsharp
COPY . .
RUN mono ./.paket/paket.bootstrapper.exe
RUN mono ./.paket/paket.exe restore
RUN mono .paket/paket.exe install
#r "packages/CassandraCSharpDriver/lib/net40/Cassandra.dll"
#r "packages/Mono.Posix/lib/net40/Mono.Posix.dll"
#r "packages/FSharp.Data/lib/net45/FSharp.Data.dll"
#r "packages/FSharp.Collections.ParallelSeq/lib/net40/FSharp.Collections.ParallelSeq.dll"
open Cassandra
open System
open FSharp.Data
open FSharp.Collections.ParallelSeq
let startIndex = fsi.CommandLineArgs |> Seq.tail |> Seq.head |> int
let writesPerJob = fsi.CommandLineArgs |> Seq.tail |> Seq.item 1 |> int
let increment = fsi.CommandLineArgs |> Seq.tail |> Seq.item 2 |> int
printfn "Start index: %A" startIndex
printfn "Items to add: %A" writesPerJob
printfn "Increment: %A" increment
type Addresses = CsvProvider<"data/address-format.csv",
HasHeaders = true,
Schema = "LON (decimal), LAT (decimal), NUMBER (string), STREET (string),,,,, POSTCODE (int option),, HASH (string)">
type Address = {
Lon: decimal
Lat: decimal
Number: string
Street: string
Postcode: int option
Hash: string
}
let seattleAddressDataUrl = "https://drive.google.com/uc?export=download&id=0B2N10qxSwZx5c3Y4MUJONFc3RG8"
let getAddresses () = Addresses.Load(seattleAddressDataUrl)
let seattleAddresses = getAddresses ()
let connectToCassandraCluster (endpointAddress: string) =
let cluster =
Cassandra.Cluster.Builder()
.AddContactPoint(endpointAddress)
.Build()
let session = cluster.Connect("system")
session
let createHosuingKeyspace (session: ISession) =
session.Execute(@"
create keyspace if not exists housingdata
with replication = {'class': 'SimpleStrategy', 'replication_factor': 3};"
) |> ignore
session.Execute(@"use housingdata;")
let createAddressesTable (session: ISession) =
// LON, LAT, NUMBER, STREET, POSTCODE, HASH
session.Execute(@"
CREATE TABLE addresses (
lon decimal,
lat decimal,
number text,
street text,
postcode int,
hash text,
PRIMARY KEY (postcode, street, number)) WITH CLUSTERING ORDER BY (street ASC)
")
let insertAddress (address:Address) (session: ISession) =
let query = session.Prepare("insert into addresses "
+ "(lon,lat,number,street,postcode,hash) values ("
+ "?, ?, ?, ?, ?, ?);")
let postcode =
match address.Postcode with
| Some p -> p
| None -> 0
query.Bind(address.Lon, address.Lat, address.Number, address.Street, postcode, address.Hash)
|> session.Execute
let displayAddressRow(row: Row) =
Console.WriteLine(
"LON: {0},\tLAT: {1}, \t House Number: {2},\tStreet: {3},\tPostcode: {4},\tHash: {5}",
row.GetValue<decimal>("lon"),
row.GetValue<decimal>("lat"),
row.GetValue<string>("number"),
row.GetValue<string>("street"),
row.GetValue<int>("postcode"),
row.GetValue<string>("hash"))
let getAllAddressRows (session: ISession) =
session.Execute("select * from addresses;")
let getAddressesBySomePostCode (session: ISession) =
session.Execute("select * from addresses where postcode=98118;")
let deleteHousingKeyspace (session: ISession) =
session.Execute(@"drop keyspace housingdata;") |> ignore
let persist address i session =
insertAddress address session
let persistAddresses session skip take =
seattleAddresses.Rows
|> Seq.skip skip
|> Seq.take take
|> Seq.fold (fun addressCount a ->
let goodAddress = {
Lon = a.LON
Lat = a.LAT
Number = a.NUMBER
Street = a.STREET
Postcode = a.POSTCODE
Hash = a.HASH
}
let result = persist goodAddress (addressCount+1) session
printfn "%A addresses persisted: skip %A, take %A ..." (addressCount+1+skip) skip take
addressCount+1
) 0
let persistAddressesParallel (session: ISession) start count increment =
let overallCount = seattleAddresses.Rows |> Seq.length
let howManyToAdd =
if overallCount - start < count
then overallCount - start
else count
let step = increment
let executions = howManyToAdd / step
let getSkipAndTake index =
match index with
| e when (e = executions) -> start + index * step, howManyToAdd % step
| _ -> start + index * step, step
let till = if howManyToAdd % step = 0 then executions - 1 else executions
[0..till]
|> PSeq.map getSkipAndTake
|> PSeq.map (fun (skip,take) ->
let resultCount = persistAddresses session skip take
printfn "Persisted portion %A" (skip/step)
resultCount
)
|> PSeq.sum
//// Test it in F# interactive first!
//// Connect to the cluster via external IP address:
// let session = connectToCassandraCluster "Load Balancer External IP Address"
//// Create a keyspace:
// createHosuingKeyspace session
//// Create a table:
// createAddressesTable session
//// Write some data:
// persistAddressesParallel session 0 100 10
// let allAddresses = getAllAddressRows session
// allAddresses |> Seq.length
// For internal to Kubernetes execution, using the hostname "cassandra":
let session = connectToCassandraCluster "cassandra"
// Connecting to the keyspace (table exists):
createHosuingKeyspace session
// Persisting "writesPerJob" number of rows from the dataset that start at "startIndex", with "increment" rows added concurrently
persistAddressesParallel session startIndex writesPerJob increment
source https://nuget.org/api/v2
nuget Mono.Posix
nuget CassandraCSharpDriver
nuget FSharp.Data
nuget FSharp.Collections.ParallelSeq
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment