Skip to content

Instantly share code, notes, and snippets.

@NickDarvey
Created March 18, 2022 04:54
Show Gist options
  • Save NickDarvey/4cc5bbddf4aadc29b643305c2a556914 to your computer and use it in GitHub Desktop.
Save NickDarvey/4cc5bbddf4aadc29b643305c2a556914 to your computer and use it in GitHub Desktop.
Deploy FSharp.AWS.DynamoDB record schema to DynamoDB
#r "nuget: FSharp.AWS.DynamoDB, 0.9.3-beta"
#r "nuget: AWSSDK.DynamoDBv2"
#load "../../server/src/Features/Conversations/State.fs"
open FSharp.AWS.DynamoDB
open Amazon.DynamoDBv2
open Amazon.Runtime
open Features.Conversations.State
/// Partial active pattern over exceptions.
/// Matches only a `System.AggregateException`, yielding its inner exceptions as an F# list
/// after `Flatten()` has collapsed any nested aggregates. Any other exception does not match.
let private (|AggregateExn|_|) (e : exn) =
    match e with
    | :? System.AggregateException as aggregate ->
        let flattened = aggregate.Flatten ()
        Some (List.ofSeq flattened.InnerExceptions)
    | _ -> None
/// A key attribute reduced to plain strings so it can be compared structurally.
type private KeyAttributeSchema' =
    { /// Name of the attribute that takes part in the key.
      AttributeName : string
      /// String value of the attribute's type, as exposed by the SDK / library constant's `Value`.
      AttributeType : string }

/// The key layout of a table or of one of its indexes.
type private KeySchema' =
    { /// The hash key attribute; always present.
      HashKey : KeyAttributeSchema'
      /// The range key attribute, when the key has one.
      RangeKey : KeyAttributeSchema' option
      /// Which key this describes (e.g. the primary key or a named global secondary index).
      Type : KeySchemaType }

/// A comparable description of a table: its name, its keys and its index projections.
type private Schema' =
    { TableName : string
      /// Key schemas of the table itself and of its indexes.
      Indexes : KeySchema' list
      /// Projection type string per global secondary index, keyed by index name.
      IndexProjections : Map<string, string> }
/// Conversions between the library / SDK key attribute types and `KeyAttributeSchema'`.
module private KeyAttributeSchema' =

    /// Copies the attribute name and the string value of the key type out of a
    /// FSharp.AWS.DynamoDB `KeyAttributeSchema`.
    let fromKeyAttributeSchema (schema : KeyAttributeSchema) : KeyAttributeSchema' =
        let name = schema.AttributeName
        let scalarType = schema.KeyType.Value
        { AttributeName = name; AttributeType = scalarType }

    /// Builds the SDK `AttributeDefinition` carrying the same name and type.
    let toAttributeDefinition (schema : KeyAttributeSchema') =
        let name = schema.AttributeName
        let scalarType = schema.AttributeType
        Model.AttributeDefinition (AttributeName = name, AttributeType = scalarType)
/// Conversions between the library / SDK key schema types and `KeySchema'`.
module private KeySchema' =

    /// Builds the SDK key schema elements for a key: the HASH element first,
    /// followed by the RANGE element when the key has a range attribute.
    let toKeySchemaElements (schema : KeySchema') = [
        yield Model.KeySchemaElement (
            AttributeName = schema.HashKey.AttributeName,
            KeyType = KeyType.HASH
        )
        match schema.RangeKey with
        | Some rangeKey ->
            yield Model.KeySchemaElement (
                AttributeName = rangeKey.AttributeName,
                KeyType = KeyType.RANGE
            )
        | None -> ()
    ]

    /// Rebuilds a `KeySchema'` from the key schema elements and attribute definitions
    /// of an SDK table (or index) description.
    ///
    /// `schemaType` is stored as-is in the result's `Type` field.
    /// Raises `System.ArgumentException` when no HASH element with a matching
    /// attribute definition can be found.
    let fromRequestModel schemaType (keySchemaElements : Model.KeySchemaElement seq) (attributeDefinitions : Model.AttributeDefinition seq) : KeySchema' =
        // Looks up the key element of the given kind, then the attribute definition with the same name.
        let findByType keyType : KeyAttributeSchema' option =
            keySchemaElements
            |> Seq.tryFind (fun k -> k.KeyType = keyType)
            |> Option.bind (fun key -> attributeDefinitions |> Seq.tryFind (fun a -> a.AttributeName = key.AttributeName))
            |> Option.map (fun attr ->
                { AttributeName = attr.AttributeName
                  AttributeType = attr.AttributeType.Value })
        let hashKey =
            match findByType KeyType.HASH with
            | Some key -> key
            | None ->
                // Same exception type Option.get would raise, but with a message that names the problem.
                invalidArg "keySchemaElements" "Expected a HASH key element with a matching attribute definition, but found none."
        {
            HashKey = hashKey
            RangeKey = findByType KeyType.RANGE
            Type = schemaType
        }

    /// Converts a FSharp.AWS.DynamoDB `TableKeySchema` into a `KeySchema'`.
    let fromTableKeySchema (schema : TableKeySchema) : KeySchema' =
        {
            HashKey = schema.HashKey |> KeyAttributeSchema'.fromKeyAttributeSchema
            RangeKey = schema.RangeKey |> Option.map KeyAttributeSchema'.fromKeyAttributeSchema
            Type = schema.Type
        }
/// Conversions between `Schema'` and the SDK request / response models and record templates.
module private Schema' =

    /// Builds the `CreateTableRequest` for a schema.
    ///
    /// `provisionedThroughput` is applied to the table and to every global secondary index.
    /// Raises `System.ArgumentException` (from `List.exactlyOne`) unless the schema has exactly
    /// one primary key, and `KeyNotFoundException` when a global secondary index has no entry
    /// in `IndexProjections`.
    let toCreateRequest provisionedThroughput (schema : Schema') =
        let pk =
            schema.Indexes
            |> List.filter (fun i -> i.Type = KeySchemaType.PrimaryKey)
            |> List.exactlyOne
        // Only global secondary indexes are carried over; other index kinds are ignored here.
        let gsis =
            schema.Indexes
            |> List.choose (fun i ->
                match i.Type with
                | KeySchemaType.GlobalSecondaryIndex n -> Some (n, i)
                | _ -> None)
        // Resolves the projection for an index, failing with a message that names the index.
        let projectionFor indexName =
            match Map.tryFind indexName schema.IndexProjections with
            | Some projection -> ProjectionType.FindValue projection
            | None ->
                let message = $"No projection type was supplied for global secondary index '{indexName}' of table '{schema.TableName}'."
                raise (System.Collections.Generic.KeyNotFoundException message)
        let pk' = pk |> KeySchema'.toKeySchemaElements
        let gsis' =
            gsis |> List.map (fun (name, s) ->
                Model.GlobalSecondaryIndex (
                    IndexName = name,
                    KeySchema = ResizeArray (KeySchema'.toKeySchemaElements s),
                    ProvisionedThroughput = provisionedThroughput,
                    Projection = Model.Projection (ProjectionType = projectionFor name)
                ))
        // Every attribute used by any key must be declared exactly once.
        let attributeDefinitions =
            (pk :: List.map snd gsis)
            |> List.collect (fun s -> s.HashKey :: Option.toList s.RangeKey)
            |> List.distinct
            |> List.map KeyAttributeSchema'.toAttributeDefinition
        let request =
            Model.CreateTableRequest (
                TableName = schema.TableName,
                KeySchema = ResizeArray pk',
                AttributeDefinitions = ResizeArray attributeDefinitions,
                ProvisionedThroughput = provisionedThroughput
            )
        // Leave the index list unset when there are none, so an empty list is never sent.
        if not (List.isEmpty gsis') then
            request.GlobalSecondaryIndexes <- ResizeArray gsis'
        request

    /// Rebuilds a `Schema'` from a `DescribeTable` response so it can be compared
    /// with the schemas derived from record templates.
    let fromDescribeResponse (response : Model.DescribeTableResponse) : Schema' =
        let table = response.Table
        // The SDK may leave the collection null for a table without global secondary indexes.
        let gsis =
            match table.GlobalSecondaryIndexes with
            | null -> []
            | indexes -> List.ofSeq indexes
        let primaryKey =
            KeySchema'.fromRequestModel KeySchemaType.PrimaryKey table.KeySchema table.AttributeDefinitions
        let secondaryKeys =
            gsis |> List.map (fun i ->
                KeySchema'.fromRequestModel (KeySchemaType.GlobalSecondaryIndex i.IndexName) i.KeySchema table.AttributeDefinitions)
        {
            TableName = table.TableName
            // Sorted so that comparing schemas does not depend on the order indexes are reported in.
            Indexes = List.sort (primaryKey :: secondaryKeys)
            IndexProjections =
                gsis
                |> List.map (fun i -> i.IndexName, i.Projection.ProjectionType.Value)
                |> Map.ofList
        }

    /// Builds a `Schema'` from a FSharp.AWS.DynamoDB record template.
    ///
    /// `tableName` and `projections` are stored as-is; the keys come from the template's
    /// primary key and global secondary indices.
    let fromTemplate tableName projections (template : RecordTemplate<'schema>) : Schema' =
        let primaryKey = KeySchema'.fromTableKeySchema template.PrimaryKey
        let secondaryKeys =
            template.GlobalSecondaryIndices
            |> Seq.map KeySchema'.fromTableKeySchema
            |> Seq.toList
        {
            TableName = tableName
            // Sorted so that comparing schemas does not depend on declaration order.
            Indexes = List.sort (primaryKey :: secondaryKeys)
            IndexProjections = projections
        }
/// URL of the DynamoDB service to migrate, read from the 'SERVICE_URL' environment variable.
let serviceUrl =
    System.Environment.GetEnvironmentVariable "SERVICE_URL"

// Fail fast: an unset, empty or blank value cannot configure the client.
if System.String.IsNullOrWhiteSpace serviceUrl then
    invalidOp "Expected an environment variable named 'SERVICE_URL' to be set to the DynamoDB service URL, but it was missing or empty."

/// Placeholder credentials passed to the client.
// NOTE(review): presumably the target endpoint does not validate credentials (e.g. a local emulator) — confirm.
let creds =
    BasicAWSCredentials (
        accessKey = "not-used",
        secretKey = "not-used"
    )

/// Client configuration pointing at `serviceUrl`.
let config =
    AmazonDynamoDBConfig (
        ServiceURL = serviceUrl
    )
/// The set of table schemas this script expects to exist, one per record type
/// (`Message` and `Info`), each derived from its record template.
let private schemas =
    let messageSchema =
        RecordTemplate.Define<Message> ()
        |> Schema'.fromTemplate Message.TableName Map.empty
    // The Info table declares one index whose projection is supplied by the record type itself.
    let infoProjections =
        Map.ofList [ Info.ReceiverIndex.IndexName, Info.ReceiverIndex.Projection ]
    let infoSchema =
        RecordTemplate.Define<Info> ()
        |> Schema'.fromTemplate Info.TableName infoProjections
    Set.ofList [ messageSchema; infoSchema ]
/// Creates every table whose expected schema is not already present in DynamoDB.
///
/// Lists and describes the existing tables, subtracts their schemas from `schemas`,
/// and issues a `CreateTable` call for each remaining schema with a provisioned
/// throughput of 1 read / 1 write capacity unit.
/// Returns one `(tableName, result)` pair per attempted table: `Ok` with the HTTP status
/// code on success, or `Error` when the table already exists (i.e. its schema differs).
/// Any other failure propagates as an exception.
let work = async {
    // Use the workflow's own token so that cancelling the workflow
    // (for example when a RunSynchronously timeout elapses) also cancels in-flight SDK calls.
    let! ct = Async.CancellationToken
    use client = new AmazonDynamoDBClient (creds, config)
    // NOTE(review): only the first page of ListTables is read; add paging if the
    // account can hold more tables than a single page returns.
    let! listTables = Async.AwaitTask <| client.ListTablesAsync (ct)
    let! describeTables =
        listTables.TableNames
        |> Seq.map Model.DescribeTableRequest
        |> Seq.map (fun req -> client.DescribeTableAsync (req, ct))
        |> Seq.map Async.AwaitTask
        |> Async.Sequential
    let existingSchemas =
        describeTables
        |> Seq.map Schema'.fromDescribeResponse
        |> Set
    // Anything expected but not found verbatim is either missing or different.
    let newOrChangedSchemas = Set.difference schemas existingSchemas
    let! results =
        newOrChangedSchemas
        |> Seq.map (Schema'.toCreateRequest (Model.ProvisionedThroughput (ReadCapacityUnits = 1, WriteCapacityUnits = 1)))
        |> Seq.map (fun req -> async {
            try
                let! res = Async.AwaitTask <| client.CreateTableAsync (req, ct)
                return req.TableName, Ok res.HttpStatusCode
            with
            // The table exists already, so its schema must differ from the expected one.
            // Accept the exception both wrapped in an AggregateException and unwrapped.
            | AggregateExn [ :? Model.ResourceInUseException ]
            | :? Model.ResourceInUseException ->
                return req.TableName, Error "Schema has changed. The table must be dropped and recreated."
        })
        |> Async.Sequential
    return List.ofArray results
}
// Report which tables the script knows about before touching DynamoDB.
schemas
|> Seq.map (fun s -> s.TableName)
|> List.ofSeq
|> fun tableNames -> printfn $"Discovered {schemas.Count} table schemas: %A{tableNames}"
printfn $"Migrating... "
// Despite this timeout and various other attempts at cancellation, the script still
// appears to hang when it cannot get a connection.
// If you are reading this, make sure 'config' defines a valid connection to DynamoDB.
let outcome =
    let timeoutMilliseconds = 10_000
    Async.RunSynchronously (work, timeoutMilliseconds)
printfn $"Migration outcome: %A{outcome}"
printfn $"Migration ended."
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment