New client-side class, "StreamingUploadHandler", which wraps the write end of a pipe. This gives a nice interface, and one that ports more consistently to other clients that don't have the same C# channel class.

Constructing a "StreamingUploadHandler" is async, because it confirms the server is ready to receive incoming chunks. The constructor takes two type parameters: one for inputs (what data do you send?) and one for outputs (what data do you receive?).
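Roughly, the handler is little more than a typed wrapper over a ChannelWriter. The sketch below is an assumed shape, not the actual implementation; only Send, Complete, and result are implied by this note, and the invocationId / response plumbing is elided.

using System.Threading.Channels;
using System.Threading.Tasks;

// Sketch only: the real handler also tracks its invocationId and the final
// server response.
public class StreamingUploadHandler<TIn, TOut>
{
    private readonly ChannelWriter<TIn> _writer;   // write end of the pipe

    public TOut result { get; internal set; }      // filled in by the "invocation response"

    internal StreamingUploadHandler(ChannelWriter<TIn> writer) => _writer = writer;

    // Pack the chunk into the pipe; the client relay loop forwards it to the server.
    public Task Send(TIn chunk) => _writer.WriteAsync(chunk).AsTask();

    // Close the write end; the real Complete also sends "stream complete"
    // and awaits the server's result.
    public Task Complete()
    {
        _writer.Complete();
        return Task.CompletedTask;
    }
}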

For a basic file upload, you would send byte arrays and receive a status message.

In this example, you send letters one-by-one, and the server concats them and returns a string.

var handler = await StreamHandler.NewUploadStream<char, string>(
  targetMethod: "UploadWord",
  channelSize: 5
);

foreach (char c in "hello world") 
{
    await handler.Send(c);
    await Task.Delay(1000);
}

await handler.Complete();
Console.Write(handler.result);
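The server side of "UploadWord" isn't shown in this note. Assuming the target method receives the read end of the pipe as a ChannelReader<char> (usings for System.Text and System.Threading.Channels omitted), it might look something like:

// Hypothetical server method for the example above: drains chars from the
// pipe and concats them.
public async Task<string> UploadWord(ChannelReader<char> letters)
{
    var word = new StringBuilder();

    // WaitToReadAsync returns false once the pipe is completed and drained.
    while (await letters.WaitToReadAsync())
    {
        while (letters.TryRead(out var c))
        {
            word.Append(c);
        }
    }

    // This return value rides back in the "invocation response" and lands
    // in handler.result on the client.
    return word.ToString();
}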

And here's how the protocol works...

// NewUploadStream("UploadWord", channelOptions:{size=5, bounded=true})
client
  handler
    generates a new invocationId
    
    creates a new pipe with the given options
    saves (client pipe input) to the handler
    starts background thread "client relay loop"
      passes in (client pipe output)
      "client relay loop" awaits items from the pipe
        
    sends an "open stream" message
      the message is an invocation with id="42" and target="UploadWord"
      awaits the server's reply before handing back the handler
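As a sketch of that client half: the IStreamingConnection interface below is hypothetical (a stand-in for the real connection plumbing, not an existing API), and where the connection comes from is hand-waved as an extra parameter. ClientRelayLoop is sketched under the handler.Send step further down.

// Hypothetical wire-level surface, just enough to make these sketches concrete.
public interface IStreamingConnection
{
    Task SendOpenStreamAsync(string invocationId, string target);   // resolves on "invocation complete"
    Task SendStreamItemAsync(string invocationId, object data);     // fire-and-forget
    Task<object> SendStreamCompleteAsync(string invocationId);      // resolves on "invocation response"
}

public static class StreamHandler
{
    public static async Task<StreamingUploadHandler<TIn, TOut>> NewUploadStream<TIn, TOut>(
        IStreamingConnection connection, string targetMethod, int channelSize)
    {
        var invocationId = Guid.NewGuid().ToString();          // generates a new invocationId

        var pipe = Channel.CreateBounded<TIn>(channelSize);    // creates a new pipe with the given options
        var handler = new StreamingUploadHandler<TIn, TOut>(pipe.Writer);   // saves (client pipe input)

        // "client relay loop" runs in the background with (client pipe output);
        // ClientRelayLoop is sketched further down.
        _ = Task.Run(() => ClientRelayLoop(connection, invocationId, pipe.Reader));

        // "open stream": an invocation naming the target method; this await
        // ends when the server answers with "invocation complete".
        await connection.SendOpenStreamAsync(invocationId, targetMethod);

        return handler;
    }
}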
    
server
  receives the "open stream" message
    creates a new pipe
    saves (server pipe input) in _pipeStore["42"]

    starts background thread "UploadWord"
      passes in (server pipe output)
      passes in (additional args from "open stream" message)
      "UploadWord" awaits items from the pipe

    returns an "invocation complete" message
      this ends the client's waiting
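And a matching sketch of the server half, where _pipeStore maps invocationId to (server pipe input) and a second dictionary keeps the running target task so its return value can be collected later. This sidesteps the special pipe wrapper discussed at the end, and the object-typed pipe glosses over converting items to the method's declared type (e.g. char).

// Hypothetical server-side bookkeeping.
static readonly ConcurrentDictionary<string, ChannelWriter<object>> _pipeStore =
    new ConcurrentDictionary<string, ChannelWriter<object>>();
static readonly ConcurrentDictionary<string, Task<object>> _resultStore =
    new ConcurrentDictionary<string, Task<object>>();

// Called when the "open stream" message arrives. `targetMethod` stands in for
// the resolved hub method, e.g. UploadWord; additional args from the
// "open stream" message are elided.
static void HandleOpenStream(
    string invocationId, Func<ChannelReader<object>, Task<object>> targetMethod)
{
    var pipe = Channel.CreateUnbounded<object>();   // creates a new pipe
    _pipeStore[invocationId] = pipe.Writer;         // saves (server pipe input) in _pipeStore["42"]

    // starts background "UploadWord" with (server pipe output); the task's
    // result is picked up again at stream-complete time.
    _resultStore[invocationId] = Task.Run(() => targetMethod(pipe.Reader));

    // ...then return "invocation complete", which ends the client's waiting
    // (plumbing elided).
}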


// foreach chunk of data ~ handler.Send
client
  handler.Send
    packs chunk into a `StreamItem`
    attaches `invocationId` to the item
    puts `streamItem` into the pipe 
  client relay loop
    retrieves item from pipe
    acquires the connection lock, validates state, etc.
    sends "stream item" to server
    no response is needed
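Sketched out, reusing the hypothetical IStreamingConnection from the earlier sketch; handler.Send itself only writes into the pipe (see the handler sketch near the top), so the relay loop is the one place that talks to the connection:

static async Task ClientRelayLoop<TIn>(
    IStreamingConnection connection, string invocationId, ChannelReader<TIn> reader)
{
    // Keep pulling until the pipe is completed and fully drained.
    while (await reader.WaitToReadAsync())
    {
        while (reader.TryRead(out var chunk))
        {
            // Acquiring the connection lock, validating state, etc. is assumed
            // to live inside the connection; each chunk becomes one
            // "stream item" message, and no response is expected.
            await connection.SendStreamItemAsync(invocationId, chunk);
        }
    }
}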

server
  receives the "stream item" message
    using the item's invocationId
      puts message.data into _pipeStore["42"]
      
  :note: data is processed strictly in arrival order
  :note: there's only one thread of "UploadWord"
  :note: and it pulls chunks from the pipe one-by-one    
  
  background thread "UploadWord"
    retrieves data from pipe
    runs whatever user-defined server code is needed
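Given the _pipeStore sketch above, the server half of this step is small: look up the pipe by invocationId and push the data in.

// Called when a "stream item" message arrives.
static async Task HandleStreamItemAsync(string invocationId, object data)
{
    if (_pipeStore.TryGetValue(invocationId, out var writer))
    {
        // WriteAsync preserves arrival order; the single "UploadWord" task
        // drains the pipe one item at a time on the other end.
        await writer.WriteAsync(data);
    }
}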
    

// await handler.Complete();
client
  handler.Complete
    sends a "stream complete" message to the server and awaits the response
    
server
  receives the "stream complete" message
    gets relevant pipe: _pipeStore[message.id]
    disallows new streamItems from entering the pipe
    waits until the pipe is empty (ie all queued items have been processed)
    awaits pipe.Complete()
     
  background thread "UploadWord"
    (server pipe output) signals that it is complete
    user code runs
    the return value is calculated and returned
    thread ends
    
  back to receiving the "stream complete" message
    awaiting pipe.Complete() needs to return a value
    so this pipe is special: a wrapper around a normal pipe
    server returns an "invocation response" message containing the result
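A sketch of that server half, using the _pipeStore/_resultStore bookkeeping from the earlier sketches rather than the special pipe wrapper described above (which stays a TODO below):

// Called when the "stream complete" message arrives; the return value goes
// into the "invocation response". Missing-id handling is elided.
static async Task<object> HandleStreamCompleteAsync(string invocationId)
{
    // Disallow new streamItems: completing the write end closes the pipe,
    // and "UploadWord" finishes draining whatever is already queued.
    _pipeStore.TryRemove(invocationId, out var writer);
    writer.Complete();

    // Wait for the background "UploadWord" task to produce its value, then
    // hand it back so the server can send the "invocation response".
    _resultStore.TryRemove(invocationId, out var resultTask);
    return await resultTask;
}

Whether that bookkeeping lives in dictionaries like these or inside the special pipe wrapper is exactly the open question in the second TODO.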

client
  receives the invocation response
    saves the result to handler.result
    this is usually a response like "ok" or "upload complete"
    sometimes an error like "corrupted data stream"
    sometimes it can be more complicated

TODO :: better buffering to deal with backpressure

TODO :: the special server-side pipe wrapper
