Skip to content

Instantly share code, notes, and snippets.

View jchapuis's full-sized avatar

Jonas Chapuis jchapuis

View GitHub Profile
@jchapuis
jchapuis / gist:56c7c50fc8cd1b39e091
Created May 12, 2014 13:32
ValveSubject: supports turning on/off the output stream. Incoming events are buffered while the valve is closed, and released in order when opening it. The valve supports opening/closing any number of times.
public class ValveSubject<T> : ISubject<T>
{
private enum Valve
{
Open,
Closed
}
private readonly Subject<T> input = new Subject<T>();
private readonly Subject<Valve> valveSubject = new Subject<Valve>();
@jchapuis
jchapuis / gist:b94e199ce9e790f011aa
Created June 6, 2014 13:26
Higher order functions example
[RegisterAsImplementedInterfaces(PerOwnedType = typeof (IConnectionScope))]
internal class ResponseDispatcher : IResponseDispatcher
{
private static readonly ILog Log = LogFactory.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IRequestsPendingResponseRepository requestsPendingResponse;
private readonly IConnectionContext context;
public ResponseDispatcher(IRequestsPendingResponseRepository requestsPendingResponse, IConnectionContext context)
{
this.requestsPendingResponse = requestsPendingResponse;
@jchapuis
jchapuis / ParallelQueryExtensions.cs
Last active August 29, 2015 14:07
Observable parallel query
public static class ParallelQueryExtensions
{
public static IObservable<T> ToUnorderedObservable<T>(this ParallelQuery<T> query)
{
return Observable.Create<T>(
obs =>
{
try
{
query.WithMergeOptions(ParallelMergeOptions.NotBuffered).ForAll(obs.OnNext);
@jchapuis
jchapuis / gist:739ad3f812a3d78b51f4
Created December 2, 2014 16:15
Monitoring log4net UDP appender in LinqPad
async void Main()
{
var endPoint = new IPEndPoint(IPAddress.Loopback, 8080);
using (var listener = new UdpClient(endPoint))
{
while(true) {
var result = await listener.ReceiveAsync();
var message = Encoding.UTF8.GetString(result.Buffer, 0, result.Buffer.Length);
XElement.Parse(message).Elements().ToDictionary(el=>el.Name, el=>el.Value).Dump();
}
@jchapuis
jchapuis / ToggleParser.cs
Last active February 20, 2020 11:35
C# command-line parser
/// <summary>
/// Simple command line toggles parser:
/// - toggles are identified with (any number of) '-' prefixes
/// - toggle can be with or without associated value
/// - toggles are case-insensitive
///
/// <example>--toggle_without_value -toggle value</example>
/// </summary>
public class ToggleParser
{
@jchapuis
jchapuis / MonixGrpcAdapters.scala
Last active July 28, 2017 05:33
Adapters to convert a grpc stream to a back-pressured observable (supporting back-pressure)
import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver, StreamObserver}
import monix.execution.Ack.{Continue, Stop}
import monix.execution.{Cancelable, Scheduler}
import monix.reactive.observers.Subscriber
import monix.reactive.{Observable, Observer}
import scala.concurrent.{CancellationException}
import scala.util.{Failure, Success}
object MonixGrpcAdapters {
@jchapuis
jchapuis / tasks.json
Created May 19, 2017 07:56 — forked from scottaddie/tasks.json
Webpack integration via tasks.json
{
"version": "0.1.0",
"command": "${workspaceRoot}/node_modules/.bin/webpack",
"isShellCommand": true,
"args": [
"--display-modules",
"--progress"
],
"echoCommand": true,
"tasks": [
@jchapuis
jchapuis / ZipLatest.scala
Last active July 27, 2018 21:12
Akka stream ZipLatest GraphStage
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
/**
* Zips two streams, picking always the latest of the elements of each source
*
* No element is emitted until at least one element of each becomes available. Whenever a new
* element appears, a new tuple is emitted with the last seen element of the other type
*
* '''Emits when''' all of the inputs have at least an element available, and then each time an element becomes
@jchapuis
jchapuis / Aggregator.scala
Created February 5, 2019 21:11
Akka-typed receptionist helpers
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.typed.scaladsl.ActorSource
import akka.util.Timeout
import com.typesafe.scalalogging.StrictLogging
import scala.concurrent.TimeoutException
case class Ride(id: Ride.ID,
origin: Address,
destination: Address,
pickupTime: Instant,
vehicle: Option[Vehicle.ID],
status: Ride.Status)
object Ride {
sealed trait Status
case object Pending extends Status