Skip to content

Instantly share code, notes, and snippets.

@russcam russcam/elastic-gnaf.fsx
Last active Sep 6, 2019

Embed
What would you like to do?
// Building a realtime Australian address search with the G-NAF dataset
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// Ensure the paths to these DLLS are set correctly
#r "Elasticsearch.Net.dll"
#r "Nest.dll"
#r "System.Diagnostics.DiagnosticSource.dll"
#r "System.Buffers.dll"
open Nest
open System
open System.Data.SqlClient
open System.IO
open System.Text.RegularExpressions
open System.Globalization
open System.Threading
// Variables to change

// Directory containing the unzipped G-NAF download.
// Will require approx 2GB of disk space
let unzippedGNAFDir = @"D:\AUG19_GNAF_PipeSeparatedValue\G-NAF"

// Connection string for the SQL Server database.
// Will require approx 12GB of disk space
let sqlConnectionString = @"Server=SQLEXPRESS;Database=gnaf;Integrated Security=true;"

// Address of the Elasticsearch node.
// Will require approx 8GB of disk space
let elasticsearch = Uri("http://localhost:9200")

// Name of the Elasticsearch index used by this script.
let elasticsearchIndex = "address"
// Log with timestamp

/// Output format shared by the loggers: "[HH:mm:ss] message".
let logFormat = Printf.TextWriterFormat<int->int->int->string->unit>("[%02d:%02d:%02d] %s")

/// Writes a single message to stdout, prefixed with the current local time.
let log message =
    let now = DateTime.Now
    printfn logFormat now.Hour now.Minute now.Second message

/// printf-style variant of `log`: formats the arguments, then writes the
/// resulting text with the same timestamp prefix.
let logf format = Printf.ksprintf log format
// Address types for Elasticsearch
/// The structured parts of a single address.
type AddressComponent =
    { BuildingName: string
      Number: string
      Street: string
      Locality: string
      State: string
      Postcode: string }

/// Builds the single-line display form of an address, in the order
/// number, street, locality, state, postcode, building name.
/// Components that are null, empty or whitespace-only are skipped and the
/// remaining ones are trimmed, so the result never contains leading, trailing
/// or doubled separators when a component is missing.
let toDisplay (address:AddressComponent) =
    [ address.Number
      address.Street
      address.Locality
      address.State
      address.Postcode
      address.BuildingName ]
    |> List.filter (fun (part:string) -> not (String.IsNullOrWhiteSpace part))
    |> List.map (fun (part:string) -> part.Trim())
    |> String.concat " "
/// The document indexed into Elasticsearch for one address.
type Address =
    {
        /// Identifier of the address.
        Id: string
        /// Single-line text form of the address.
        Display: string
        /// Geographic position of the address.
        Location: GeoLocation
        /// The structured parts the display text was built from.
        Component: AddressComponent
    }
// SQL operations
/// Creates the G-NAF tables, foreign key constraints and ADDRESS_VIEW by
/// running the SQL scripts found under the unzipped G-NAF directory.
/// Returns the connection that was passed in so calls can be pipelined.
/// Raises an exception (via failwith) when a script returns anything other
/// than -1 from ExecuteNonQuery.
let initSql(sqlConnection:SqlConnection) =
    // Reads a script, rewriting "CREATE OR REPLACE VIEW" to "CREATE VIEW"
    // before the text is sent to SQL Server.
    let getCommand file =
        let fixLine (line:string) = line.Replace("CREATE OR REPLACE VIEW ADDRESS_VIEW", "CREATE VIEW ADDRESS_VIEW")
        File.ReadAllLines(file)
        |> Array.map fixLine
        |> String.concat Environment.NewLine
    let tableScriptsDir = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_TableCreation_Scripts")
    let createTables = Path.Combine(tableScriptsDir, "create_tables_sqlserver.sql")
    let constraints = Path.Combine(tableScriptsDir, "add_fk_constraints.sql")
    let createView = Path.Combine(unzippedGNAFDir, @"Extras\GNAF_View_Scripts\address_view.sql")
    log "Initialising SQL database"
    for setupFile in [| createTables; constraints; createView |] do
        // `use` disposes the command at the end of each iteration instead of
        // leaving it for the finalizer.
        use command = new SqlCommand(getCommand setupFile, sqlConnection)
        // ExecuteNonQuery returns -1 for statements that are not
        // INSERT/UPDATE/DELETE, which is what these DDL scripts should yield.
        if command.ExecuteNonQuery() <> -1 then failwith (sprintf "Received failure return value for %s" setupFile)
    sqlConnection
/// Loads the G-NAF pipe-separated files into SQL Server with BULK INSERT:
/// first the "Authority Code" files, then the "Standard" (per-state) files.
/// The target table name is taken from the `table` group of the file-name regex.
/// Returns the connection that was passed in so calls can be pipelined.
/// Raises an exception (via failwith) when a file name does not match the
/// expected pattern or when a BULK INSERT reports zero affected rows.
let indexSql(sqlConnection:SqlConnection) =
    // Runs one BULK INSERT statement and logs the number of rows it reported.
    let bulkSqlInsert commandText table =
        use command = new SqlCommand(commandText, sqlConnection)
        command.CommandTimeout <- 600
        let returnValue = command.ExecuteNonQuery()
        if returnValue = 0 then failwith (sprintf "No records inserted into %s" table)
        else logf "Inserted %i records into %s" returnValue table
    // Bulk inserts every file in `dir`.
    let indexFiles(dir, regex) =
        for file in Directory.EnumerateFiles(dir) do
            let fileInfo = FileInfo file
            let rMatch = Regex.Match(fileInfo.Name, regex)
            // Without this check an unmatched file yields an empty table name
            // and an obscure SQL syntax error.
            if not rMatch.Success then
                failwith (sprintf "Cannot determine target table for file %s" fileInfo.FullName)
            let table = rMatch.Groups.["table"].Value
            // Double any single quote so the path stays a valid T-SQL string literal.
            let path = fileInfo.FullName.Replace("'", "''")
            let bulkInsert = sprintf "BULK INSERT %s FROM '%s' WITH (FIELDTERMINATOR = '|', FIRSTROW = 2)" table path
            bulkSqlInsert bulkInsert table
    log "Indexing Authority Code data"
    let dataAuthorityDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Authority Code")
    indexFiles (dataAuthorityDir, "^Authority_Code_(?<table>.*?)_psv.psv$")
    log "Indexing State data"
    let dataStandardDir = Path.Combine(unzippedGNAFDir, @"G-NAF AUGUST 2019\Standard")
    indexFiles (dataStandardDir, "^[^_]*_(?<table>.*?)_psv.psv$")
    sqlConnection
// Elasticsearch operations
/// Creates the address index with one shard and no replicas, mapping
/// `display` as a search-as-you-type field and `location` as a geo point.
/// The outcome is logged; an invalid response is reported as a failure
/// rather than as a successful creation. No exception is raised here for an
/// invalid response, so the script carries on as before.
let createIndex (elasticClient:ElasticClient) =
    let properties = new Properties<Address>()
    properties.Add(PropertyName.op_Implicit "display", new SearchAsYouTypeProperty())
    properties.Add(PropertyName.op_Implicit "location", new GeoPointProperty())
    let mapping = new TypeMapping()
    mapping.Properties <- properties
    let settings = new IndexSettings()
    settings.NumberOfReplicas <- Nullable 0
    settings.NumberOfShards <- Nullable 1
    let createIndexRequest = new CreateIndexRequest(IndexName.op_Implicit elasticsearchIndex)
    createIndexRequest.Settings <- settings
    createIndexRequest.Mappings <- mapping
    logf "Creating index %s" elasticsearchIndex
    let indexResponse = elasticClient.Indices.Create(createIndexRequest)
    // Check the response before claiming success, so a failed call is not
    // logged as "Created index".
    if indexResponse.IsValid then logf "Created index %O" indexResponse
    else logf "Failed to create index %s: %s" elasticsearchIndex indexResponse.DebugInformation
/// Reads every row of ADDRESS_VIEW from SQL Server, converts it to an
/// `Address` document and bulk indexes the documents into Elasticsearch.
/// Blocks until indexing completes or the 60 minute timeout elapses.
/// The SQL command and data reader are disposed when the function returns.
let bulkIndex (elasticClient:ElasticClient) (sqlConnection:SqlConnection) =
    let timeout = TimeSpan.FromMinutes(60.0)
    let currentPage = ref 0
    let perBulkRequest = Nullable 10000
    let backoffTime = "30s"
    let backoffRetries = Nullable 2
    let parallelism = Nullable 4

    // Created once for the whole run rather than once per row.
    let culture = new CultureInfo("en-AU", false)

    // Title-cases a value, additionally capitalising the letter after a leading "Mc".
    let toTitleCase (column:string) =
        let (|Prefix|_|) (prefix:string) (candidate:string) =
            if candidate.StartsWith(prefix) then Some(candidate.Substring(prefix.Length))
            else None
        let titleCase = culture.TextInfo.ToTitleCase(column.ToLowerInvariant())
        match titleCase with
        | Prefix "Mc" trailing -> "Mc" + culture.TextInfo.ToTitleCase(trailing)
        | _ -> titleCase

    // Column value as a string; NULL becomes the empty string.
    let columnValue column (reader:SqlDataReader) =
        let ordinal = reader.GetOrdinal(column)
        if reader.IsDBNull(ordinal) then String.Empty
        else reader.[ordinal].ToString()

    // NOTE(review): GetDecimal throws when the column is NULL — confirm that
    // ADDRESS_VIEW never yields NULL coordinates.
    let columnDecimalValue column (reader:SqlDataReader) =
        reader.GetOrdinal(column) |> reader.GetDecimal

    // Joins the <item>_PREFIX, <item> and <item>_SUFFIX columns, skipping empty ones.
    let addressPart item (reader:SqlDataReader) =
        seq {
            yield columnValue (item + "_PREFIX") reader
            yield columnValue item reader
            yield columnValue (item + "_SUFFIX") reader
        }
        |> Seq.filter (fun elem -> String.length elem > 0)
        |> String.concat " "

    // Same as addressPart, wrapped in prefix/suffix only when the part is non-empty.
    let addressPartWrapped item prefix suffix (reader:SqlDataReader) =
        let joined = addressPart item reader
        if String.length joined > 0 then prefix + joined + suffix
        else joined

    // Builds the number part from lot, flat, first and last number,
    // e.g. "Lot 5", "2/10" or "10-12".
    let addressNumber (reader:SqlDataReader) =
        let lotNumber = addressPartWrapped "LOT_NUMBER" "Lot " "" reader
        let flatNumber = addressPartWrapped "FLAT_NUMBER" "" "/" reader
        let numberFirst = addressPart "NUMBER_FIRST" reader
        let numberLast = addressPartWrapped "NUMBER_LAST" "-" "" reader
        [| lotNumber; flatNumber; numberFirst; numberLast |]
        |> Seq.filter (fun elem -> String.length elem > 0)
        |> String.concat ""

    // Builds the structured address for the row the reader is positioned on.
    let address (reader:SqlDataReader) =
        let number = addressNumber reader
        let street = columnValue "STREET_NAME" reader |> toTitleCase
        let streetNameSuffix = columnValue "STREET_SUFFIX_TYPE" reader |> toTitleCase
        let streetName =
            let streetTypeCode = columnValue "STREET_TYPE_CODE" reader
            if String.length streetNameSuffix > 0 then sprintf "%s %s" streetTypeCode streetNameSuffix else streetTypeCode
        let locality = columnValue "LOCALITY_NAME" reader |> toTitleCase
        let state = columnValue "STATE_ABBREVIATION" reader
        let postcode = columnValue "POSTCODE" reader
        let buildingName = columnValue "BUILDING_NAME" reader |> toTitleCase
        { BuildingName = buildingName
          Number = number
          Street = sprintf "%s %s" street streetName
          Locality = locality
          State = state
          Postcode = postcode }

    // `use` disposes the command and reader once indexing has finished;
    // the Wait call below blocks until then.
    use readCommand = new SqlCommand("SELECT * FROM ADDRESS_VIEW", sqlConnection)
    readCommand.CommandTimeout <- 600
    use reader = readCommand.ExecuteReader()

    // Lazy sequence: rows are pulled from the reader as BulkAll consumes them.
    let results =
        seq {
            while reader.Read() do
                let id = columnValue "ADDRESS_DETAIL_PID" reader
                let lat = columnDecimalValue "LATITUDE" reader
                let lon = columnDecimalValue "LONGITUDE" reader
                let addressParts = address reader
                yield {
                    Id = id
                    Display = addressParts |> toDisplay
                    Location = GeoLocation.TryCreate(double lat, double lon)
                    Component = addressParts
                }
        }

    log "Bulk indexing into Elasticsearch"
    let bulkAll =
        elasticClient.BulkAll(results, fun (b:BulkAllDescriptor<Address>) ->
            b.Index(IndexName.op_Implicit elasticsearchIndex)
                .BackOffTime(Time.op_Implicit (backoffTime))
                .BackOffRetries(backoffRetries)
                .RefreshOnCompleted()
                .MaxDegreeOfParallelism(parallelism)
                .Size(perBulkRequest) :> IBulkAllRequest<Address>)
    // The logged figure is pages completed times page size, so it is an upper
    // bound when the final page is not full.
    bulkAll.Wait(timeout, fun _ ->
        let page = Interlocked.Increment(currentPage)
        logf "%i addresses indexed" (page * perBulkRequest.Value)
    ) |> ignore
    log "Bulk indexing complete"
/// Prints the display text of every hit in the response, one per line,
/// followed by the time the search took in milliseconds.
let displayResults (searchResponse:ISearchResponse<Address>) =
    searchResponse.Hits
    |> Seq.iter (fun hit -> Console.WriteLine(hit.Source.Display))
    Console.WriteLine("Took: {0}ms", searchResponse.Took)
/// Interactive demo: repeatedly prompts for text on the console and runs a
/// bool_prefix multi-match query against the `display` field and its
/// `_2gram` / `_3gram` sub-fields, printing the hits.
/// The loop ends when the user types "quit" or when standard input is
/// exhausted; blank lines are ignored. Returns the client that was passed in
/// so calls can be pipelined.
let searchAsYouTypeDemo (elasticClient:ElasticClient) =
    let search text =
        let query = new MultiMatchQuery()
        query.Query <- text
        query.Type <- Nullable.op_Implicit TextQueryType.BoolPrefix
        query.Fields <- Fields.op_Implicit "display,display._2gram,display._3gram"
        let request = new SearchRequest<Address>()
        request.Query <- new QueryContainer(query)
        let searchAsYouTypeResponse = elasticClient.Search<Address>(request)
        displayResults searchAsYouTypeResponse
    let readLine () =
        Console.Write "\nEnter search (or type 'quit'): "
        Console.ReadLine()
    let rec prompt () =
        match readLine () with
        // Console.ReadLine returns null at end of input (e.g. redirected
        // stdin); treat it like "quit" instead of searching forever.
        | null | "quit" -> ()
        // Nothing to search for; ask again.
        | text when String.IsNullOrWhiteSpace text -> prompt ()
        | text ->
            search text
            prompt ()
    prompt ()
    elasticClient
/// Demo geo query: finds addresses whose `location` lies within 200m of the
/// fixed point (-25.2379091, 130.9855969) and prints the hits.
let geoSearchDemo (elasticClient:ElasticClient) =
    let centre = new GeoLocation(-25.2379091, 130.9855969)
    let withinDistance =
        new GeoDistanceQuery(
            Field = Field.op_Implicit "location",
            Distance = Distance.op_Implicit "200m",
            Location = centre)
    let request = new SearchRequest<Address>(Query = new QueryContainer(withinDistance))
    elasticClient.Search<Address>(request)
    |> displayResults
// Execute script
// Execute script

// Elasticsearch client using the configured index as its default.
let connectionSettings = new ConnectionSettings(elasticsearch)
// DefaultIndex returns the settings object for chaining; the result is not needed here.
connectionSettings.DefaultIndex(elasticsearchIndex) |> ignore
let elasticClient = new ElasticClient(connectionSettings)

elasticClient
|> createIndex

// Load G-NAF into SQL Server, then index the address view into Elasticsearch.
// try/finally releases the connection even when a step throws.
let sqlConnection = new SqlConnection(sqlConnectionString)
try
    sqlConnection.Open()
    sqlConnection
    |> initSql
    |> indexSql
    |> bulkIndex elasticClient
finally
    sqlConnection.Close()

// Interactive search demo followed by the geo distance demo.
elasticClient
|> searchAsYouTypeDemo
|> geoSearchDemo
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.