@perokvist perokvist/fsharp_modeling_4.md Secret
Last active Aug 29, 2015

Blog posts

Exploring F# through modeling #4

Introduction

In three earlier posts we have explored modeling a Rock-paper-scissors game in F#. We have modeled the domain and infrastructure. In the last post we looked at refactoring and C# interop. In this post we'll go back to basics as we explore using C# infrastructure in our solution. We're going to look at record types, classes and interface implementation.

Scenario

We want to use our domain, with its commands and events, together with some C# infrastructure. The infrastructure might have interfaces that our F# solution needs to implement in order to use it. We could write some adapter glue, but in this post we're going to implement the interfaces in F#.

Command interface

The C# interface that the infrastructure uses looks like this;

public interface ICommand
{
	Guid AggregateId { get; }
	Guid CorrelationId { get; }
}

So let's take a look at one of our commands - the CreateGameCommand.

type CreateGameCommand = {
          playerName: string
          firstMove: Move
          name:string
          id:string } 

We have used interfaces before in our solution - the Event marker interface.

type Event = 
    interface
    end

type MoveMadeEvent=
    {
    playerName:string
    move:Move
    } 
    interface Event

To implement the C# interface we use the same syntax, specifying the interface and then each member of the interface, as follows;

type CreateGameCommand=
    {playerName: string
     firstMove: Move
     name:string
     id:System.Guid} interface Commanding.ICommand with
                        member x.AggregateId = x.id
                        member x.CorrelationId = Guid.NewGuid()

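Note that all interface implementations in F# are explicit: the members are only reachable through the interface type, so a value has to be upcast to the interface before they can be called.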
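
Seen from the C# side, this behaves like a C# explicit interface implementation. The sketch below is only an illustration under that assumption; the class is a hypothetical C# counterpart of the F# record, not code from the solution.

```csharp
using System;

public interface ICommand
{
    Guid AggregateId { get; }
    Guid CorrelationId { get; }
}

// Hypothetical C# counterpart of the F# record, written with explicit
// interface implementation to mirror how F# exposes interface members.
public sealed class CreateGameCommand : ICommand
{
    private readonly Guid _correlationId = Guid.NewGuid();

    public CreateGameCommand(Guid id) { Id = id; }

    public Guid Id { get; }

    // Explicit implementations: not visible on CreateGameCommand itself.
    Guid ICommand.AggregateId { get { return Id; } }
    Guid ICommand.CorrelationId { get { return _correlationId; } }
}

public static class ExplicitInterfaceDemo
{
    public static Guid ReadAggregateId(CreateGameCommand command)
    {
        // command.AggregateId would not compile here; the member is only
        // reachable through the interface type.
        ICommand asInterface = command;
        return asInterface.AggregateId;
    }
}
```

In F# the same access is written with an upcast, for example `(command :> Commanding.ICommand).AggregateId`.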

Here we changed the id to a Guid to align with the interface, and AggregateId points at the id property. The CorrelationId doesn't have a corresponding property yet, so we create a new Guid. That means CorrelationId would return a new Guid on every read, which is not OK. The CorrelationId is just an identifier for each command instance, nothing we need to set ourselves when creating a command. So what are our options regarding the CorrelationId? If we want to stick to a record type we need to add a property to match CorrelationId.

  type CreateGameCommand =
    {playerName: string
     firstMove: Move
     name:string
     id:System.Guid
     correlationId:System.Guid} 
     interface Commanding.ICommand with
                        member x.AggregateId = x.id
                        member x.CorrelationId = x.correlationId

If we would like to skip passing a correlationId, we could create a factory, or we need to create a class instead. A simple factory;

let createGameCommand playerName firstMove name gameId = 
    { playerName = playerName; firstMove = firstMove; name = name; id = gameId; correlationId = Guid.NewGuid() }
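
To make the difference between the two CorrelationId variants concrete, here is a small C# sketch. Both classes are hypothetical and exist only for this comparison: one computes a Guid on every read, the other assigns it once at construction, which is what the factory does for the record.

```csharp
using System;

public sealed class FreshIdCommand
{
    // A new Guid is produced on every read, so two reads never agree.
    public Guid CorrelationId { get { return Guid.NewGuid(); } }
}

public sealed class StableIdCommand
{
    // Assigned once at construction, like the factory does for the record.
    public Guid CorrelationId { get; } = Guid.NewGuid();
}

public static class CorrelationIdDemo
{
    // Reads the value twice and reports whether both reads match.
    public static bool ReadsAgree(Func<Guid> read)
    {
        return read() == read();
    }
}
```

Calling `CorrelationIdDemo.ReadsAgree(() => command.CorrelationId)` on a `StableIdCommand` gives true, while a `FreshIdCommand` gives false, which is why the infrastructure could not correlate anything with the first variant.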

In this case a record type fits better (type inference, immutability and pattern matching - Record types vs Classes).

Just to have a look - let's go for the class approach;

type CreateGameCommand(playerName:string, firstMove:Move, name:string, gameId:Guid, correlationId:Guid) =
     member x.playerName = playerName
     member x.firstMove = firstMove
     member x.name = name
     member x.id = gameId
     member x.correlationId = correlationId

Adding the interface to the class;

type CreateGameCommand(playerName:string, firstMove:Move, name:string, gameId:Guid, correlationId:Guid) =
    interface Commanding.ICommand with 
        member x.AggregateId = x.id
        member x.CorrelationId = x.correlationId
    member x.playerName = playerName
    member x.firstMove = firstMove
    member x.name = name
    member x.id = gameId
    member x.correlationId = correlationId

Event interface

The C# interface that the infrastructure uses for Event looks like this;

 public interface IEvent
 {
    Guid CorrelationId { get; set; }
    Guid SourceId { get; }
 }

The SourceId would be the aggregate related to the change, and the CorrelationId identifies the command that resulted in this event. This introduces a new challenge. The infrastructure needs to set something in our immutable world. Our events are record types - immutable. But to comply with this interface we could declare one of its properties as mutable;

type MoveMadeEvent=
    {
    playerName:string
    move:Move
    id:Guid
    mutable correlationId:Guid
    } 
    interface Events.IEvent with
        member x.SourceId = x.id
        member x.CorrelationId with get() = x.correlationId
        member x.CorrelationId with set(v) = x.correlationId <- v

We could skip the get();

member x.CorrelationId = x.correlationId
member x.CorrelationId with set(v) = x.correlationId <- v
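
The post does not show how the infrastructure uses the setter. A plausible sketch, assuming it copies the correlation id from the handled command onto each resulting event, could look like this in C#. `EventStamper`, `SampleCommand` and `SampleEvent` are hypothetical names introduced for the example.

```csharp
using System;
using System.Collections.Generic;

public interface ICommand
{
    Guid AggregateId { get; }
    Guid CorrelationId { get; }
}

public interface IEvent
{
    Guid CorrelationId { get; set; }
    Guid SourceId { get; }
}

// Minimal stand-ins so the sketch compiles on its own.
public sealed class SampleCommand : ICommand
{
    public Guid AggregateId { get; } = Guid.NewGuid();
    public Guid CorrelationId { get; } = Guid.NewGuid();
}

public sealed class SampleEvent : IEvent
{
    public SampleEvent(Guid sourceId) { SourceId = sourceId; }
    public Guid CorrelationId { get; set; }
    public Guid SourceId { get; }
}

// Hypothetical infrastructure helper, not part of the original code base.
public static class EventStamper
{
    // Marks every event with the id of the command that caused it.
    public static void Stamp(ICommand command, IEnumerable<IEvent> events)
    {
        foreach (var e in events)
            e.CorrelationId = command.CorrelationId;
    }
}
```

This is the reason the F# record needs a mutable field: the value is assigned after the event has been created.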

Conclusion

Hope the infrastructure is worth it. Our record types are now bulkier. There might have been other ways to use the infrastructure, such as wrapping its contracts. But we got a good back-to-basics look at record types, classes and interface implementation in F#. Since the infrastructure was written in C#, we haven't looked at creating interfaces in F#. Below are some resources on the topics we touched upon. Hope to get back to modeling in future posts.

Enjoy!

Resources

Creating an API for a Rock-paper-scissors Game - using ASP.NET Web Api

Introduction

In earlier posts we often used the domain of a simple rock-paper-scissors game. In this post we're going to re-use the domain and create an HTTP API. The RPS domain was created using CQRS and event sourcing. Will that affect the API? As we'll find out - no, it doesn't have to, but the async nature could be beneficial to use in the API.

Scenario

The simple RPS game only has two user interactions/commands. CreateGame (including the player one’s move) and MakeMove (player two’s move).

We're going to list all available games, the ones a player could participate in, and all games played (ended).

Eventual consistency

The RPS game is eventually consistent. When a user creates a game, the available games list is updated from the events caused by the CreateGame command. An immediate refresh of available games after we created a game might not contain our game.

When receiving a command we need to tell the API client that we received the command and will process it. We could also inform the API client where the game could be retrieved when the command is processed.

Mapping Commands to HTTP Verbs

It's easy to start off designing your API with an RPC style approach like /api/Games/{id}/move. Instead we're going to let the HTTP verb be the action (one step towards a more RESTful API). The HTTP verbs differ in the matter of idempotency (an idempotent method can be called many times with the same outcome as calling it once). POST is not idempotent, while GET, PUT and DELETE are; PATCH is not guaranteed to be. In our resources, what is the mapping between commands and verb/action?
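
The difference in idempotency can be shown without any HTTP at all. The following C# sketch uses a hypothetical in-memory `GameStore`, introduced only to contrast a POST-like and a PUT-like operation.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory store, only to contrast the two verbs.
public sealed class GameStore
{
    private readonly Dictionary<Guid, string> _games = new Dictionary<Guid, string>();

    public int Count { get { return _games.Count; } }

    // POST-like: every call creates a new resource with a server-chosen id,
    // so repeating the call changes the outcome.
    public Guid Post(string gameName)
    {
        var id = Guid.NewGuid();
        _games[id] = gameName;
        return id;
    }

    // PUT-like: the caller names the resource, so a repeated call overwrites
    // the same entry and leaves the store in the same state.
    public void Put(Guid id, string gameName)
    {
        _games[id] = gameName;
    }
}
```

Two identical `Post` calls leave two games in the store, whereas two identical `Put` calls on the same id leave exactly one.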

What about when a resource has several commands mapping to the same verb? One possible solution is with "5 levels of media type" - specifying the command in the content-type:

  Content-Type:application/json;domain-model=CreateGameCommand 

See resources at the bottom of the post for more information on "5 levels of media type".
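
As a rough sketch of how a server could read that parameter, the content type can be parsed with `MediaTypeHeaderValue` from `System.Net.Http.Headers`. The helper class and method names below are made up for the example; how the value is then mapped to a command type is left open.

```csharp
using System;
using System.Linq;
using System.Net.Http.Headers;

public static class DomainModelMediaType
{
    // Returns the value of the domain-model parameter, or null when the
    // content type does not carry one.
    public static string ReadDomainModel(string contentType)
    {
        var header = MediaTypeHeaderValue.Parse(contentType);
        var parameter = header.Parameters.FirstOrDefault(
            p => string.Equals(p.Name, "domain-model", StringComparison.OrdinalIgnoreCase));
        return parameter == null ? null : parameter.Value;
    }
}
```

For the header shown above, `ReadDomainModel("application/json;domain-model=CreateGameCommand")` yields the command name, which a controller could use to choose which command to create.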

Creating a Game

Creating a game maps well to the HTTP verb POST, each invocation creates a new game. We're going to start with creating a game on api/games - POST. To inform the API client that we have received the command for creating a game we use the HTTP Status Code Accepted - 202.

 [HttpPost]
 public async Task<HttpResponseMessage> Create(input.....)
 {
     ....
     await _commandBus.SendAsync(command);
     return Request.CreateResponse(HttpStatusCode.Accepted);
 }

We create an action for creating a game. We'll cover creating a command from input in a bit. With the command we await it being placed on the queue (via the commandbus), then we tell the API client we accepted the command. We don't yet tell the API client where to find the game upon creation.

Let's add that information.

 [HttpPost]
 public async Task<HttpResponseMessage> Create(input.....)
 {
     var gameId = Guid.NewGuid();
     ....
     await _commandBus.SendAsync(command);
     return Request.CreateResponse(HttpStatusCode.Accepted)
         .Tap(message => message.Headers.Location = new Uri(Url.Link(RouteConfiguration.AvailableGamesRoute, new { id = gameId })));
 }

We control what ID the game instance should get. Knowing that and the URL for the endpoint for available games, we send a location header with the response. We avoid route strings by having a simple object RouteConfiguration with the routes.

Note: Tap is a custom extension method for chaining an action onto an object.
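
The implementation of Tap is not shown in the post. A minimal sketch of what such an extension method usually looks like, assuming it runs the action and returns the same object, is:

```csharp
using System;

public static class TapExtensions
{
    // Runs the action against the target and hands the target back,
    // so the call can sit in the middle of a fluent chain.
    public static T Tap<T>(this T target, Action<T> action)
    {
        action(target);
        return target;
    }
}
```

With this in place, `Request.CreateResponse(...)` can be configured and returned in a single expression, as in the action above.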

What about the command? The API client doesn't need to know about domain command objects. When creating the API we form a public representation of the underlying domain. The parameters/fields, which might arrive as form post data or a JSON body, might not be the same as the fields of the command we're issuing on the server side. We could represent these fields in an object that we bind to - a public command, part of the public domain.

Here is a public object that we use for model binding; it has validation attributes.

  public class CreateGameCommand
  {
    [Required]
    public string PlayerName { get; set; }
    [Required]
    public string GameName { get; set; }
    [Required]
    public string Move { get; set; }
  }

This is the domain command with some infrastructure properties.

  public class CreateGameCommand : IGameCommand, ICommand
  {
    public CreateGameCommand(Guid gameId, string playerName, string gameName, Move firstMove);
    public Guid GameId { get; }
    public string PlayerName { get; }
    public string GameName { get; }
    public Move FirstMove { get; }
    public Guid AggregateId { get; }
    public Guid CorrelationId { get; }
  }

Using these commands we could make our end point for creating a game look like:

    [HttpPost]
    [Route("")]
    public async Task<HttpResponseMessage> Create(Game.ReadModel.CreateGameCommand input)
    {
        var gameId = Guid.NewGuid();

        if (!ModelState.IsValid)
            return Request.CreateErrorResponse(HttpStatusCode.BadRequest, ModelState);

        Move move;
        if (!Enum.TryParse(input.Move, true, out move))
            return Request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid move");

        var command = new RPS.Game.Domain.CreateGameCommand(gameId, input.PlayerName, input.GameName, move);
        await _commandBus.SendAsync(command); 

        return Request.CreateResponse(HttpStatusCode.Accepted)
            .Tap(message => message.Headers.Location = new Uri(Url.Link(RouteConfiguration.AvailableGamesRoute, new { id = gameId })));
    }

Here we use model validation and parse the move string to our enum.
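
One detail worth knowing about `Enum.TryParse` is that it also accepts numeric strings, so an input like "7" parses successfully into a value that is not a named move. The sketch below adds an `Enum.IsDefined` check on top; the members of `Move` are assumed here, since the domain enum is not shown in this post.

```csharp
using System;

// Assumed members; the real domain enum is defined elsewhere.
public enum Move { Rock, Paper, Scissors }

public static class MoveParser
{
    public static bool TryParseMove(string input, out Move move)
    {
        // ignoreCase: true lets "paper" match Move.Paper.
        // IsDefined rejects numeric input that has no named member.
        return Enum.TryParse(input, true, out move)
            && Enum.IsDefined(typeof(Move), move);
    }
}
```

A numeric string that does correspond to a member, such as "1", is still accepted by this helper; rejecting those as well would need an extra check on the input.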

Making a move

As a player you could only make a move on an available game. The MakeMoveCommand maps well to the HTTP verb PUT. We're going to add an action for PUT on api/games/available/{id}. Using attribute routing with a controller prefix "api/games", the PUT action looks like this;

    [HttpPut]
    [Route("available/{id:Guid}")]
    public async Task<HttpResponseMessage> Move(Guid id, Game.ReadModel.MakeMoveCommand input)
    {
        if (!ModelState.IsValid)
            return Request.CreateErrorResponse(HttpStatusCode.BadRequest, ModelState);

        Move move;
        if (!Enum.TryParse(input.Move, true, out move))
            return Request.CreateErrorResponse(HttpStatusCode.BadRequest, "Invalid move");

        var command = new Game.Domain.MakeMoveCommand(id, move, input.PlayerName);

        await _commandBus.SendAsync(command);
        return Request.CreateResponse(HttpStatusCode.Accepted).Tap(
                r => r.Headers.Location = new Uri(Url.Link(RouteConfiguration.EndedGamesRoute, new { id })));
    }

Note: The API client knows the game ID, which is used as part of the URI.

When we have accepted the command we return 202 - accepted, with a location URL pointing to the EndedGamesRoute.
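
On the client side, a 202 with a location header suggests polling that location until the resource shows up. The following C# sketch shows the idea with the HTTP call abstracted away: `probe` stands in for a GET against the location URL, and the helper name, attempt count and delay are all assumptions of the example.

```csharp
using System;
using System.Net;
using System.Threading.Tasks;

public static class LocationPoller
{
    // probe stands in for an HTTP GET against the Location URL and
    // returns the status code of the response.
    public static async Task<bool> WaitUntilAvailableAsync(
        Func<Task<HttpStatusCode>> probe, int maxAttempts, TimeSpan delay)
    {
        for (var attempt = 0; attempt < maxAttempts; attempt++)
        {
            // 200 means the read model has caught up with the command.
            if (await probe() == HttpStatusCode.OK)
                return true;

            // 404 (or anything else): not there yet, wait and retry.
            await Task.Delay(delay);
        }
        return false;
    }
}
```

A real client would probably also want to stop early on errors other than 404 and back off between attempts.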

Available and Ended games

Querying the available and ended games matches HTTP GET. We're going to have four GET endpoints: two for collections of games and two for single games. The endpoints for single games are there to simplify the location headers for Create and Move.

Let's take a look at one of them - the single available game.

    [Route("available/{id:Guid}", Name = RouteConfiguration.AvailableGamesRoute)]
    public IHttpActionResult GetAvailableGame(Guid id)
    {
        var game = _readService
            .AvailableGames
            .SingleOrDefault(x => x.GameId == id);

        if (game != null)
            return Ok(game);

        return NotFound();
    }

Here we simply query for a game with a given ID. If found, we return 200 with the game; otherwise we return 404 - not found.

Note that we name the route, to aid the location headers in Create and Move.

The HTTP API

PlayerOne - Create a game with first move

POST (api/games)

{ "playerName" : "player", "gameName" : "testGame", "move" : "paper" }

returns Accepted (202) with location header.

PlayerTwo - Make a move

PUT (api/games/available/{gameId})

{ "playerName" : "player2", "move" : "rock" }

returns Accepted (202) with location header.

Available Games

GET (api/Games/available/)

returns available games.

GET api/Games/available/{ id }

returns single available game (200/404)

Ended Games

GET (api/Games/ended)

returns ended games. (200)

GET api/Games/ended/{ id }

returns single ended game (200/404)

E-tags

We have not used ETags in this solution. ETags could help us with caching and conditional requests, on the GET action as well as concurrency checks on PUT, PATCH and DELETE actions.

ETags could help the API client when reading games. In the make move case we could use If-Match to report if a move has already been made.
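
As a rough idea of what that check could look like, the sketch below compares the client's If-Match value with the game's current ETag. It is not part of the solution; the helper name and the choice of status codes are assumptions.

```csharp
using System.Net;

public static class ConditionalMove
{
    // ifMatch is the value the client sent in the If-Match header;
    // currentETag is the ETag of the game as the server sees it now.
    public static HttpStatusCode Check(string ifMatch, string currentETag)
    {
        // A mismatch means the game changed since the client read it,
        // for example because a move has already been made.
        return ifMatch == currentETag
            ? HttpStatusCode.Accepted
            : HttpStatusCode.PreconditionFailed;
    }
}
```

In an eventually consistent setup the ETag would come from the read model, so the check can only be as fresh as that model is.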

This could be the topic of another post.

Conclusion

In the beginning we asked if the domain using CQRS, event sourcing and eventual consistency would affect the HTTP API. In our implementation we used 202 Accepted when receiving a command, with a location header for the client to go poll. So the async nature and eventual consistency affected the API; other than that, the public domain doesn't reveal anything about the domain implementation.

We could have created an API where even the eventual consistency wasn't noticeable to the API client. The server could have issued commands and waited for the read model to be updated before returning to the API client. The added responsibility would affect the server. But it's up to you what fits your API best.

Enjoy!

Resources

Routing Commands using Azure Service Bus Sessions

Introduction

In an earlier post a colleague wrote about Message ordering and FIFO options in Azure Service Bus Queues and Topics. In this post we're going to look at how to use Azure Service Bus Topic/Queue Sessions to help us route commands, in a scenario when using Service Bus as a Command Bus.

Scenario

We're going to use a simplified domain of a Rock-Paper-Scissors game. The same domain used in earlier posts about F# modeling. In this domain we only have one Aggregate - the game aggregate.

To state the obvious, if we were to unleash this game as a service/app, immediate success would follow. That would mean heavy load on our service.

One node would struggle to process all commands with a decent throughput. If we add more nodes and let them compete in processing commands from the Topic/Queue, concurrency problems would occur with the aggregate state. One node could process a command out of order because another node hasn't yet processed the preceding command for that game.

We would like the same node to handle all commands for a given game - aggregate instance. How could we route all commands for a game instance to the same node?

Service Bus Sessions

To solve this problem message sessions could be used. We're going to use a session to identify a game. Sending a message/command as part of a session is as simple as setting the SessionId property on the BrokeredMessage. Simplified;

new BrokeredMessage(command) { SessionId = command.AggregateId.ToString() };
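
To see why keying the session on the aggregate id keeps commands for one game in order, here is an in-memory C# sketch that does not use Service Bus at all. It groups commands by session id, handles each group sequentially and lets different groups run concurrently. `Command` and `SessionSimulation` are hypothetical types for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public sealed class Command
{
    public Command(Guid aggregateId, int sequence)
    {
        AggregateId = aggregateId;
        Sequence = sequence;
    }

    public Guid AggregateId { get; }
    public int Sequence { get; }
}

public static class SessionSimulation
{
    // Groups commands by session id (the aggregate id) and handles each
    // group in order on its own task; different groups run concurrently.
    public static async Task ProcessAsync(IEnumerable<Command> commands, Action<Command> handle)
    {
        var sessions = commands.GroupBy(c => c.AggregateId.ToString());

        var workers = sessions.Select(session => Task.Run(() =>
        {
            // GroupBy keeps the source order within each group.
            foreach (var command in session)
                handle(command);
        }));

        await Task.WhenAll(workers);
    }
}
```

Each group plays the role of one session: a single worker owns it, so the aggregate never sees its commands out of order even though several games are processed at once.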

The node processing the commands needs to start handling commands for a given session/game. To do this the client starts accepting a session.

var session = await _client.AcceptMessageSessionAsync();

When accepting the session we could specify which session we would like to receive messages for, using an overload. But we don't know the session. Without a session parameter the client takes the first session available.

We could also give a TimeSpan as a timeout, after which the node releases the session if no messages are received. This allows the node or another node to pick up the session when new messages come in. (Note: if you don't specify a TimeSpan, a default timeout of 60 seconds is set.)

var session = await _client.AcceptMessageSessionAsync(TimeSpan.FromMinutes(1));

Ok, great. But how could one node listen to several sessions? The node needs to process as many messages/commands as possible.

When not using sessions, it is easy to handle messages using the message pump.

_client.OnMessageAsync(messageHandler, options);

Using sessions is similar, the message pump is on every session.

var session = await _client.AcceptMessageSessionAsync(TimeSpan.FromMinutes(1));
session.OnMessageAsync(messageHandler, options);

Ok, but that’s only one session, the node should process more than one game each minute (or one in total). We could let the node handle a session per thread.

private async Task StartSessionAsync(Func<BrokeredMessage, Task> messageHandler, OnMessageOptions options)
{
    var session = await _client.AcceptMessageSessionAsync(TimeSpan.FromMinutes(1));
    session.OnMessageAsync(messageHandler, options);
    // Task.Run unwraps the nested task; Task.Factory.StartNew would hand back a Task<Task>.
    await Task.Run(() => StartSessionAsync(messageHandler, options));
}

Note: to keep the sample simple, we don't catch the TimeoutException from AcceptMessageSessionAsync in the example code, nor do we use a cancellation token.

Here we start listening for a new session on a new thread when the first session is accepted. If the node would go down, the session should be released, letting another node take over.

OK, simplified and not complete. But I hope it has given some idea of how sessions could be used for this kind of scenario. A somewhat fuller version, with logging, exception handling and a cancellation token;

public static async Task StartSessionAsync(
    Func<Task<MessageSession>> clientAccept,
    Func<BrokeredMessage, Task> messageHandler,
    Action<string> logger,
    OnMessageOptions options,
    CancellationToken token)
{
    if (!token.IsCancellationRequested)
    {
        try
        {
            var session = await clientAccept();
            logger(string.Format("Session accepted: {0} on {1}", session.SessionId, Thread.CurrentThread.ManagedThreadId));
            session.OnMessageAsync(messageHandler, options);
        }
        catch (TimeoutException ex)
        {
            logger(string.Format("Session timeout: {0}", ex.Message));
        }
        catch (Exception ex)
        {
            logger(string.Format("Session exception: {0}", ex.Message));
            throw;
        }

        await Task.Run(() => StartSessionAsync(clientAccept, messageHandler, logger, options, token), token);
    }
}
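
The same accept loop can also be written iteratively instead of recursively. The sketch below is such a variant, not the version used in the post, and it avoids the Service Bus types so that it stands on its own: `acceptSession` stands in for AcceptMessageSessionAsync and returns a session id, and `startPump` stands in for registering the message pump on that session. Both delegates are assumptions of the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SessionLoop
{
    // Keeps accepting sessions until cancellation is requested and
    // returns how many sessions were accepted.
    public static async Task<int> RunAsync(
        Func<Task<string>> acceptSession,
        Action<string> startPump,
        CancellationToken token)
    {
        var accepted = 0;
        while (!token.IsCancellationRequested)
        {
            try
            {
                var sessionId = await acceptSession();
                startPump(sessionId);
                accepted++;
            }
            catch (TimeoutException)
            {
                // No session became available in time; try again.
            }
        }
        return accepted;
    }
}
```

A loop keeps the call stack flat and makes the exit condition explicit, at the cost of being a little further from the original sample.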

Conclusion

We have looked at a simple scenario in theory (aka the happy place) to explore session’s usage when using Azure Service Bus Queues or Topics in a Command Bus scenario.

Below you find some other interesting, related topics. Maybe topics for future exploration.

Enjoy!

More Resources

There is a lot more to the topic of routing, partitioning and scaling. The Service Bus is now supported by Azure WebJobs, and WebJobs support auto-scaling based on queue size.

If we were to model our Aggregates with Orleans as a Grain, the routing and scaling would be handled for us in a scenario like this, where a game instance would be the grain instance identity.

On the topic of high throughput, the latest addition to the Service Bus - Event Hubs - has some interesting features.

@AntonFagerberg

commented Aug 14, 2014

"Service Bus" -> "ServiceBus"
"handling command for a given" -> "handling commands"
"the node would go done" go?

@niklaslundberg

commented Aug 18, 2014

need to start --> needs to start
need to process --> needs to process

the node should process more games than one each minute -->
the node should process more than one game each minute

await Task.Factory.StartNew(() => StartSessionAsync(messageHandler, options)); -->
Task.Run()?

. Letting another node take over. -->
, letting another node take over.

Write out the full type names; only use "var" when it is obvious which type is involved.

Move Orleans and the other references to a new heading, e.g. More resources, since they deal with entirely new questions.
Spell out Azure WebJobs and explain what Orleans is.

@johanhaleby

commented Aug 26, 2014

"We could also give a timespan as timeout when the node would release the session if no messages are received. This allows the node or another node to pick up the session when new messages comes in." should probably be moved above "var session = await _client.AcceptMessageSessionAsync(TimeSpan.FromMinutes(1));".

"When not using sessions, it is easy to handle messages using the message pump."
Do we need to describe what a message pump is in more detail, or is the link enough? I don't know what it is.

"... we start listening for new a session on a new thread with a bit of delay ..."
Why is a delay needed!?

General questions:
What happens if a node goes down and fails to handle a whole game? Can another node take over the session then?
