Skip to content

Instantly share code, notes, and snippets.

@ruslander
Created October 18, 2016 20:06
Show Gist options
  • Save ruslander/0414202dd230690cd438b9463ef5bd7b to your computer and use it in GitHub Desktop.
Save ruslander/0414202dd230690cd438b9463ef5bd7b to your computer and use it in GitHub Desktop.
StreamStone
using System;
using System.Configuration;
using System.Linq;
using Microsoft.WindowsAzure.Storage;
using Newtonsoft.Json;
using Streamstone;
namespace InServiceCqrs
{
class Program
{
static void Main(string[] args)
{
var client = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]).CreateCloudTableClient();
var table = client.GetTableReference("Streams");
table.CreateIfNotExists();
var partition = new Partition(table, "default");
var stream = OpenOrCreate(partition);
AppendToStream(stream);
ReadAllFromStream(partition);
Console.ReadKey();
}
private static void ReadAllFromStream(Partition partition)
{
Console.WriteLine("Reading all events in a stream");
Console.WriteLine("If slice size is > than WATS limit, continuation token will be managed automatically");
StreamSlice<EventEnvelope> slice;
var nextSliceStart = 1;
do
{
slice = Stream.Read<EventEnvelope>(partition, nextSliceStart, sliceSize: 1);
foreach (var @event in slice.Events)
Console.WriteLine("{0}:{1} {2}-{3}", @event.Id, @event.Version, @event.Type, @event.Data);
nextSliceStart = slice.HasEvents
? slice.Events.Last().Version + 1
: -1;
} while (!slice.IsEndOfStream);
}
private static void AppendToStream(Stream stream)
{
Console.WriteLine("Writing to new stream in partition '{0}'", stream.Partition);
var id = Guid.NewGuid().ToString();
var result = Stream.Write(stream,
ToEventData(new InventoryItemCreated(id, "iPhone6")),
ToEventData(new InventoryItemCheckedIn(id, 100))
);
Console.WriteLine("Succesfully written to new stream.\r\nEtag: {0}, Version: {1}", result.Stream.ETag,
result.Stream.Version);
}
private static Stream OpenOrCreate(Partition partition)
{
Stream stream;
try
{
stream = Stream.Provision(partition);
Console.WriteLine("Provisioned new empty stream in partition '{0}'", stream.Partition);
}
catch (Exception)
{
stream = Stream.Open(partition);
Console.WriteLine("Openned stream in partition '{0}'", stream.Partition);
}
Console.WriteLine("Etag: {0}", stream.ETag);
Console.WriteLine("Version: {0}", stream.Version);
return stream;
}
static EventData ToEventData(object e)
{
var id = Guid.NewGuid();
return new EventData(EventId.From(id), EventProperties.From(new EventEnvelope
{
Id = id,
Type = e.GetType().Name,
Data = JsonConvert.SerializeObject(e)
}));
}
}
class EventEnvelope
{
public Guid Id { get; set; }
public string Type { get; set; }
public string Data { get; set; }
public int Version { get; set; }
}
public class InventoryItemCreated
{
public readonly string Id;
public readonly string Name;
public InventoryItemCreated(string id, string name)
{
Id = id;
Name = name;
}
}
public class InventoryItemCheckedIn
{
public readonly string Id;
public readonly int Count;
public InventoryItemCheckedIn(string id, int count)
{
Id = id;
Count = count;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment