
@JesterXL
Last active March 29, 2018 13:49
Streams in JavaScript Sample Code

Streams in JavaScript

The slides for this code.

If you're trying to learn the basics of the Array methods, this thorough tutorial with answers is a great place to start.

Found some WONDERFUL documentation if you're curious what type of observable to use or method to use on your data. It shows it in a big table of user stories.

Lee Campbell has some examples as well.

Function used for printing in jsfiddle:

function out() {
    var a = Array.prototype.slice.call(arguments, 0);
    document.getElementById("debug").innerHTML += a.join(" ") + "\n";
    console.log.apply(console, arguments);
}

Array Filtering

Basic lodash.

var party = [{
    race: "Dwarf",
    charClass: "Fighter",
    name: "Karl",
    hitPoints: 12,
    totalHitPoints: 24
}, {
    race: "Elf",
    charClass: "Wizard",
    name: "Therin",
    hitPoints: 8,
    totalHitPoints: 16
}, {
    race: "Half-Elf",
    charClass: "Cleric",
    name: "James",
    hitPoints: 20,
    totalHitPoints: 20
}, {
    race: "Halfling",
    charClass: "Thief",
    name: "Happyfoot",
    hitPoints: 12,
    totalHitPoints: 18
}, {
    race: "Human",
    charClass: "Paladin",
    name: "Brittany",
    hitPoints: 16,
    totalHitPoints: 23
}, {
    race: "Human",
    charClass: "Ranger",
    name: "Tamara",
    hitPoints: 16,
    totalHitPoints: 18
}];

var humans = _.filter(party, function (i) {
    return i.race == "Human";
});
out("humans:", humans);

Array Filtering a Filter

var party = [{
    race: "Dwarf",
    charClass: "Fighter",
    name: "Karl",
    hitPoints: 12,
    totalHitPoints: 24
}, {
    race: "Elf",
    charClass: "Wizard",
    name: "Therin",
    hitPoints: 8,
    totalHitPoints: 16
}, {
    race: "Half-Elf",
    charClass: "Cleric",
    name: "James",
    hitPoints: 20,
    totalHitPoints: 20
}, {
    race: "Halfling",
    charClass: "Thief",
    name: "Happyfoot",
    hitPoints: 12,
    totalHitPoints: 18
}, {
    race: "Human",
    charClass: "Paladin",
    name: "Brittany",
    hitPoints: 16,
    totalHitPoints: 23
}, {
    race: "Human",
    charClass: "Ranger",
    name: "Tamara",
    hitPoints: 16,
    totalHitPoints: 18
}];

var healthyHumans = _.chain(party)
    .filter(function (i) {
        return i.race == "Human";
    })
    .filter(function (i) {
        return i.totalHitPoints > 18;
    })
    .value();
healthyHumans.forEach(function(i)
{
    out("name:" + i.name);
}); // Brittany
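
If you don't have lodash handy, the same "filter a filter" idea can be sketched with the native Array methods. This is a minimal sketch, not the code from the slides: the party is trimmed to the fields the filters read, and `isHuman` / `isHealthy` are made-up helper names.

```javascript
// Same party shape as above, trimmed to the fields the filters read.
var party = [
    { race: "Dwarf", name: "Karl", totalHitPoints: 24 },
    { race: "Human", name: "Brittany", totalHitPoints: 23 },
    { race: "Human", name: "Tamara", totalHitPoints: 18 }
];

// Hypothetical named predicates; each one can be unit tested on its own.
function isHuman(member) {
    return member.race === "Human";
}

function isHealthy(member) {
    return member.totalHitPoints > 18;
}

// Each filter returns a new array, so the calls chain without _.chain().
var healthyHumanNames = party
    .filter(isHuman)
    .filter(isHealthy)
    .map(function (member) {
        return member.name;
    });

console.log(healthyHumanNames); // [ 'Brittany' ]
```

Naming the predicates is the main win here; the chain reads like the sentence you'd say out loud.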

Events vs Streams

myButton.addEventListener("click", function(event)
{
	console.log(event); // MouseEvent
});
myButton.addEventListener("dblclick", function(event)
{
	console.log(event); // MouseEvent
});

// doesn't really work with use strict, but doesn't matter, no real need
// myButton.removeEventListener("click", arguments.callee);

Various Event Types

// jquery
$( "#target" ).click(function(mouseEvent)
{
	console.log("mouseEvent:", mouseEvent);
});
// Bacon
$('#someButton').asEventStream('click').subscribe(function(mouseEvent)
{
	console.log("mouseEvent:", mouseEvent);
});
// Dart & Polymer
someButton.onClick.listen((MouseEvent mouseEvent)
{
	print("mouseEvent: $mouseEvent");
});
// RxJS
Rx.DOM.click($('#input')).subscribe(function (x)
{
	console.log('clicked!');
});

Submit Once

$("#submit").one("click", function()
{
  // submit
});

// RxJS
Rx.DOM.click($('#submit'))
.first()
.subscribe(function (x)
{
	// submit
});
// .last() would wait forever; a click stream never completes
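
Both versions boil down to "let the first event through, ignore the rest". Here's a rough sketch of that idea in plain JavaScript, assuming no jQuery or RxJS; `once` and `submitCount` are hypothetical names, not part of either library.

```javascript
// Wrap a handler so only the first call goes through.
function once(handler) {
    var called = false;
    return function () {
        if (called) {
            return; // every later call is ignored
        }
        called = true;
        return handler.apply(this, arguments);
    };
}

var submitCount = 0;
var submit = once(function () {
    submitCount += 1; // stand-in for the real submit
});

// Simulate an impatient user clicking three times.
submit();
submit();
submit();

console.log(submitCount); // 1
```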

Event Bubbling Still Intact

Loop through a list, create a ton of sprites, attach a listener to each.

// Dart Bad
hitArea.onMouseClick.listen((MouseEvent event)
{
    _controller.add(item.name);
});

OR... use event bubbling.

// Dart Good
onMouseClick.where((MouseEvent event)
{
	return event.target is Sprite;
})
.listen((MouseEvent event)
{
	Object data = event.target.userData;
	_controller.add(data["data"]);
});

Publish Subscribe / Event Message Buses

// Polymer (DOM only)
someComponent.addEventListener('global-message', function(e)
{
    //
});

anotherComponent.addEventListener('global-message', function(e)
{
    //
});

this.fire('global-message');
this.asyncFire('global-message');
// Dart
var pub = new StreamController();
var sub = pub.stream.asBroadcastStream();

var subscription1 = sub.listen((_)
{
	//
});

var subscription2 = sub.listen((_)
{
	//
});

pub.add('global-message'); // async
// RxJS
var pubsub = new Rx.Subject();

var subscription1 = pubsub.subscribe(function(data)
{
    //
});

var subscription2 = pubsub.subscribe(function(data)
{
    //
});

pubsub.onNext('global-message'); // sync unless Promise
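
To see what a subject is doing under the hood, here's a stripped-down sketch in plain JavaScript. It is not how Rx implements `Rx.Subject`; `createSubject` is a made-up helper that only mimics the `subscribe` / `onNext` / `dispose` shape, and it delivers synchronously like the RxJS example above.

```javascript
// Minimal publish/subscribe "subject": a list of listeners and a way to push to them.
function createSubject() {
    var listeners = [];
    return {
        subscribe: function (listener) {
            listeners.push(listener);
            // Return a handle so the caller can stop listening later.
            return {
                dispose: function () {
                    var index = listeners.indexOf(listener);
                    if (index !== -1) {
                        listeners.splice(index, 1);
                    }
                }
            };
        },
        onNext: function (value) {
            // Iterate over a copy so a listener that disposes itself
            // during delivery doesn't cause another listener to be skipped.
            listeners.slice().forEach(function (listener) {
                listener(value);
            });
        }
    };
}

var bus = createSubject();
var received = [];

var first = bus.subscribe(function (message) {
    received.push("first:" + message);
});
var second = bus.subscribe(function (message) {
    received.push("second:" + message);
});

bus.onNext("global-message"); // both listeners get it, synchronously
first.dispose();
bus.onNext("again");          // only the second listener is left

console.log(received);
// [ 'first:global-message', 'second:global-message', 'second:again' ]
```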

Async / Promise Support Built In

// Dart
String url = 'http://api.openweathermap.org/data/2.5/weather?q=Richmond,VA&units=imperial';
HttpRequest.request(url, responseType: 'json')
.asStream()
.listen((data)
{
	print(data.response);
});
// RxJS polls the service
function getWeather()
{
    return $.get('http://api.openweathermap.org/data/2.5/weather?q=Richmond,VA&units=imperial')
    .promise();
}

var weather = Rx.Observable.defer(getWeather);

var poll = Rx.Observable
    .empty()
    .delay(2000);

weather
	.concat(poll)
	.repeat()
	.subscribe(function(weatherData)
	{
		// print out weather data
	});
// Dart Unit Test, example of ES7 await
 test("GameLoop starts and generates a tick using our mocked window", () async
    {
      bool called = false;
      GameLoop gameLoop = new GameLoop();
      gameLoop.start();
     await gameLoop.stream.listen((GameLoopEvent event)
      {
        called = true;
      });

      expect(called, isTrue);
    });

Don't get it? What if I change await to yield...?

yield gameLoop.stream.listen((GameLoopEvent event)

Hot vs Cold Streams

Here's a stream that behaves like a cold one. Events are added before you listen, yet you get all of them, in order, once you listen, because a ReplaySubject buffers them. Think "GitHub": you get a stream of commits whenever you decide to pull.

var subject = new Rx.ReplaySubject();

subject.onNext('a');
subject.onNext('b');
subject.onNext('c');

setTimeout(function()
{
    var subscription = subject.subscribe(
        function (x) {
            out('Next: ' + x.toString());
        });
}, 3000);

subject.onNext('d');
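
The replay behavior is easier to trust once you see how little it takes. This is a sketch only, assuming nothing from Rx: `createReplaySubject` is a hypothetical helper that keeps every value in a buffer and hands the buffer to late subscribers before any live values.

```javascript
// Tiny replay subject: remembers everything, replays it to late subscribers.
function createReplaySubject() {
    var buffer = [];
    var listeners = [];
    return {
        onNext: function (value) {
            buffer.push(value);
            listeners.slice().forEach(function (listener) {
                listener(value);
            });
        },
        subscribe: function (listener) {
            // Catch the newcomer up first, then add them for live values.
            buffer.forEach(function (value) {
                listener(value);
            });
            listeners.push(listener);
        }
    };
}

var replay = createReplaySubject();
replay.onNext("a");
replay.onNext("b");

var seen = [];
replay.subscribe(function (x) {
    seen.push(x); // subscribed late, still receives 'a' and 'b'
});
replay.onNext("c");

console.log(seen); // [ 'a', 'b', 'c' ]
```

Drop the buffer and you get the plain hot behavior: a late subscriber would only ever see "c".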

Here's a hot stream. You can click the button all you want during the first 2 seconds, but those clicks are lost; once the subscription is added, you start getting events.

var source = Rx.Observable.fromEvent(document.getElementById('submit'), 'click');
setTimeout(function()
{
    out("Added subscription.");
    var sub = source.subscribe(function(e)
    {
        out("submit");
    });
}, 2000);

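Here's a hot stream where you'll miss events if you add the subscribe after they've already run: a BehaviorSubject only hands a late subscriber its latest value, and nothing at all once it has completed. Cold streams can help with race conditions, but here's where hot ones can cause them.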

var subject = new Rx.BehaviorSubject(42);

//setTimeout(function()
//{
var subsub = subject.subscribe(
    function (x) {
        out('Next: ' + x.toString());
    });
//}, 2000);
subject.onNext('cow');
subject.onNext(new Date());
subject.onCompleted();

It should be noted, Dart streams are cold by default. I haven't figured out Rx/Bacon/Node yet.

// Dart Cold Stream by default
void test()
{
    StreamController pub = new StreamController();
    pub.add("uno");
    pub.add("dos");
    pub.add("tres");

    new Timer(new Duration(seconds: 3), ()
    {
        var sub = pub.stream.listen((_)
        {
            print("_: $_");
        });
    });

    new Timer(new Duration(seconds: 4), ()
    {
        pub.add("quatro");
    });

}

Orchestration

Orchestration meaning "When I ask for a URL, orchestration is the work done to get me that data based on that URL." Any Flash/Flex developer will remember the days of orchestrating various URLs to different servers/domains just to show a user 1 page. Nowadays we do that better via Node.

Node Restify

Here are 2 ways to orchestrate in Node Restify:

// Node Restify: internal routes
server.get('/foo/:id', function (req, res, next) {
   next('foo2');
});

server.get({
    name: 'foo2',
    path: '/foo/:id'
}, function (req, res, next) {
   assert.equal(count, 1);
   res.send(200);
   next();
});

// with chains
server.get(
    '/foo/:id',
    function(req, res, next) {
        console.log('Authenticate');
        // parse headers, verify
        return next();
    },
    function(req, res, next) {
        res.send(200);
        return next();
    }
);

Orchestrating Using Promises [Still writing]

A common occurrence: I need to go to 4 different places just to show data for 1 screen. Authorize the request if need be, snag back a user object, and verify that with AuthZ. Get a list of Accounts from MongoDB. Snag some content from a CMS system that doesn't embrace Promises. Then snag a config file from Redis cache, combine it all, and spit it back to the client.

// authz, data, then content, then config
// assume Bluebird promisify worked
app.get('/api/getdata', function (req, res, next)
{
    var user                   = null;
    var listOfAllAccounts      = null;
    var screenLocalizedContent = null;
    var config                 = null;

    auth.authorize(req)
    .then(function(authResult)
    {
        // named authResult so it doesn't shadow the route's res
        return AuthZParseHeaders.send(authResult);
    })
    .then(function(authenticatedUser)
    {
        user = authenticatedUser;
        return Account.getAll();
    })
    .then(function(accounts)
    {
        listOfAllAccounts = accounts;
        // get content, cache in Redis
        return new Promise(function(success, failure)
        {
            // assumes hippo uses a Node-style (err, result) callback
            hippo.getScreenContent(user.locale, function(err, result)
            {
                if(err) return failure(err);
                success(result);
            });
        });
    })
    .then(function(content)
    {
        screenLocalizedContent = content;
        return new Promise(function(success, failure)
        {
            if(redisUICache.get('intro') !== null)
            {
                success(redisUICache.get('intro'));
            }
            else
            {
                fs.loadConfig('intro', function(err, loadedConfig)
                {
                    if(err) return failure(err);
                    redisUICache.set('intro', loadedConfig);
                    success(loadedConfig);
                });
            }
        });
    })
    .then(function(cachedConfig)
    {
        config = cachedConfig;
        res.send(200, {
            response: true,
            data: {
                user: user.info,
                accounts: listOfAllAccounts,
                content: screenLocalizedContent,
                config: config
            }
        });
    })
    // hand any failure in the chain to the framework's error handling
    .catch(next);
});
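
Two things in that chain are worth pulling out on their own: wrapping a callback API that "doesn't embrace Promises", and noticing that some steps don't depend on each other. Here's a rough sketch of both, assuming native Promises; `promisify`, `getScreenContent` and `getAllAccounts` are hypothetical stand-ins, not the real hippo or Account APIs.

```javascript
// Wrap a Node-style callback API, fn(args..., function (err, result)), in a Promise.
function promisify(fn) {
    return function () {
        var args = Array.prototype.slice.call(arguments);
        return new Promise(function (resolve, reject) {
            fn.apply(null, args.concat(function (err, result) {
                if (err) {
                    reject(err);
                    return; // don't fall through to resolve
                }
                resolve(result);
            }));
        });
    };
}

// Hypothetical stand-ins for the CMS and the account service.
function getScreenContent(locale, callback) {
    callback(null, { locale: locale, text: "Welcome" });
}

function getAllAccounts(callback) {
    callback(null, [{ id: "1" }, { id: "2" }]);
}

// Neither call needs the other's result, so start both and wait for both.
var screenData = Promise.all([
    promisify(getScreenContent)("en-US"),
    promisify(getAllAccounts)()
]).then(function (results) {
    return { content: results[0], accounts: results[1] };
});

screenData.then(function (data) {
    console.log(data.content.text, data.accounts.length);
});
```

`Promise.all` keeps results in the order you passed the Promises, so there are no outer `var`s to juggle.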

Cray. We'll solve in a minute. Let's start from the beginning on how we merge data from various async sources.

Dart example of using data from first service to populate the 2nd:

// Dart
EventStream getWeatherFromGeoIP = new EventStream(new GetLocation().asStream())
.debounce(new Duration(milliseconds: 500))
.map((HttpRequest request)
{
	return new Point(request.response.lat, request.response.lon);
})
.asyncMap((Point point)
{
	return getWeather(point.x, point.y);
})
.distinct();
getWeatherFromGeoIP.listen((num weather)
{
	print("It is $weather degrees.");
});

new Timer(new Duration(minutes: 1), ()
{
	locationService.getLocation();
});

Merging 2 similar services together and only taking distinct values from the combined stream:

// these guys update their values faster...
function getWeatherOpen()
{
    return Rx.Observable.just(72);
}
// ... but these guys are more accurate.
function getWeatherPrivate()
{
    return Rx.Observable.just(72);
}
var fastWeather            = Rx.Observable.defer(getWeatherOpen);
var accurateButSlowWeather = Rx.Observable.defer(getWeatherPrivate);
var weather                = fastWeather.merge(accurateButSlowWeather).distinct();
weather.subscribe(function(data)
{
    out("data:", data);
});
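
If merge plus distinct feels like magic, here's a small sketch of the two operators in plain JavaScript. It assumes a "source" is just a function that pushes values at a listener; `merge`, `distinct`, `openWeather` and `privateWeather` are made-up names for illustration, not the Rx implementations.

```javascript
// A "source" here is a function that pushes values to whatever listener it is given.

// merge: one listener hears every source.
function merge(sources) {
    return function (listener) {
        sources.forEach(function (source) {
            source(listener);
        });
    };
}

// distinct: only pass along values that have not been seen before.
function distinct(source) {
    return function (listener) {
        var seen = new Set();
        source(function (value) {
            if (seen.has(value)) {
                return;
            }
            seen.add(value);
            listener(value);
        });
    };
}

// Hypothetical weather services that partly agree.
function openWeather(listener) {
    listener(72);
    listener(73);
}

function privateWeather(listener) {
    listener(72); // duplicate of the open service's first reading
    listener(74);
}

var readings = [];
distinct(merge([openWeather, privateWeather]))(function (temperature) {
    readings.push(temperature);
});

console.log(readings); // [ 72, 73, 74 ]
```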

Now, let's refactor our 4-prong Node Restify orchestration above into a single stream:

function kind(item)
{
     return Object.prototype.toString.call(item).slice(8, -1);
}

function n(name)
{
    return function toString()
    {
        return name;
    }
}

function Config(configs)
{
    this.configs = configs;
}
Config.prototype.toString = n("Config");

function User(name, info)
{
    this.name = name;
    this.info = info;
}
User.prototype.toString = n('User');

function Content(text, values)
{
    this.text = text;
    this.values = values;
}
Content.prototype.toString = n("Content");

function Account(name, id, amount)
{
    this.firstName = name;
    this.id = id;
    this.amount = amount;
}
Account.prototype.toString = n("Account");

var getUser = Rx.Observable.create(function(observer)
{
    observer.onNext(new User('Jesse', '9823498'));
    observer.onCompleted();
});

var getContent = Rx.Observable.return(new Content("Some Text", ['some', 'values']));

var redisUICache = {
    get: function(key)
    {
        if(key == 'intro')
        {
            return new Config(['list', 'of', 'configs']);
        }
        return null;
    },
    set: function(key, value)
    {
        return true;
    }
};

var getAndCacheConfig = Rx.Observable.create(function(observer)
{
    if(redisUICache.get('intro') !== null)
    {
        observer.onNext(redisUICache.get('intro'));
        observer.onCompleted();
    }
    else
    {
        fs.loadConfig('intro', function(err, config)
        {
            // stop here on failure instead of also caching and emitting
            if(err) return observer.onError(err);
            redisUICache.set('intro', config);
            observer.onNext(config);
            observer.onCompleted();
        });
    }
});

var getAccountList = Rx.Observable.return(
    [
        new Account("James", "1", 100),
        new Account("Kelly", "2", 200),
        new Account("Karl", "3", 300)
    ]
);
var intro = getUser
    .merge(getContent)
    .merge(getAndCacheConfig)
    .bufferWithCount(3)
    .merge(getAccountList)
    .bufferWithCount(2);

out("****");
intro.subscribe(function(value)
{
    out("value:", value);
    //out("kind:", kind(value));
});

Let's modify the subscribe function to convert it into some JSON we can return to the client:

var intro = getUser
    .merge(getContent)
    .merge(getAndCacheConfig)
    .bufferWithCount(3)
    .merge(getAccountList)
    .bufferWithCount(2)
    .map(function(list)
    {
        return {
            user: list[0][0],
            content: list[0][1],
            config: list[0][2],
            accounts: list[1]
        };
    });

Sockets

Very simple socket; data is pushed to you on your stream; it effectively never "completes".

// RxJS Socket push
var socketSubject = Rx.DOM.fromWebSocket('ws://localhost:9999', 'sampleProtocol');

// Receive data
socketSubject.subscribe(
    function (data) {
        // Do something with the data
    });

// Send data, usually coming from the server. If this is Node,
// this is your chance to transform it before it goes to client,
// or other Node middlewares.
socketSubject.onNext(42);

Angular Models

Many people, as their models grow, will employ an Observer pattern approach to make their Models more scalable, with finer-grained change events so they don't fire excessively. Joel Hooks outlines how to do that on his blog.

What you end up with is this in controllers:

$rootScope.$on('workoutsChanged', function()
{
	vm._updateWorkout();
});

$rootScope.$on('currentDateChanged', function()
{
	vm._updateWorkout();
});

It's like Backbone events in Angular; some global message is sent, and we respond. Unlike events, Streams can pause these events, react to them, and change the entire sequence of reacting vs. calling a method. Here's a basic example from the docs:

// Helper function that wraps $watch
Rx.Observable.$watch = function (scope, watchExpression, objectEquality) {
    return Rx.Observable.create(function (observer) {
        // Create function to handle old and new Value
        function listener (newValue, oldValue) {
            observer.onNext({ oldValue: oldValue, newValue: newValue });
        }

        // Returns function which disconnects the $watch expression
        return scope.$watch(watchExpression, listener, objectEquality);
    });
};

Once you have that helper, you can orchestrate a lot more effectively in your Angular Controller. Notice we throttle it (vs. me debouncing in my Directive link function to avoid DOM thrashing).

Rx.Observable.$watch(scope, 'name')
    .throttle(1000) 
    .map(function (e)
    {
        return e.newValue;
    })
    .do(function ()
    { 
        // Set loading and reset data
        scope.isLoading = true;
        scope.data = [];
    })
    .flatMapLatest(querySomeService)
    .subscribe(function (data)
    {
        // Set the data
        scope.isLoading = false;
        scope.data = data;
    });
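
Throttling is the part people tend to hand-wave, so here's one common reading of it as a sketch: let a call through, then ignore calls until the wait has passed. This is not the Rx implementation; `throttle`, `search` and the injectable `now` clock are assumptions made so the example can run without real timers.

```javascript
// Leading-edge throttle: run the first call, ignore the rest until waitMs has passed.
// `now` is injectable so the behavior can be checked without waiting on a real clock.
function throttle(fn, waitMs, now) {
    now = now || Date.now;
    var lastRun = -Infinity;
    return function () {
        var time = now();
        if (time - lastRun < waitMs) {
            return; // still inside the quiet period
        }
        lastRun = time;
        return fn.apply(this, arguments);
    };
}

var fakeTime = 0;
var queries = [];
var search = throttle(
    function (name) {
        queries.push(name); // stand-in for querySomeService
    },
    1000,
    function () {
        return fakeTime;
    }
);

search("J");      // t = 0ms, runs
fakeTime = 400;
search("Je");     // t = 400ms, ignored
fakeTime = 1000;
search("Jes");    // t = 1000ms, runs again

console.log(queries); // [ 'J', 'Jes' ]
```

A debounce would do the opposite: wait for the typing to stop, then run once with the last value.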

Builds and Parsers

In Gulp and Dart, we pipe data (our source files) through a stream, and modify/inspect/verify as we go.

Here's a basic Node pipe, reading a stream of errors and shoving to a text file, seemingly for Splunk to come eat later.

var errorsFromClient = getErrorPostStream();
var errorLogFile     = fs.createWriteStream('error.log');
errorsFromClient.pipe(errorLogFile);
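
Here's a sketch of the same pipe with a transform step in the middle, using only Node's built-in `stream` module. The error objects and the in-memory `sink` are hypothetical stand-ins for `getErrorPostStream()` and `error.log`, so you can run it without a server or touching disk.

```javascript
var stream = require("stream");

// Hypothetical source of client errors; stands in for getErrorPostStream().
var errorsFromClient = stream.Readable.from([
    { level: "error", message: "boom" },
    { level: "warn", message: "slow" }
]);

// Turn each error object into one line of text.
var toLogLine = new stream.Transform({
    writableObjectMode: true, // objects come in, text goes out
    transform: function (error, encoding, done) {
        done(null, error.level.toUpperCase() + " " + error.message + "\n");
    }
});

// Collect lines in memory instead of writing error.log.
var lines = [];
var sink = new stream.Writable({
    write: function (chunk, encoding, done) {
        lines.push(chunk.toString());
        done();
    }
});

errorsFromClient.pipe(toLogLine).pipe(sink);

sink.on("finish", function () {
    console.log(lines.join(""));
});
```

Swap `sink` for `fs.createWriteStream('error.log')` and you're back to the original, plus formatting.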

Piping Data via Pipes

This Gulp stream analyzes the code through jshint, then verifies its style via jscs, and finally verifies it's not too complex via the complexity plugin.

return gulp.src(CONFIG.client.sourceFiles)
    .pipe(jshint())
    .pipe(jshint.reporter('jshint-stylish'))
    .pipe(jshint.reporter('fail'))
    .on('error', function(e)
    {
    	console.warn('jshint failed.');
    })
    .pipe(jscs())
    .on('error', function(e)
    {
    	console.warn('jscs failed');
    })
    .pipe(complexity({
        	cyclomatic: [3, 7, 12],
            halstead: [8, 13, 20],
            maintainability: 100
        })
    )
    .on('error', function(e)
    {
    	console.warn('complexity failed');
    });

Notice there is a LOT of work actually going on in each of those things getting piped to (jshint, jscs, etc).

Dart Consumers

Here's a Dart example using the StreamConsumer interface to effectively participate in those pipes.

Here's the encrypted JSON string chat message we're parsing:

// Dart
var json = {
	"id": "2k3l4j09",
	"userID": "09234jkljsd-12093898",
	"username": "JesterXL",
	"originalMessage": "Hey RVA.js, Gaelen buys free beer!"
};

The chat message we'll parse out from the socket:

class ChatMessage
{
	String messageID;
	String userID;
	String username;
	DateTime timestamp;
	String message;
	String originalMessage;

	// timestamp and message are optional; the consumer fills them in after parsing
	ChatMessage(this.messageID,
	            this.userID,
	            this.username,
	            this.originalMessage,
	            [this.timestamp,
	            this.message])
	{
	}
}

The consumer that implements the interface to parse the messages as they come through the stream:

class ChatJSONConsumer implements StreamConsumer
{
	List<ChatMessage> messages = new List<ChatMessage>();
	String secretKey;

	ChatJSONConsumer(this.secretKey)
	{
	}

	Future consume(Stream stream)
	{
		var completer = new Completer();
		stream.listen((item)
			{
				// left untyped so the decrypted fields can be read directly
				var obj = Cipher.decrypt(item, secretKey);
				ChatMessage message = new ChatMessage(obj.id,
														obj.userID,
														obj.username,
														obj.originalMessage);
				// assumes the payload carries milliseconds since epoch in `mse`
				message.timestamp = new DateTime.fromMillisecondsSinceEpoch(obj.mse,
																			isUtc: true);
				message.message = new GifFilledHTMLFromUnicodeString(obj.originalMessage);
				messages.add(message);
			},
			onError: (err) => print("Error $err"),
			// when the stream is finished,
			// complete the Promise, returning the ChatJSONConsumer instance
			onDone: () => completer.complete(this));

		return completer.future; // return a Promise immediately
	}
}

And finally, wiring it up:

Stream getChatStream()
{

}

void test()
{
	Stream chatStream = getChatStream();
	chatStream
	// secretKey is a placeholder for however you load the shared key
	.pipe(new ChatJSONConsumer(secretKey))
	.then((consumer)
	{
		// list of messages that contain my username
		print(consumer.messages.where((singleMessage)
			=> singleMessage.message.contains('JesterXL')).toList());
	});
}

Promises vs Merging Streams

Here's an index build task using Promises (Bluebird on Node 0.10, native on Node 0.12):

return new Promise(function(resolve, reject)
{
    gulp.src('src/client/index.html')
    .pipe(wiredep({ignorePath: "../../"}))
    .pipe(gulp.dest('./build'))
    .on('end', resolve)
    .on('error', reject);
})
.then(function()
{
    return new Promise(function(resolve, reject)
    {
         gulp.src(CONFIG.client.sourceFiles)
        .pipe(gulp.dest('./build'))
        .on('end', resolve)
        .on('error', reject);
    });
})
.then(function()
{
    return new Promise(function(resolve, reject)
    {
         gulp.src(CONFIG.client.templateFiles)
        .pipe(gulp.dest('./build'))
        .on('end', resolve)
        .on('error', reject);
    });
})
.then(function()
{
    // NOTE: this guy, even in stream mode, breaks, so putting here.
    browserSync.reload();
});

And here's the stream merge equivalent:

// no Promise wrapper here, so no resolve/reject handlers are needed;
// merge() combines the streams into one (e.g. from the merge-stream package)
var templateStream = gulp.src('src/client/index.html')
    .pipe(wiredep({ignorePath: "../../"}))
    .pipe(gulp.dest('./build'));

var jsStream = gulp.src(CONFIG.client.sourceFiles)
    .pipe(gulp.dest('./build'));

var htmlStream = gulp.src(CONFIG.client.templateFiles)
    .pipe(gulp.dest('./build'));

return merge(htmlStream, jsStream, templateStream);

Which do you like better?

Managing Multiple State Machines in Gaming

Filtering Events

Filtering events in Dart:

initiativeStream
.listen((event)
{
	if(event.type == InitiativeEvent.CHARACTER_READY)
	{
		print("character ready: ${event.character}");
	}
});

Even worse if you need types:

initiativeStream
.listen((event)
{
	if(event.character is Player)
	{
		if(event.type == InitiativeEvent.CHARACTER_READY)
		{
			print("character ready: ${event.character}");
		}
	}
});

Let's filter the event before we react to it:

// filtering events
initiativeStream.where((InitiativeEvent event)
{
	return event.type == InitiativeEvent.CHARACTER_READY;
})
.listen((event)
{
	print("character ready: ${event.character}");
});

Cool, now let's fix the 2 filters, and make it easier to read and unit test:

initiativeStream.where((InitiativeEvent event)
{
	return event.type == InitiativeEvent.CHARACTER_READY;
})
.where((InitiativeEvent event)
{
	return event.character is Player;
})
.map((InitiativeEvent event)
{
	return event.character;
})
.listen((Player player)
{
	print("player is ready: ${player}");
});
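
For the JavaScript folks, here's a sketch of the same where/where/map/listen chain without any library. `createStream` is a made-up helper, not Dart's Stream or Rx; the event shapes (`type`, `character.kind`) are assumptions that loosely mirror the Dart example.

```javascript
// Tiny push stream with where / map / listen, just enough to show the chaining.
function createStream() {
    var listeners = [];
    var stream = {
        add: function (event) {
            listeners.slice().forEach(function (listener) {
                listener(event);
            });
        },
        listen: function (listener) {
            listeners.push(listener);
        },
        // where: a new stream that only carries events passing the predicate
        where: function (predicate) {
            var filtered = createStream();
            stream.listen(function (event) {
                if (predicate(event)) {
                    filtered.add(event);
                }
            });
            return filtered;
        },
        // map: a new stream carrying the transformed value
        map: function (transform) {
            var mapped = createStream();
            stream.listen(function (event) {
                mapped.add(transform(event));
            });
            return mapped;
        }
    };
    return stream;
}

var CHARACTER_READY = "characterReady";
var initiativeStream = createStream();
var readyPlayers = [];

initiativeStream
    .where(function (event) { return event.type === CHARACTER_READY; })
    .where(function (event) { return event.character.kind === "player"; })
    .map(function (event) { return event.character; })
    .listen(function (player) { readyPlayers.push(player.name); });

initiativeStream.add({ type: CHARACTER_READY, character: { kind: "player", name: "Karl" } });
initiativeStream.add({ type: CHARACTER_READY, character: { kind: "monster", name: "Goblin" } });
initiativeStream.add({ type: "progress", character: { kind: "player", name: "Therin" } });

console.log(readyPlayers); // [ 'Karl' ]
```

By the time the listener runs, there's nothing left to check; it only ever sees a ready player.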

Merging Events

Using Bacon.js, we can merge 2 related events to trigger the same thing:

// Bacon
var loginButton = $("#loginButton").asEventStream("click");
var loginField = $("#passwordField").asEventStream("click");
var loginStream = loginButton.merge(loginField);
loginStream.onValue(function()
{
	$.post('server.com/login');
});

Same with a Dart game using Frappe:

EventStream monsterAttacks = new EventStream(new StreamController().stream);
EventStream playerAttacks = new EventStream(new StreamController().stream);
EventStream attacks = monsterAttacks.merge(playerAttacks);
attacks.listen((AttackResult result)
{
	textDropper.addTextDrop(result.attackTarget, result.hitValue);
});

This includes Sockets where different modes activate different polling types:

EventStream ajaxStream = new EventStream(new StreamController().stream);
EventStream socketStream = new EventStream(new StreamController().stream);
EventStream messages = ajaxStream.merge(socketStream);
messages.listen((ChatMessage message)
{
	querySelector("#chatField").text = message.text;
});

Streams Through Composition vs. Merging

Here we listen to our game loop:

_gameLoopStreamSubscription = _gameLoopStream
.where((GameLoopEvent event)
{
	return event.type == GameLoopEvent.TICK;
})
.listen((GameLoopEvent event)
{
	tick(event.time);
});

And whenever a user pauses the battle by opening a menu, we can pause this battle timer stream by simply no longer listening to the GameLoop. We don't want to pause the entire game, since that'd stop animating the sprites; instead, just the battle timer:

_gameLoopStreamSubscription.cancel();
_gameLoopStreamSubscription = null;
_streamController.add(new BattleTimerEvent(BattleTimerEvent.PAUSED, this));

If our state machine is set to a character, we calculate his battle timer for each game loop tick, and dispatch the event on the stream when ready:

void onCharacterTick()
{
	num result = (((effect * (speed + 20)) / 16));
	gauge += result.round();
	if (gauge >= MAX)
	{
		gauge = MAX;
		_streamController.add(new BattleTimerEvent(BattleTimerEvent.COMPLETE, this));
	}
}
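
To get a feel for the numbers, here's the same gauge math as a JavaScript sketch. The `MAX` of 65536 and the effect/speed values are made up for the example (the real ones aren't shown here), and `ticksUntilReady` is a hypothetical helper.

```javascript
var MAX = 65536; // hypothetical gauge ceiling, not the game's real value

// Same formula as onCharacterTick: how much the gauge grows per game loop tick.
// Math.round matches Dart's round() for the positive values used here.
function gaugeIncrement(effect, speed) {
    return Math.round((effect * (speed + 20)) / 16);
}

// Count ticks until the gauge reaches MAX and the COMPLETE event would fire.
function ticksUntilReady(effect, speed) {
    var step = gaugeIncrement(effect, speed);
    if (step <= 0) {
        return Infinity; // the gauge would never fill
    }
    var gauge = 0;
    var ticks = 0;
    while (gauge < MAX) {
        gauge += step;
        ticks += 1;
    }
    return ticks;
}

console.log(gaugeIncrement(96, 30));  // 300, since 96 * 50 / 16 = 300
console.log(ticksUntilReady(96, 30)); // 219, since 218 * 300 = 65400 is still short of 65536
```

A faster character just gets a bigger step, so they hit `MAX` in fewer ticks.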

Given these streams are just classes/objects, we can loop through all when pausing the game when there are multiple characters:

void pause()
{
	_battleTimers.forEach((TimerCharacterMap object)
	{
		BattleTimer timer = object.battleTimer;
		timer.pause();
	});
}

We can do the heavy lifting of initiative: deciding who fights next. When a character is ready, we put them in the appropriate array, and dispatch a change event on the stream. Notice we're now inspecting and filtering on a stream here. It's still a single stream, but even before Composition, we're only interested in a particular event with a particular target. This pattern doesn't change no matter how many streams you merge/nest:

StreamSubscription<BattleTimerEvent> timerSubscription = timer.stream
.listen((BattleTimerEvent event)
{
	TimerCharacterMap matched = _battleTimers.firstWhere((TimerCharacterMap map)
	{
		return map.battleTimer == event.target;
	});
	if(event.type == BattleTimerEvent.COMPLETE)
	{
		// NOTE: pausing the BattleTimer, not the Stream listener... lol, streams!
		matched.battleTimer.pause();
		Character targetCharacter = matched.character;
		charactersReady.add(targetCharacter);
		_streamController.add(new InitiativeEvent(InitiativeEvent.CHARACTER_READY,
		character: targetCharacter));
	}
	else if(event.type == BattleTimerEvent.PROGRESS)
	{
		event.character = matched.character;
		_streamController.add(event);
	}
});

As we keep creating streams that manage async parts of the battle, we still can pause it from anywhere, and react to it from anywhere. Here, we compose the initiative stream, and write logic for the monsters:

_initiative.stream
.where((event)
{
	return event is InitiativeEvent && event.type == InitiativeEvent.CHARACTER_READY;
})
.where((event)
{
	return event.character is Monster;
})
.listen((event)
{
	List players = _initiative.players.toList();
	// shuffle() works in place and returns nothing, so pick from the same list
	players.shuffle(new Random());
	Character randomTarget = players[0];
	attack(event.character, [randomTarget], AttackTypes.ATTACK);
});