Skip to content

Instantly share code, notes, and snippets.

@davidfowl
Last active January 7, 2017 10:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save davidfowl/ca0f175d02f25e2404752d97de66a9e0 to your computer and use it in GitHub Desktop.
Save davidfowl/ca0f175d02f25e2404752d97de66a9e0 to your computer and use it in GitHub Desktop.
using System;
using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading.Tasks.Channels;
namespace ConsoleApp39
{
public class Program
{
/// <summary>
/// Demo entry point: creates one unbounded channel of <see cref="ByteBuffer"/>, starts the
/// consumer and the producer on it, waits on <c>Console.Read()</c>, then completes the
/// channel and blocks until both tasks have finished.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
public static void Main(string[] args)
{
    var pipe = Channel.CreateUnbounded<ByteBuffer>();

    // The consumer is started before the producer.
    var readerTask = ConsumeRaw2(pipe);
    var writerTask = ProduceRaw2(pipe);

    // Keep the demo running until something arrives on standard input.
    Console.Read();

    // Signal completion, then wait for both sides.
    pipe.Complete();
    Task.WaitAll(readerTask, writerTask);
}
/// <summary>
/// Feeds a sample HTTP GET request into <paramref name="channel"/> one byte at a time,
/// pausing one second after each byte. Every byte is appended to the current
/// <see cref="ByteBuffer"/>, and that buffer is written to the channel after each append.
/// </summary>
/// <remarks>
/// A replacement buffer is created only when the current one has no writable space left,
/// so the same instance is published many times.
/// NOTE(review): ConsumeRaw2 disposes a buffer once it has no readable bytes; confirm the
/// intended ownership of a buffer that this method is still appending to.
/// </remarks>
/// <param name="channel">Channel that receives the buffer after every appended byte.</param>
private static async Task ProduceRaw2(IWritableChannel<ByteBuffer> channel)
{
    byte[] payload = Encoding.UTF8.GetBytes(@"GET /developer/documentation/data-insertion/r-sample-http-get HTTP/1.1
Host: marketing.adobe.com
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8,it;q=0.6,ms;q=0.4
");
    var current = new ByteBuffer(1024);
    foreach (byte octet in payload)
    {
        // Current buffer is full: continue in a new one.
        if (current.WritableBytes == 0)
        {
            current = new ByteBuffer(1024);
        }

        current.WriteByte(octet);
        await channel.WriteAsync(current);

        // Simulate a slow sender.
        await Task.Delay(1000);
    }
}
/// <summary>
/// Reads buffers from <paramref name="channel"/> until <c>WaitToReadAsync</c> reports that
/// no more data will arrive. Each buffer is passed to <c>ConsumeBuffer</c> and disposed
/// once it has no readable bytes left.
/// </summary>
/// <param name="channel">Channel to read buffers from.</param>
private static async Task ConsumeRaw2(IReadableChannel<ByteBuffer> channel)
{
    while (await channel.WaitToReadAsync())
    {
        ByteBuffer received;
        if (!channel.TryRead(out received))
        {
            // Nothing could be taken this time; go back to waiting.
            continue;
        }

        ConsumeBuffer(received);

        // A buffer that still has readable bytes is left alone.
        if (received.ReadableBytes != 0)
        {
            continue;
        }

        received.Dispose();
        received = null;
    }
}
/// <summary>
/// Feeds a sample HTTP GET request into <paramref name="channel"/> one byte at a time,
/// pausing one second after each byte. Every byte is placed in its own newly created
/// 1024-byte <see cref="ByteBuffer"/>, which is then written to the channel.
/// </summary>
/// <param name="channel">Channel that receives one buffer per payload byte.</param>
private static async Task ProduceRaw(IWritableChannel<ByteBuffer> channel)
{
    byte[] payload = Encoding.UTF8.GetBytes(@"GET /developer/documentation/data-insertion/r-sample-http-get HTTP/1.1
Host: marketing.adobe.com
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8,it;q=0.6,ms;q=0.4
");
    foreach (byte octet in payload)
    {
        // One new buffer per byte; it is handed to the channel right after the write.
        var chunk = new ByteBuffer(1024);
        chunk.WriteByte(octet);
        await channel.WriteAsync(chunk);

        // Simulate a slow sender.
        await Task.Delay(1000);
    }
}
/// <summary>
/// Reads buffers from <paramref name="channel"/> and merges them into a single
/// cumulation buffer, which is passed to <c>ConsumeBuffer</c> after every received buffer.
/// </summary>
/// <remarks>
/// The first buffer received while no cumulation exists becomes the cumulation. Later
/// buffers are copied into it and disposed; when they do not fit, a new cumulation sized
/// for the unread bytes of both is created first. A cumulation with no readable bytes left
/// is disposed and dropped. Any cumulation still held when the loop ends, or when an
/// exception escapes, is disposed so its memory is returned to the pool.
/// </remarks>
/// <param name="channel">Channel to read buffers from.</param>
private static async Task ConsumeRaw(IReadableChannel<ByteBuffer> channel)
{
    ByteBuffer cumulation = null;
    try
    {
        while (await channel.WaitToReadAsync())
        {
            ByteBuffer buffer;
            if (!channel.TryRead(out buffer))
            {
                continue;
            }

            if (cumulation == null)
            {
                // Take ownership of the incoming buffer instead of copying it.
                cumulation = buffer;
            }
            else
            {
                if (cumulation.WritableBytes < buffer.ReadableBytes)
                {
                    // Can't fit: move the unread bytes into a buffer large enough for both.
                    var oldCumulation = cumulation;
                    int size = oldCumulation.ReadableBytes + buffer.ReadableBytes;
                    cumulation = new ByteBuffer(size);
                    cumulation.Write(oldCumulation);
                    oldCumulation.Dispose();
                }

                cumulation.Write(buffer);
                buffer.Dispose();
            }

            ConsumeBuffer(cumulation);

            if (cumulation.ReadableBytes == 0)
            {
                // Fully consumed: release it and start over with the next buffer.
                cumulation.Dispose();
                cumulation = null;
            }
        }
    }
    finally
    {
        // Unconsumed bytes may remain when the channel completes; release them too.
        if (cumulation != null)
        {
            cumulation.Dispose();
        }
    }
}
/// <summary>
/// Looks for the first space byte in the readable part of <paramref name="buffer"/> and,
/// if one is found, advances the buffer past it. The buffer is left untouched when it
/// contains no space.
/// </summary>
/// <param name="buffer">Buffer whose readable bytes are scanned.</param>
private static void ConsumeBuffer(ByteBuffer buffer)
{
    // Read the property once: each access to ReadableData slices the backing memory again.
    var readable = buffer.ReadableData;
    for (int i = 0; i < readable.Length; i++)
    {
        if (readable[i] == ' ')
        {
            // Consume everything up to and including the space.
            buffer.Advance(i + 1);
            break;
        }
    }
}
/// <summary>
/// A buffer over memory rented from <c>NativeBufferPool.Shared</c>, with a read cursor
/// (<see cref="Start"/>) and a write cursor (<see cref="End"/>). Bytes between the two
/// cursors are readable; bytes from <see cref="End"/> to the end of the rented memory
/// are writable.
/// </summary>
public class ByteBuffer : IDisposable
{
    private readonly BufferPool _pool = NativeBufferPool.Shared;
    private readonly OwnedMemory<byte> _data;

    // Set by the first Dispose call so the memory is returned to the pool only once.
    private bool _disposed;

    /// <summary>Rents backing memory from the pool.</summary>
    /// <param name="size">
    /// Size passed to the pool's <c>Rent</c>. The capacity actually used is the length of
    /// the memory the pool hands back (presumably at least this large — TODO confirm).
    /// </param>
    public ByteBuffer(int size)
    {
        _data = _pool.Rent(size);
    }

    /// <summary>The bytes written but not yet consumed: [<see cref="Start"/>, <see cref="End"/>).</summary>
    public ReadOnlySpan<byte> ReadableData => _data.Span.Slice(Start, ReadableBytes);

    /// <summary>The free space after <see cref="End"/>.</summary>
    public Span<byte> WriteableData => _data.Span.Slice(End, WritableBytes);

    /// <summary>Offset of the first readable byte.</summary>
    public int Start { get; private set; }

    /// <summary>Offset one past the last written byte.</summary>
    public int End { get; private set; }

    /// <summary>Number of bytes available to read.</summary>
    public int ReadableBytes => End - Start;

    /// <summary>Number of bytes that can still be written.</summary>
    public int WritableBytes => _data.Length - End;

    /// <summary>Appends the readable bytes of <paramref name="buffer"/>; the source cursors are not moved.</summary>
    /// <param name="buffer">Buffer whose readable bytes are copied.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="IndexOutOfRangeException">The readable bytes do not fit in the writable space.</exception>
    public void Write(ByteBuffer buffer)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        int count = buffer.ReadableBytes;
        if (count > WritableBytes)
        {
            throw new IndexOutOfRangeException();
        }

        buffer.ReadableData.CopyTo(WriteableData);
        End += count;
    }

    /// <summary>Appends all bytes of <paramref name="data"/>.</summary>
    /// <param name="data">Bytes to append.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    /// <exception cref="IndexOutOfRangeException">The array does not fit in the writable space.</exception>
    public void Write(byte[] data)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        if (data.Length > WritableBytes)
        {
            throw new IndexOutOfRangeException();
        }

        data.Slice().CopyTo(WriteableData);
        End += data.Length;
    }

    /// <summary>Appends a single byte.</summary>
    /// <param name="b">Byte to append.</param>
    public void WriteByte(byte b)
    {
        // Indexing an empty span fails here when no writable space is left.
        var span = WriteableData;
        span[0] = b;
        End++;
    }

    /// <summary>Marks <paramref name="amount"/> readable bytes as consumed.</summary>
    /// <param name="amount">Number of bytes to skip; between 0 and <see cref="ReadableBytes"/>.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="amount"/> is negative or exceeds <see cref="ReadableBytes"/>.
    /// </exception>
    public void Advance(int amount)
    {
        // Keeps Start within [0, End] so ReadableBytes can never go negative.
        if (amount < 0 || amount > ReadableBytes)
        {
            throw new ArgumentOutOfRangeException(nameof(amount));
        }

        Start += amount;
    }

    /// <summary>Returns the rented memory to the pool. Calls after the first one do nothing.</summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _pool.Return(_data);
    }

    /// <summary>Renders the readable bytes as text, converting each byte to one <c>char</c>.</summary>
    /// <returns>A string with one character per readable byte.</returns>
    public override string ToString()
    {
        var data = ReadableData;
        var sb = new StringBuilder(data.Length);
        for (int i = 0; i < data.Length; i++)
        {
            sb.Append((char)data[i]);
        }
        return sb.ToString();
    }
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment