Skip to content

Instantly share code, notes, and snippets.

@odytrice
Created June 19, 2024 14:11
Show Gist options
  • Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.
Save odytrice/08bd3474cf91b0bedf6bf8f35b655e8d to your computer and use it in GitHub Desktop.
Bitcoin Indexer
module Nomad.Bitcoin.Indexer.BitcoinActors
open System
open System.Linq
open Akka.FSharp
open Microsoft.Extensions.DependencyInjection
open Microsoft.Extensions.Logging
open Nomad.Bitcoin.Core.Contracts
open Nomad.Bitcoin.Core.Domain
open Nomad.Bitcoin.Core.Domain.Errors
open Nomad.Bitcoin.Core.Domain.Types
open FsToolkit.ErrorHandling
open Nomad.Bitcoin.Infrastructure.Database
open Nomad.Bitcoin.Core.Domain.Helpers
/// Messages accepted by the indexer's root actor (`rootActor`).
type Msg =
/// Asks the actor to run one indexing pass over the chain.
| StartIndex
/// The set of services an indexing operation works with.
/// Instances are produced by `Context.create` from an `IServiceProvider`.
type Context =
    /// Repository used to read from and write to the block/transaction index.
    abstract Index: IWriteIndexRepository
    /// Repository used to query the node (block hashes, block details, chain info).
    abstract Node: INodeRepository
    /// Logger used for progress and error reporting.
    abstract Logger: ILogger<Context>
    /// Database context exposed next to the repositories.
    abstract Database: BitcoinDataContext
/// Construction helpers for the `Context` interface.
module Context =
    /// Builds a `Context` whose members are resolved from the given service provider.
    ///
    /// Every member looks its service up in `sp` each time it is accessed, so the
    /// lifetime of the returned instances is whatever the provider (or scope) dictates.
    /// Services are resolved with `GetRequiredService`, which throws an
    /// `InvalidOperationException` naming the missing registration instead of handing
    /// back `null` and failing later with a `NullReferenceException` at the call site.
    let create (sp: IServiceProvider) =
        { new Context with
            member _.Index = sp.GetRequiredService<IWriteIndexRepository>()
            member _.Node = sp.GetRequiredService<INodeRepository>()
            member _.Database = sp.GetRequiredService<BitcoinDataContext>()
            member _.Logger = sp.GetRequiredService<ILogger<Context>>() }
/// Pairs a transaction input with the output it spends, if any.
///
/// For a regular input the referenced output is looked up through
/// `indexRepo.GetOutputByIndex` and returned as `Some`; a coinbase input is
/// returned together with `None` without touching the repository.
/// A failed lookup is propagated as the error of the async result.
let fetchOutput (indexRepo: IWriteIndexRepository) (input: TransactionInput) =
    asyncResult {
        match input with
        | TxInput spending ->
            let! spentOutput =
                indexRepo.GetOutputByIndex(spending.TransactionId, spending.OutputIndex)

            return input, Some spentOutput
        | CoinBase _ ->
            return input, None
    }
/// Builds the index records for one transaction and stores them via `context.Index.AddTransaction`.
///
/// Parameters:
///   context               - supplies the index repository the transaction is written to.
///   addressMap            - lookup from address string to address id; output addresses
///                           that are not present in the map are left out of the output record.
///   (blockHash, blockTime) - hash and time of the block containing the transaction.
///   transaction           - the transaction to index.
///
/// Returns an async result that is `Ok ()` once the transaction has been added, or the
/// error reported by `AddTransaction`.
let indexTransaction (context: Context) (addressMap: Map<string, int64>) (blockHash: BlockHash, blockTime: DateTimeOffset) (transaction: TransactionDetails) =
    asyncResult {
        // One InputIndex per input. InputId is written as 0L and OutputId as None;
        // presumably the store assigns/links them later (see ConsolidateInputs) - confirm.
        let inputIndices: InputIndex list = [
            for input in transaction.Inputs do
                match input with
                | CoinBase coinBaseDetails ->
                    yield {
                        InputId = 0L
                        TransactionId = transaction.Id
                        Sequence = coinBaseDetails.Sequence
                        IsCoinBase = true
                        OutputId = None
                        OutputTransactionId = None
                        OutputTransactionIndex = None
                    }
                | TxInput txInputDetails ->
                    yield {
                        InputId = 0L
                        TransactionId = transaction.Id
                        Sequence = txInputDetails.Sequence
                        IsCoinBase = false
                        OutputId = None
                        OutputTransactionId = Some txInputDetails.TransactionId
                        OutputTransactionIndex = Some txInputDetails.OutputIndex
                    }
        ]

        // One OutputIndex per output.
        let outputIndices: OutputIndex list = [
            for output in transaction.Outputs do
                let addresses = output.ScriptPubKey |> Helpers.parseAddresses

                yield {
                    OutputId = 0L
                    // Keyed by transaction.Id - the same identifier the inputs above and the
                    // TransactionIndex below use, and the one spending inputs refer to through
                    // OutputTransactionId. This used to be transaction.Hash.
                    // NOTE(review): assumes Id and Hash have the same type; for witness
                    // transactions the node reports a hash that differs from the txid - confirm.
                    TransactionId = transaction.Id
                    Index = output.Index
                    Value = decimal output.Value
                    ScriptPubKey = output.ScriptPubKey.Asm
                    Addresses = [
                        for address in addresses do
                            // Only addresses known to the address map carry an id we can store.
                            match addressMap |> Map.tryFind address with
                            | Some addressId ->
                                yield {
                                    Id = addressId
                                    Value = address
                                    Balance = 0M<Bitcoins>
                                    TransactionCount = 0
                                    Date = blockTime
                                }
                            | None -> ()
                    ]
                }
        ]

        // Assemble the full transaction record and persist it.
        let transactionIndex: TransactionIndex = {
            TransactionId = transaction.Id
            BlockHash = blockHash
            Inputs = inputIndices
            Outputs = outputIndices
        }

        do! context.Index.AddTransaction transactionIndex
    }
/// Indexes every transaction of `blockDetails` that the index does not yet hold for that block.
///
/// The ids already stored for the block are fetched first; each remaining transaction
/// is then passed to `indexTransaction`, one after another. The first error ends the run
/// and becomes the result.
let indexTransactions (context: Context) (addressMap: Map<string, int64>) (blockDetails: BlockDetails) =
    asyncResult {
        context.Logger.LogInformation("Saving Block Transactions")

        // Ids of the transactions this block already has in the index
        let! knownIds =
            context.Index.GetTransactionsByBlock(blockDetails.Hash)
            |> AsyncResult.map (List.map (fun indexed -> indexed.TransactionId))

        // Block hash and time are the same for every transaction of the block
        let blockData = blockDetails.Hash, blockDetails.Time

        for transaction in blockDetails.Transactions do
            // Skip whatever was indexed by an earlier run
            if not (knownIds.Contains(transaction.Id)) then
                do! indexTransaction context addressMap blockData transaction
    }
/// Resolves a list of addresses to their index ids.
///
/// Duplicates are dropped, addresses already known to the index are fetched with
/// `GetAddresses`, and the remaining ones are stored through `SaveAddresses` with
/// `blockTime` as their date, a zero balance and a zero transaction count.
/// The result maps every address value to the id of its record.
let createAddressMap (index: IWriteIndexRepository) (blockTime: DateTimeOffset) (addresses: string list): Async<Result<Map<string, int64>,AppError>> =
    asyncResult {
        // Distinct addresses of the request
        let requested = Set.ofList addresses

        // Records the index already has for them
        let! knownRecords = requested |> Set.toList |> index.GetAddresses

        let knownValues =
            knownRecords
            |> List.map (fun known -> known.Value)
            |> Set.ofList

        // Blank record for an address seen for the first time
        let blankRecord (value: string) : AddressIndex =
            { Id = 0L
              Value = value
              Date = blockTime
              Balance = 0M<Bitcoins>
              TransactionCount = 0 }

        // Store whatever the index does not know yet
        let! savedRecords =
            Set.difference requested knownValues
            |> Set.toList
            |> List.map blankRecord
            |> index.SaveAddresses

        // Fresh and known records together form the lookup
        return
            List.append savedRecords knownRecords
            |> List.map (fun entry -> entry.Value, entry.Id)
            |> Map.ofList
    }
/// Indexes the block at the given height inside its own service scope.
///
/// Steps, in order: fetch the block hash and details from the node, build the address
/// map for every address found in the block's outputs, add the block unless the index
/// already has it (a warning is logged in that case), index its transactions and
/// finally set the block status to `IndexStatus.Complete`.
/// Any failing step ends the operation with that step's error.
let indexBlock (sp: IServiceProvider) (height: int) =
    asyncResult {
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider
        let log = context.Logger

        // Pull the block from the node
        let! hash = context.Node.GetBlockHash height
        log.LogInformation("Fetching Block {Hash}", hash)
        let! block = context.Node.GetBlockDetails hash
        log.LogInformation("Successfully Fetched Block")

        // Address book covering every address referenced by the block's outputs
        let! addressBook =
            block.Transactions
            |> List.collect (fun tx -> tx.Outputs)
            |> List.collect (fun output -> Helpers.parseAddresses output.ScriptPubKey)
            |> createAddressMap context.Index block.Time

        // Store the block itself unless it is already there
        log.LogInformation("Saving Block")
        let! stored = context.Index.GetBlock(block.Hash)

        match stored with
        | Some indexed -> log.LogWarning("Block {Height} has already been indexed", indexed.Height)
        | None -> do! context.Index.AddBlock(block)

        do! indexTransactions context addressBook block

        log.LogInformation("Marking Block {Height} as Completed", block.Height)
        do! context.Index.SetBlockStatus(block.Hash, IndexStatus.Complete)
    }
/// Re-runs `indexBlock` for the blocks the index reports as missing or pending.
///
/// Missing blocks are handled before pending ones, one block at a time; the first
/// failure stops the run and is returned. On success the result holds one entry per
/// processed block.
let indexPendingBlocks (sp: IServiceProvider) (context: Context) =
    asyncResult {
        let! pending = context.Index.GetPendingBlocks()

        if not (List.isEmpty pending) then
            context.Logger.LogInformation("Found {count} Pending Blocks", List.length pending)

        let! missing = context.Index.GetMissingBlocks()

        if not (List.isEmpty missing) then
            context.Logger.LogInformation("Found {count} Missing Blocks", List.length missing)

        let outstanding = List.append missing pending

        return!
            outstanding
            |> List.map (indexBlock sp)
            |> List.sequenceAsyncResultM
    }
/// Root actor of the indexer.
///
/// For every `StartIndex` message it runs one indexing pass: missing/pending blocks are
/// re-indexed first, then the blocks between the index height and the node height are
/// processed in batches of 10, the blocks of a batch running in parallel. The outcome
/// (number of blocks processed, or the error description) is logged.
///
/// Parameters:
///   sp      - root service provider; a fresh scope is created for every message
///             (and, inside `indexBlock`, for every block).
///   mailbox - the actor's mailbox.
///
/// The pass is executed with `Async.RunSynchronously`, so the actor does not pick up the
/// next message until the current pass has finished.
let rootActor (sp: IServiceProvider) (mailbox: Actor<Msg>) =

    // Brings the index up to the node's height and returns how many blocks were handled.
    let indexChain (context: Context) =
        asyncResult {
            let! chainInfo = context.Node.GetChainInfo()
            let nodeBlockHeight = chainInfo.Blocks
            let! indexBlockHeight = context.Index.GetBlockChainHeight()
            let! pendingBlocks = indexPendingBlocks sp context

            if nodeBlockHeight > indexBlockHeight then
                context.Logger.LogInformation("Current Block Height {nodeBlockHeight}\n", nodeBlockHeight)
                let batches = [ indexBlockHeight .. nodeBlockHeight ] |> List.batchesOf 10

                for batch in batches do
                    let startIndex, endIndex = batch.Head, batch |> List.last
                    context.Logger.LogInformation("Processing Batch {Start} .. {End}", startIndex, endIndex)

                    // Run Block Index in Parallel
                    let! results =
                        batch
                        |> List.map (indexBlock sp)
                        |> Async.Parallel

                    // If any fails, Collect the Errors into an Aggregate Error
                    do! results
                        |> List.ofArray
                        |> List.sequenceResultA
                        |> Result.mapError AggregateError
                        |> Result.ignore

                    context.Logger.LogInformation("Consolidating Detached Inputs")
                    do! context.Index.ConsolidateInputs() |> AsyncResult.ignore

                    let percentageCompleted = float endIndex / float nodeBlockHeight * 100.0
                    context.Logger.LogInformation("Processing Complete for Batch. Overall Index is {percentageComplete:N4}% Complete\n\n", percentageCompleted)

                return nodeBlockHeight - indexBlockHeight + pendingBlocks.Length
            else
                return pendingBlocks.Length
        }

    // Handles one message inside its own service scope.
    // The scope lives in this ordinary function (not in the actor expression) so that it
    // is disposed as soon as the message has been handled. Declaring it with `use` inside
    // the recursive `actor { ... }` loop kept every scope open across `return! loop()`,
    // and a new scope was opened for each message.
    let handle (msg: Msg) =
        use scope = sp.CreateScope()
        let context = Context.create scope.ServiceProvider

        match msg with
        | StartIndex ->
            match indexChain context |> Async.RunSynchronously with
            | Ok 0 -> context.Logger.LogInformation("No Blocks were processed, Index is up to date")
            | Ok blocks -> context.Logger.LogInformation("{blocks} Blocks were processed", blocks)
            | Error appError ->
                let description = describeError appError
                context.Logger.LogError("An Error Occurred indexing Blocks {msg}", description)

    let rec loop () =
        actor {
            let! msg = mailbox.Receive()
            handle msg
            return! loop ()
        }

    loop ()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment