Skip to content

Instantly share code, notes, and snippets.

@WilliamBerryiii
Last active November 29, 2016 00:29
Show Gist options
  • Save WilliamBerryiii/ccd6231d6ac1952e6b072d39fda23c19 to your computer and use it in GitHub Desktop.
Save WilliamBerryiii/ccd6231d6ac1952e6b072d39fda23c19 to your computer and use it in GitHub Desktop.
FS Advent - Azure IoT
-- Projects the coordinates, wind speed, device id and observation time of
-- each event read from [decompshred-in] into the [powerbi-out] output.
SELECT location.Latitude, location.Longitude, windSpeed, deviceId, obsTime
INTO [powerbi-out]
FROM [decompshred-in]
# Settings for the IoT Hub itself. Replace every {placeholder} with a real value.
AzureIoTHubConfig:
  IoTHubUri: {hub name}.azure-devices.net
  IotHubD2CEndpoint: messages/events
  ConnectionString: HostName={hub name}.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey={my shared access key}
# One entry per device. The F# code reads the first entry's DeviceId and Key,
# so DeviceConfigs must be a list and each device's fields must be nested
# under its own "-" item.
DeviceConfigs:
  - Nickname: My Test Device
    DeviceId: DeviceId
    Key: {my device key}
    Status: Enabled
//http://madskristensen.net/post/Compress-and-decompress-strings-in-C
/// GZip-compresses the UTF-8 bytes of `data` and returns the result as Base64.
/// The encoded payload is a 4-byte prefix holding the uncompressed byte count
/// (as produced by BitConverter.GetBytes) followed by the gzip stream.
let compress (data : string) =
    let buffer = Encoding.UTF8.GetBytes(data)
    use ms = new MemoryStream()
    // The GZipStream is scoped in parentheses so it is disposed (and its
    // output flushed into `ms`) before the bytes are copied out; leaveOpen =
    // true keeps `ms` usable afterwards.
    ( use zip = new GZipStream(ms, CompressionMode.Compress, true)
      zip.Write(buffer, 0, buffer.Length) )
    let compressed = ms.ToArray()
    let gzipBuffer = Array.zeroCreate<byte> (compressed.Length + 4)
    Buffer.BlockCopy(compressed, 0, gzipBuffer, 4, compressed.Length)
    Buffer.BlockCopy(BitConverter.GetBytes(buffer.Length), 0, gzipBuffer, 0, 4)
    Convert.ToBase64String gzipBuffer
/// Decodes a Base64 payload made of a 4-byte length prefix followed by gzip
/// data (the layout written by `compress`) and returns the original UTF-8 text.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    // The first four bytes hold the uncompressed length; gzip data follows.
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    let buffer = Array.zeroCreate<byte> dataLength
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading
    // until the buffer is full or the stream reports end-of-data (0).
    let rec fill offset =
        if offset < buffer.Length then
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read)
    fill 0
    Encoding.UTF8.GetString(buffer)
/// Compresses `data`, sends it to the IoT Hub as one device-to-cloud message
/// through `deviceClient`, then logs the (round-tripped) payload.
let dataSendTask (data : string) =
    async {
        let compressedData = compress data
        use message = new Message(Encoding.UTF8.GetBytes(compressedData))
        // `do!` awaits the send. Piping the Async into `ignore` discarded it,
        // so completion and failures of the send were never observed.
        do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
        printfn "%O > Sending message %s" (DateTime.Now.ToString()) (decompress compressedData)
    }
/// Infinite lazy sequence of batch payloads. Each element holds one
/// `windSpeedMessage` per entry of `msftSites`, joined with "|".
let dataStreamTasks =
    Seq.initInfinite (fun _ ->
        msftSites
        |> Array.mapi (fun idx site -> windSpeedMessage site idx)
        |> String.concat "|")
/// Sends `data` (UTF-8 encoded) to the IoT Hub as one device-to-cloud message
/// through `deviceClient`, then logs the payload.
let dataSendTask (data : string) =
    async {
        use message = new Message(Encoding.UTF8.GetBytes(data))
        // `do!` awaits the send. Piping the Async into `ignore` discarded it,
        // so completion and failures of the send were never observed.
        do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
        printfn "%O > Sending message %s" (DateTime.Now.ToString()) data
    }
// Send each payload synchronously, pausing five seconds between sends.
for payload in dataStreamTasks do
    dataSendTask payload |> Async.RunSynchronously
    Async.Sleep 5000 |> Async.RunSynchronously
/// One simulated wind-speed observation for a single device.
type telemetryDataPoint =
    { /// Where the reading was taken.
      location : GeoCoordinate
      /// Identifier of the reporting device.
      deviceId : string
      /// Observed wind speed.
      windSpeed : float
      /// When the observation was made.
      obsTime : DateTime }
open System
open System.Linq
open System.Text
open System.IO
open System.Device.Location
open System.Threading
open FSharp.Configuration
open Microsoft.Azure.Devices.Client
open Newtonsoft.Json
// Typed view over the YAML configuration, generated by the FSharp.Configuration
// YamlConfig type provider from the sample file at ../config/config.yaml.
// NOTE(review): ReadOnly=true presumably makes the generated properties
// read-only — confirm against the FSharp.Configuration documentation.
type Config = YamlConfig<FilePath="../config/config.yaml", ReadOnly=true>
module Main =

    /// Entry point: loads the YAML configuration, builds a DeviceClient for
    /// the first configured device, then blocks until a line is read from stdin.
    [<EntryPoint>]
    let main argv =
        let config = Config()
        config.Load("../../../config/config.yaml")
        let hubUri = config.AzureIoTHubConfig.IoTHubUri
        let key = config.DeviceConfigs.First().Key
        let id = config.DeviceConfigs.First().DeviceId
        // Symmetric-key credentials for the device registered in the hub.
        let credentials = new DeviceAuthenticationWithRegistrySymmetricKey(id, key)
        let deviceClient = DeviceClient.Create(hubUri, credentials)
        Console.ReadLine() |> ignore
        0 // return an integer exit code
// Copyright (c) Microsoft. All rights reserved. Licensed under the MIT license. See full license at the bottom of this file.
// Learn more about F# at http://fsharp.org
namespace Azure.IoTHub.Examples.FSharp.SimulatedDevice
open System
open System.Linq
open System.Text
open System.IO
open System.Device.Location
open System.Threading
open FSharp.Configuration
open Microsoft.Azure.Devices.Client
open Newtonsoft.Json
// Typed view over the YAML configuration, generated by the FSharp.Configuration
// YamlConfig type provider from the sample file at ../config/config.yaml.
// NOTE(review): ReadOnly=true presumably makes the generated properties
// read-only — confirm against the FSharp.Configuration documentation.
type Config = YamlConfig<FilePath="../config/config.yaml", ReadOnly=true>
/// One simulated wind-speed observation; serialized to JSON by `windSpeedMessage`.
type telemetryDataPoint =
    { /// Where the reading was taken.
      location : GeoCoordinate
      /// Identifier of the reporting device.
      deviceId : string
      /// Observed wind speed.
      windSpeed : float
      /// When the observation was made.
      obsTime : DateTime }
module Main =

    /// Entry point of the simulated device: loads the YAML configuration,
    /// connects to the IoT Hub as the first configured device, then sends a
    /// "|"-joined batch of simulated wind-speed readings every ten seconds.
    [<EntryPoint>]
    let main argv =
        let config = Config()
        config.Load("../../../config/config.yaml")
        let iotHubUri = config.AzureIoTHubConfig.IoTHubUri
        let deviceKey = config.DeviceConfigs.First().Key
        let deviceId = config.DeviceConfigs.First().DeviceId
        let deviceClient = DeviceClient.Create(iotHubUri, new DeviceAuthenticationWithRegistrySymmetricKey(deviceId, deviceKey))

        let avgWindSpeed = 10.
        let rand = new Random()

        // Builds the JSON for one reading at `location`. The device id is the
        // configured id suffixed with `index`; the wind speed is avgWindSpeed
        // plus a random offset in [-2, 2).
        let windSpeedMessage location index =
            let telemetryReading = {
                deviceId = sprintf "%s%i" deviceId index
                windSpeed = (avgWindSpeed + rand.NextDouble() * 4. - 2.)
                location = location
                obsTime = DateTime.UtcNow
            }
            JsonConvert.SerializeObject(telemetryReading)

        // Returns a pseudo-random coordinate near (lat, long); the same seed
        // always yields the same point. `radius` is presumably in metres
        // (111000 ≈ metres per degree) — TODO confirm.
        let getRandomGeoCoordinate seed (lat : float) (long : float) (radius : float) : GeoCoordinate =
            // check out http://gis.stackexchange.com/a/68275 for where this calc originated.
            let rand = new Random(seed)
            let u = rand.NextDouble()
            let v = rand.NextDouble()
            let w = radius / 111000. * Math.Sqrt(u)
            let t = 2. * Math.PI * v
            // Math.Cos expects radians, but `lat` is given in degrees.
            let x = (w * Math.Cos(t)) / Math.Cos(lat * Math.PI / 180.)
            let y = w * Math.Sin(t)
            GeoCoordinate(y+lat, x+long)

        // Sends one payload to the hub and logs it once the send has completed.
        let dataSendTask (data : string) =
            async {
                use message = new Message(Encoding.UTF8.GetBytes(data))
                // `do!` awaits the send; piping the Async into `ignore`
                // discarded it, so failures were never observed.
                do! deviceClient.SendEventAsync(message) |> Async.AwaitTask
                // Log the payload that was actually sent. The previous line
                // referenced `decompress compressedData`, neither of which is
                // defined in this file.
                printfn "%O > Sending message %s" (DateTime.Now.ToString()) data
            }

        // Ten fixed sites around the given coordinate. The arguments are
        // space-separated: a comma here would build a tuple instead of
        // passing `long` and `radius`.
        let msftSites = Array.init 10 (fun index -> getRandomGeoCoordinate index 47.643417 (-122.126083) 20000.)

        // Start Cloud to Device Reader
        dataReceiveTask deviceClient |> Async.Start

        // Infinite sequence of batches: one reading per site, joined with "|".
        let batchDataStreamTasks =
            Seq.initInfinite (fun x ->
                String.concat "|" (msftSites |> Array.mapi (fun idx site -> windSpeedMessage site idx)))

        // The sequence is infinite, so this loop only ends if a send throws.
        batchDataStreamTasks
        |> Seq.iter (fun x ->
            dataSendTask x
            |> Async.RunSynchronously
            Async.Sleep 10000 |> Async.RunSynchronously)

        Console.ReadLine() |> ignore
        0 // return an integer exit code
/// Returns a pseudo-random coordinate near (lat, long). The same `seed`
/// always yields the same point. `radius` is presumably in metres
/// (111000 ≈ metres per degree) — TODO confirm.
let getRandomGeoCoordinate seed (lat : float) (long : float) (radius : float) : GeoCoordinate =
    // check out http://gis.stackexchange.com/a/68275 for where this calc originated.
    let rand = new Random(seed)
    let u = rand.NextDouble()
    let v = rand.NextDouble()
    let w = radius / 111000. * Math.Sqrt(u)
    let t = 2. * Math.PI * v
    // Math.Cos expects radians, but `lat` is given in degrees; without the
    // conversion the east-west offset is scaled by a meaningless factor.
    let x = (w * Math.Cos(t)) / Math.Cos(lat * Math.PI / 180.)
    let y = w * Math.Sin(t)
    GeoCoordinate(y+lat, x+long)
// Ten fixed pseudo-random sites around (47.643417, -122.126083), seeded by index.
// Arguments are space-separated; the former comma built a tuple and applied
// the float -122.126083 as if it were a function.
let msftSites = Array.init 10 (fun index -> getRandomGeoCoordinate index 47.643417 (-122.126083) 20000.)
// Centre of the simulated wind-speed range.
let baseWindSpeed = 10.
let rand = new Random()

/// Builds the JSON for one simulated reading at `location`. The device id is
/// the outer `deviceId` suffixed with `index`; the wind speed is baseWindSpeed
/// plus a random offset in [-2, 2).
let windSpeedMessage location index =
    let telemetryReading = {
        deviceId = sprintf "%s%i" deviceId index
        windSpeed = (baseWindSpeed + rand.NextDouble() * 4. - 2.)
        location = location
        // The telemetryDataPoint record declares obsTime, so a record literal
        // without it does not compile.
        obsTime = DateTime.UtcNow
    }
    JsonConvert.SerializeObject(telemetryReading)
// Name of the Event Hub that receives the individual (shredded) readings.
let eventHubName = "postshred"
// SECURITY(review): this literal embeds a SharedAccessKey in source. Anyone who
// can read the file can use it, so the key should be rotated and supplied from
// configuration rather than committed.
let connectionString = "Endpoint=sb://iot-fsharp-demo.servicebus.windows.net/;SharedAccessKeyName=default;SharedAccessKey=YFFV/x/6lc9KYi30AgRb3wbv+ZBDf/k/vh6KtmwjSXk="
/// Decodes a Base64 payload made of a 4-byte length prefix followed by gzip
/// data and returns the decompressed bytes as UTF-8 text.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    // The first four bytes hold the uncompressed length; gzip data follows.
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    let buffer = Array.zeroCreate<byte> dataLength
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading
    // until the buffer is full or the stream reports end-of-data (0).
    let rec fill offset =
        if offset < buffer.Length then
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read)
    fill 0
    Encoding.UTF8.GetString(buffer)
#r "Microsoft.ServiceBus.dll"
#r "System.Runtime.Serialization.dll"
open System
open System.Text
open System.IO
open System.Diagnostics
open System.IO.Compression
open Microsoft.ServiceBus.Messaging
open System.Runtime.Serialization
{
"name": "iot-fsharp-demo",
"version": "0.0.1",
"description": "Example decompression & shred Azure Function",
"copyright": "Microsoft",
"title": "Decompression & Shred Azure Function",
"authors": ["Microsoft"],
"language": "F#",
"frameworks": {
"net46":{
"dependencies": {
"WindowsAzure.ServiceBus": "3.4.0"
}
}
}
}
// Function entry point (excerpt): decompresses the incoming payload and opens
// an Event Hub client. This excerpt stops after the two bindings; the complete
// version, which forwards each record, appears further down in this file.
let Run (input: string, log: TraceWriter) =
// `input` is the Base64 payload accepted by `decompress`.
let groupedData = decompress input
// Client for the hub named by `eventHubName`, using `connectionString`.
let eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName)
#r "Microsoft.ServiceBus.dll"
#r "System.Runtime.Serialization.dll"
open System
open System.Text
open System.IO
open System.Diagnostics
open System.IO.Compression
open Microsoft.ServiceBus.Messaging
open System.Runtime.Serialization
// Name of the Event Hub that receives the individual (shredded) readings.
let eventHubName = "postshred"
// SECURITY(review): this literal embeds a SharedAccessKey in source. Anyone who
// can read the file can use it, so the key should be rotated and supplied from
// configuration rather than committed.
let connectionString = "Endpoint=sb://iot-fsharp-demo.servicebus.windows.net/;SharedAccessKeyName=default;SharedAccessKey=YFFV/x/6lc9KYi30AgRb3wbv+ZBDf/k/vh6KtmwjSXk="
/// Decodes a Base64 payload made of a 4-byte length prefix followed by gzip
/// data and returns the decompressed bytes as UTF-8 text.
let decompress (data : string) =
    let gzipBuffer = Convert.FromBase64String(data)
    // The first four bytes hold the uncompressed length; gzip data follows.
    let dataLength = BitConverter.ToInt32(gzipBuffer, 0)
    let buffer = Array.zeroCreate<byte> dataLength
    use memoryStream = new MemoryStream(gzipBuffer, 4, gzipBuffer.Length - 4)
    use zip = new GZipStream(memoryStream, CompressionMode.Decompress)
    // Stream.Read may return fewer bytes than requested, so keep reading
    // until the buffer is full or the stream reports end-of-data (0).
    let rec fill offset =
        if offset < buffer.Length then
            let read = zip.Read(buffer, offset, buffer.Length - offset)
            if read > 0 then fill (offset + read)
    fill 0
    Encoding.UTF8.GetString(buffer)
/// Function entry point: decompresses the batched payload, splits it on "|"
/// and forwards every non-empty record to the Event Hub as its own event,
/// logging each record after it is sent.
let Run (input: string, log: TraceWriter) =
    let groupedData = decompress input
    let eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName)
    try
        groupedData.Split([|"|"|], StringSplitOptions.RemoveEmptyEntries)
        |> Array.iter (fun data ->
            // EventData is disposable; release it as soon as it has been sent.
            use eventData = new EventData(Encoding.UTF8.GetBytes(data))
            eventHubClient.Send(eventData)
            log.Info(sprintf "F# Queue trigger function processed: '%s'" data))
    finally
        // A client is created per invocation, so close it even when a send
        // throws; otherwise its connection is never released.
        eventHubClient.Close()
{"location":{"Latitude":40.572785337127179,"Longitude":73.94234667988286,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId0","windSpeed":10.904973079871839,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.77039546021787,"Longitude":73.936369994380513,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId1","windSpeed":8.9946743366283712,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.802419980175664,"Longitude":74.137364620009123,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId2","windSpeed":11.084375593384902,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.620429899604716,"Longitude":74.0377327342062,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId3","windSpeed":9.174076850141434,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.70360108888142,"Longitude":73.8420657430119,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId4","windSpeed":11.263778106897966,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.815168724564174,"Longitude":74.028574918004367,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId5","windSpeed":9.3534793636544968,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.634263716975241,"Longitude":74.154688016340259,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId6","windSpeed":11.443180620411029,"obsTime":"2016-11-28T20:14:07.4050906Z"}
|{"location":{"Latitude":40.632095640981504,"Longitude":73.928272253624172,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId7","windSpeed":9.5328818771675614,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.8602118423841,"Longitude":73.917591851158747,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId8","windSpeed":11.622583133924092,"obsTime":"2016-11-28T20:14:07.4050906Z"}|{"location":{"Latitude":40.743483308358982,"Longitude":74.1206610716174,"Altitude":"NaN","HorizontalAccuracy":"NaN","VerticalAccuracy":"NaN","Speed":"NaN","Course":"NaN","IsUnknown":false},"deviceId":"DeviceId9","windSpeed":9.7122843906806242,"obsTime":"2016-11-28T20:14:07.4050906Z"}
/// Registry client created from `connectionString`.
let registryManager = RegistryManager.CreateFromConnectionString(connectionString)

/// Starts registration of a new device with the given id and returns the
/// task produced by the registry call.
let addDevice deviceId =
    let newDevice = new Device(deviceId)
    registryManager.AddDeviceAsync(newDevice)

/// Prints the primary symmetric key of `device` to stdout.
let printDeviceKey (device: Device) =
    let primaryKey = device.Authentication.SymmetricKey.PrimaryKey
    printfn "Generated device key: %A" primaryKey
/// Stores the primary key of `device` on every configured device whose
/// DeviceId equals `device.Id`, saves the configuration to `configFilePath`,
/// and returns `device` so the call can sit in a pipeline.
let writeDeviceKey (device : Device) =
    config.DeviceConfigs
    // Identifiers are compared ordinally; String.Compare without a comparison
    // type is culture-sensitive and can treat distinct ids as equal.
    |> Seq.filter (fun dc -> String.Equals(dc.DeviceId, device.Id, StringComparison.Ordinal))
    |> Seq.iter (fun dc -> dc.Key <- device.Authentication.SymmetricKey.PrimaryKey)
    config.Save(configFilePath)
    device
// Register the device, block until the registry call finishes, then persist
// the generated key to the config file and print it.
(addDevice deviceId |> Async.AwaitTask |> Async.RunSynchronously)
|> (writeDeviceKey >> printDeviceKey)
/// Starts a registry lookup for `deviceId` and returns the task produced by
/// the registry call.
let getDevice deviceId = registryManager.GetDeviceAsync(deviceId)

// Register the device. If the registry reports that it already exists, look
// the existing device up instead. Either way, persist and print its key.
try
    addDevice deviceId
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> writeDeviceKey
    |> printDeviceKey
with
// Only the "already exists" case is handled. Any other AggregateException is
// left unmatched so it propagates instead of being silently swallowed, and
// the lookup runs once regardless of how many inner exceptions there are.
| :? System.AggregateException as e
    when e.InnerExceptions |> Seq.exists (fun ex -> ex :? DeviceAlreadyExistsException) ->
    getDevice deviceId
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> writeDeviceKey
    |> printDeviceKey
open System
open System.Linq
open FSharp.Configuration
// Typed view over the YAML configuration, generated by the FSharp.Configuration
// YamlConfig type provider from the sample file at ../config/config.yaml.
type Config = YamlConfig<FilePath="../config/config.yaml">
module Main =

    /// Entry point: loads the YAML configuration, prints the IoT Hub
    /// connection string, then blocks until a line is read from stdin.
    [<EntryPoint>]
    let main argv =
        let configFilePath = "../../../config/config.yaml"
        let config = Config()
        config.Load(configFilePath)
        config.AzureIoTHubConfig.ConnectionString |> printfn "%s"
        // Keep the console window open until the user presses Enter.
        Console.ReadLine() |> ignore
        0 // return an integer exit code
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment