Skip to content

Instantly share code, notes, and snippets.

@aarondandy
Last active October 23, 2019 07:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aarondandy/ac70eace9c978098a5955eacd5f95e45 to your computer and use it in GitHub Desktop.
Save aarondandy/ac70eace9c978098a5955eacd5f95e45 to your computer and use it in GitHub Desktop.
// reference SqlStreamStore.Postgres, Newtonsoft.Json
// use C# 7.3+
using System;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using SqlStreamStore;
using SqlStreamStore.Streams;
namespace Mayonnaise
{
/// <summary>
/// Walkthrough of SqlStreamStore on Postgres: appends a short history of events to a
/// mayonnaise machine's stream, then reads the stream back and folds the events into
/// the machine's current state, printing each step to the console.
/// </summary>
class Program
{
    /// <summary>
    /// Creates the store schema, appends the machine's events, replays them to the
    /// console and finally waits for a key press.
    /// </summary>
    /// <param name="args">Command line arguments; not used.</param>
    /// <returns>A task that completes once the walkthrough has finished.</returns>
    /// <exception cref="NotSupportedException">
    /// Thrown during replay when the stream contains a message type this program does not handle.
    /// </exception>
    static async Task Main(string[] args)
    {
        // SQLStreamStore sits on top of an RDBMS so we need a connection string
        // Just your usual connection string should do the trick here
        // I put my secret password of "dev" right in the connection string and you too can live on the edge like me
        var yourAverageEverydayDatabaseConnectionString
            = "Host=localhost;Database=ssssandbox;Username=postgres;Password=dev"; // 🐍

        // In this example I choose to use postgres because I'm an interesting person.
        // We need to deal with the actual type for a little bit for setting things up.
        // The using statement guarantees the store is disposed even when one of the
        // awaited calls below throws.
        using (var implementationSpecificStreamStore = new PostgresStreamStore(
            new PostgresStreamStoreSettings(yourAverageEverydayDatabaseConnectionString)))
        {
            // This could be the first time this has ever been run, so we need to set the DB schema up.
            // If this was the first time, it will create the tables and other needed bits.
            await implementationSpecificStreamStore.CreateSchema();

            // I keep the variable typed as an interface however because most _real_ usages will likely be injected this way.
            IStreamStore streamStore = implementationSpecificStreamStore;

            // Our domain is crafting duck mayonnaise in a mayonnaise machine, as seen in Stardew Valley:
            // See: https://stardewvalleywiki.com/Duck_Mayonnaise
            // Let's drop an egg into the machine and see what comes out!
            // We are going to have the machine be our aggregate and have the stream ID match, because I'm lazy.
            var machineId = Guid.NewGuid();
            var streamId = new StreamId(machineId.ToString("N"));

            // Let's build our first message to indicate putting a duck egg into the mayonnaise machine
            var insertItemMessage = new NewStreamMessage(Guid.NewGuid(), "item-inserted", JsonConvert.SerializeObject(new
            {
                itemName = "Duck Egg"
            }));

            // Because we just made the stream ID up, this will be the first message on the stream
            var currentVersion = ExpectedVersion.NoStream;

            // This will add the new message about our egg to the machine's stream
            var appendResult = await streamStore.AppendToStream(streamId, currentVersion, insertItemMessage);

            // Appending a message increments the stream's version
            currentVersion = appendResult.CurrentVersion;

            // It takes three hours for a duck egg to convert into duck mayonnaise
            // Let's pass some time, and then some, with events
            for (var i = 0; i < 5; i++)
            {
                var workingMessage = new NewStreamMessage(Guid.NewGuid(), "passed-some-time", JsonConvert.SerializeObject(new
                {
                    hours = 1
                }));
                appendResult = await streamStore.AppendToStream(streamId, currentVersion, workingMessage);
                currentVersion = appendResult.CurrentVersion;
            }

            // When the operator of the machinery notices the product is ready, they remove it
            // I don't have any idea what this message should look like, have an emoji
            var extractItemMessage = new NewStreamMessage(Guid.NewGuid(), "extract-item", "\"🙌\"");
            appendResult = await streamStore.AppendToStream(streamId, currentVersion, extractItemMessage);
            // Nothing else is appended after this; the version is tracked only to keep the pattern consistent.
            currentVersion = appendResult.CurrentVersion;

            // 🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚
            //
            // Now, let's play back what happened to recreate this culinary abomination.
            //
            // 🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚🦆🥚

            // We are going to build a projection from the events.
            // First, we need some state, I'm going to use some local variables
            string machineContents = null;
            var machineItemHoursWorked = 0;

            // Next, let's define some handlers, I'm using local methods for brevity

            // when we encounter an item inserted event, reset the state of the machine for it
            void insertItem(string itemName)
            {
                Console.WriteLine($"Inserted {itemName} into the machine.");
                machineContents = itemName;
                machineItemHoursWorked = 0;
            }

            // as time passes transformations may or may not take place
            void passTime(int hours)
            {
                Console.WriteLine($"Waited for {hours} hours.");
                machineItemHoursWorked += hours;

                // This is from our business rules. It takes 3 hours for an egg to become mayonnaise.
                if (machineContents == "Duck Egg" && machineItemHoursWorked >= 3)
                {
                    Console.WriteLine($"Our item has transformed after waiting {machineItemHoursWorked} hours!");
                    machineContents = "Duck Mayonnaise";
                    // Keep only the time spent beyond the 3 hours the transformation consumed.
                    machineItemHoursWorked -= 3;
                }
            }

            // when the item is removed from the machine, we just reset the machine
            void extractItem()
            {
                Console.WriteLine($"Item {machineContents} was removed from the machine.");
                machineContents = null;
                machineItemHoursWorked = 0;
            }

            // A terrible replacement for having IAsyncEnumerable 😭😭😭
            // Reads the stream forwards from version 0, two messages per page, and awaits
            // processMessage for each message in order until the store reports the end.
            async Task forEachMessage(StreamId streamIdToRead, Func<StreamMessage, Task> processMessage)
            {
                int pageStartVersion = 0;
                ReadStreamPage page;
                do
                {
                    page = await streamStore.ReadStreamForwards(streamIdToRead, pageStartVersion, maxCount: 2);
                    foreach (var message in page.Messages)
                    {
                        await processMessage(message);
                    }

                    // Continue the next read from where this page left off.
                    pageStartVersion = page.NextStreamVersion;
                }
                while (!page.IsEnd);
            }

            // Then, for the last step handle all of the messages in sequence
            await forEachMessage(streamId, async m =>
            {
                switch (m.Type)
                {
                    case "item-inserted":
                    {
                        var itemName = JsonConvert.DeserializeObject<JObject>(await m.GetJsonData()).Value<string>("itemName");
                        insertItem(itemName);
                        break;
                    }
                    case "passed-some-time":
                    {
                        var hours = JsonConvert.DeserializeObject<JObject>(await m.GetJsonData()).Value<int>("hours");
                        passTime(hours);
                        break;
                    }
                    case "extract-item":
                        extractItem();
                        break;
                    default:
                        // Say which message type was unexpected so a failed replay can be diagnosed.
                        throw new NotSupportedException($"Message type '{m.Type}' is not supported.");
                }

                Console.WriteLine($"Machine status: {machineContents ?? "idle"} for {machineItemHoursWorked} hours.");
            });

            // We no longer need the connection or access to the stream;
            // leaving the using block disposes the store.
        }

        Console.WriteLine("Press the Any Key to enjoy artisanal goods...");
        Console.ReadKey();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment