Created
March 18, 2022 04:54
-
-
Save NickDarvey/4cc5bbddf4aadc29b643305c2a556914 to your computer and use it in GitHub Desktop.
Deploy FSharp.AWS.DynamoDB record schema to DynamoDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#r "nuget: FSharp.AWS.DynamoDB, 0.9.3-beta" | |
#r "nuget: AWSSDK.DynamoDBv2" | |
#load "../../server/src/Features/Conversations/State.fs" | |
open FSharp.AWS.DynamoDB | |
open Amazon.DynamoDBv2 | |
open Amazon.Runtime | |
open Features.Conversations.State | |
/// Active pattern that matches on flattened inner exceptions in an AggregateException | |
let private (|AggregateExn|_|) (e : exn) = | |
match e with | |
| :? System.AggregateException as ae -> | |
ae.Flatten().InnerExceptions | |
|> List.ofSeq | |
|> Some | |
| _ -> None | |
type private Schema' = { | |
TableName : string | |
Indexes : KeySchema' list | |
IndexProjections : Map<string, string> | |
} | |
and private KeySchema' = { | |
HashKey : KeyAttributeSchema' | |
RangeKey : KeyAttributeSchema' option | |
Type : KeySchemaType | |
} | |
and private KeyAttributeSchema' = { | |
AttributeName : string | |
AttributeType : string | |
} | |
module private KeyAttributeSchema' = | |
let fromKeyAttributeSchema (schema : KeyAttributeSchema) : KeyAttributeSchema' = { | |
AttributeName = schema.AttributeName | |
AttributeType = schema.KeyType.Value | |
} | |
let toAttributeDefinition (schema : KeyAttributeSchema') = | |
Model.AttributeDefinition ( | |
AttributeName = schema.AttributeName, | |
AttributeType = schema.AttributeType | |
) | |
module private KeySchema' = | |
let toKeySchemaElements (schema : KeySchema') = [ | |
Model.KeySchemaElement ( | |
AttributeName = schema.HashKey.AttributeName, | |
KeyType = KeyType.HASH | |
) | |
if schema.RangeKey.IsSome then | |
Model.KeySchemaElement ( | |
AttributeName = schema.RangeKey.Value.AttributeName, | |
KeyType = KeyType.RANGE | |
) | |
] | |
let fromRequestModel schemaType (keySchemaElements : Model.KeySchemaElement seq) (attributeDefinitions : Model.AttributeDefinition seq) : KeySchema' = | |
let findByType keyType : KeyAttributeSchema' option = | |
keySchemaElements | |
|> Seq.tryFind (fun k -> k.KeyType = keyType) | |
|> Option.bind (fun key -> attributeDefinitions |> Seq.tryFind (fun a -> a.AttributeName = key.AttributeName)) | |
|> Option.map (fun attr -> { | |
AttributeName = attr.AttributeName | |
AttributeType = attr.AttributeType.Value | |
}) | |
{ | |
HashKey = findByType KeyType.HASH |> Option.get | |
RangeKey = findByType KeyType.RANGE | |
Type = schemaType | |
} | |
let fromTableKeySchema (schema : TableKeySchema) : KeySchema' = { | |
HashKey = schema.HashKey |> KeyAttributeSchema'.fromKeyAttributeSchema | |
RangeKey = schema.RangeKey |> Option.map KeyAttributeSchema'.fromKeyAttributeSchema | |
Type = schema.Type | |
} | |
module private Schema' = | |
let toCreateRequest provisionedThroughput schema = | |
let pk = | |
schema.Indexes | |
|> List.filter (fun i -> i.Type = KeySchemaType.PrimaryKey) | |
|> List.exactlyOne | |
let gsis = | |
schema.Indexes | |
|> List.choose (fun i -> | |
match i.Type with | |
| KeySchemaType.GlobalSecondaryIndex n -> Some (n, i) | |
| _ -> None) | |
let pk' = pk |> KeySchema'.toKeySchemaElements | |
let gsis' = | |
gsis |> List.map (fun (name, s) -> | |
Model.GlobalSecondaryIndex ( | |
IndexName = name, | |
KeySchema = (s |> KeySchema'.toKeySchemaElements |> ResizeArray), | |
ProvisionedThroughput = provisionedThroughput, | |
Projection = Model.Projection ( | |
ProjectionType = (schema.IndexProjections | |
|> Map.find name | |
|> ProjectionType.FindValue) | |
) | |
)) | |
let attributeDefinitions = | |
pk :: (List.map snd gsis) | |
|> List.collect (fun s -> List.choose id [ Some s.HashKey; s.RangeKey ]) | |
|> List.distinct | |
|> List.map KeyAttributeSchema'.toAttributeDefinition | |
Model.CreateTableRequest( | |
TableName = schema.TableName, | |
KeySchema = ResizeArray pk', | |
GlobalSecondaryIndexes = ResizeArray gsis', | |
AttributeDefinitions = ResizeArray attributeDefinitions, | |
ProvisionedThroughput = provisionedThroughput | |
) | |
let fromDescribeResponse (response : Model.DescribeTableResponse) = { | |
TableName = response.Table.TableName | |
Indexes = [ | |
yield KeySchema'.fromRequestModel KeySchemaType.PrimaryKey response.Table.KeySchema response.Table.AttributeDefinitions | |
yield! response.Table.GlobalSecondaryIndexes | |
|> Seq.map (fun i -> KeySchema'.fromRequestModel (KeySchemaType.GlobalSecondaryIndex i.IndexName) i.KeySchema response.Table.AttributeDefinitions) | |
|> Seq.toList | |
] | |
IndexProjections = | |
response.Table.GlobalSecondaryIndexes | |
|> Seq.map (fun i -> i.IndexName, i.Projection.ProjectionType.Value) | |
|> Map | |
} | |
let fromTemplate tableName projections (template : RecordTemplate<'schema>) = { | |
TableName = tableName | |
Indexes = [ | |
yield KeySchema'.fromTableKeySchema template.PrimaryKey | |
yield! template.GlobalSecondaryIndices | |
|> Seq.map KeySchema'.fromTableKeySchema | |
|> Seq.toList | |
] | |
IndexProjections = projections | |
} | |
let serviceUrl = | |
System.Environment.GetEnvironmentVariable "SERVICE_URL" | |
if serviceUrl = null then invalidOp $"\ | |
Expected an environment variable set named 'SERVICE_URL' to configure DynamoDB service URL, but it be not." | |
let creds = BasicAWSCredentials ( | |
accessKey = "not-used", | |
secretKey = "not-used" | |
) | |
let config = AmazonDynamoDBConfig ( | |
ServiceURL = serviceUrl | |
) | |
let private schemas = Set [ | |
Schema'.fromTemplate Message.TableName Map.empty <| RecordTemplate.Define<Message> () | |
Schema'.fromTemplate Info.TableName (Map [ Info.ReceiverIndex.IndexName, Info.ReceiverIndex.Projection ]) <| RecordTemplate.Define<Info> () | |
] | |
let work = async { | |
use client = new AmazonDynamoDBClient (creds, config) | |
let! listTables = Async.AwaitTask <| client.ListTablesAsync () | |
let! describeTables = | |
listTables.TableNames | |
|> Seq.map Model.DescribeTableRequest | |
|> Seq.map (fun req -> client.DescribeTableAsync(req, Async.DefaultCancellationToken)) | |
|> Seq.map Async.AwaitTask | |
|> Async.Sequential | |
let existingSchemas = | |
describeTables | |
|> Seq.map Schema'.fromDescribeResponse | |
|> Set | |
let newOrChangedSchemas = Set.difference schemas existingSchemas | |
let! results = | |
newOrChangedSchemas | |
|> Seq.map (Schema'.toCreateRequest (Model.ProvisionedThroughput (ReadCapacityUnits = 1, WriteCapacityUnits = 1))) | |
|> Seq.map (fun req -> async { | |
try | |
let! res = Async.AwaitTask <| client.CreateTableAsync (req, Async.DefaultCancellationToken) | |
return req.TableName, Ok res.HttpStatusCode | |
with | |
| AggregateExn [ :? Model.ResourceInUseException as ex ] -> | |
return req.TableName, Error "Schema has changed. The table must be dropped and recreated." | |
}) | |
|> Async.Sequential | |
return List.ofArray results | |
} | |
printfn $"Discovered {schemas.Count} table schemas: %A{schemas |> Seq.map (fun s -> s.TableName) |> Seq.toList}" | |
printfn $"Migrating... " | |
// It seems like, despite trying this and various other means of cancellation, it will hang if it doesn't have a connection. | |
// If you're reading this, ensure 'config' defines a valid connection to DynamoDB. | |
let outcome = Async.RunSynchronously (work, 10_000) | |
printfn $"Migration outcome: %A{outcome}" | |
printfn $"Migration ended." |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment