Last active
January 7, 2017 10:03
-
-
Save davidfowl/ca0f175d02f25e2404752d97de66a9e0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Buffers; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
using System.Threading.Tasks.Channels; | |
namespace ConsoleApp39 | |
{ | |
public class Program | |
{ | |
/// <summary>
/// Entry point. Starts <see cref="ConsumeRaw2"/> and <see cref="ProduceRaw2"/> over a single
/// unbounded channel, blocks on <see cref="Console.Read"/>, then completes the channel and
/// waits for both tasks to finish.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
public static void Main(string[] args)
{
    var pipe = Channel.CreateUnbounded<ByteBuffer>();

    // The consumer is started before the producer, as in the original ordering.
    Task reader = ConsumeRaw2(pipe);
    Task writer = ProduceRaw2(pipe);

    // Park the main thread until a character arrives on standard input.
    Console.Read();

    // Completing the channel is what lets the consumer's WaitToReadAsync loop end.
    pipe.Complete();
    Task.WaitAll(reader, writer);
}
/// <summary>
/// Writes a canned HTTP GET request into the channel one byte at a time, reusing a single
/// <see cref="ByteBuffer"/> until it has no writable space left. The same buffer instance is
/// written to the channel again after every appended byte, followed by a 1000 ms delay.
/// </summary>
/// <param name="channel">Channel that receives the (shared) buffer after each byte.</param>
private static async Task ProduceRaw2(IWritableChannel<ByteBuffer> channel)
{
    const int BufferSize = 1024;
    const int DelayMilliseconds = 1000;

    byte[] payload = Encoding.UTF8.GetBytes(@"GET /developer/documentation/data-insertion/r-sample-http-get HTTP/1.1
Host: marketing.adobe.com
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8,it;q=0.6,ms;q=0.4
");

    var current = new ByteBuffer(BufferSize);
    for (int offset = 0; offset < payload.Length; offset++)
    {
        // Only switch to a fresh buffer once the current one is completely full.
        if (current.WritableBytes == 0)
        {
            current = new ByteBuffer(BufferSize);
        }

        current.WriteByte(payload[offset]);

        // The reader sees the same instance again, now one byte longer.
        await channel.WriteAsync(current);
        await Task.Delay(DelayMilliseconds);
    }
}
/// <summary>
/// Reads buffers from the channel until it completes. Each buffer is passed to
/// <see cref="ConsumeBuffer"/> and is disposed once it has no readable bytes left.
/// </summary>
/// <param name="channel">Channel to read buffers from.</param>
/// <remarks>
/// NOTE(review): ProduceRaw2 in this file keeps appending to, and re-sending, the same
/// ByteBuffer instance. Disposing it here as soon as it is drained therefore looks like it
/// can return memory to the pool while the producer is still writing into it - confirm the
/// intended ownership rules before relying on this pairing.
/// </remarks>
private static async Task ConsumeRaw2(IReadableChannel<ByteBuffer> channel)
{
    while (await channel.WaitToReadAsync())
    {
        ByteBuffer received;
        if (!channel.TryRead(out received))
        {
            // Nothing was actually available; go back to waiting.
            continue;
        }

        ConsumeBuffer(received);

        // Fully drained buffers are handed back to the pool.
        if (received.ReadableBytes == 0)
        {
            received.Dispose();
        }
    }
}
/// <summary>
/// Writes a canned HTTP GET request into the channel one byte at a time. Every byte is placed
/// in its own newly created 1024-byte <see cref="ByteBuffer"/>, which is written to the
/// channel and followed by a 1000 ms delay.
/// </summary>
/// <param name="channel">Channel that receives one buffer per byte.</param>
private static async Task ProduceRaw(IWritableChannel<ByteBuffer> channel)
{
    const int BufferSize = 1024;
    const int DelayMilliseconds = 1000;

    byte[] payload = Encoding.UTF8.GetBytes(@"GET /developer/documentation/data-insertion/r-sample-http-get HTTP/1.1
Host: marketing.adobe.com
Connection: keep-alive
Cache-Control: max-age=0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
Accept-Encoding: gzip, deflate, sdch, br
Accept-Language: en-US,en;q=0.8,it;q=0.6,ms;q=0.4
");

    for (int offset = 0; offset < payload.Length; offset++)
    {
        // A brand-new buffer per byte: the reader is expected to stitch them together.
        var single = new ByteBuffer(BufferSize);
        single.WriteByte(payload[offset]);

        await channel.WriteAsync(single);
        await Task.Delay(DelayMilliseconds);
    }
}
/// <summary>
/// Reads buffers from the channel until it completes, accumulating their readable bytes into a
/// single "cumulation" buffer that is handed to <see cref="ConsumeBuffer"/> after every read.
/// </summary>
/// <param name="channel">Channel to read buffers from.</param>
/// <remarks>
/// The first buffer read becomes the cumulation itself. Later buffers are copied into it and
/// then disposed; when the cumulation lacks room, a new buffer sized to hold exactly the
/// pending bytes plus the incoming bytes replaces it. Any cumulation still held when the
/// channel completes, or when an exception escapes, is disposed before the method exits.
/// </remarks>
private static async Task ConsumeRaw(IReadableChannel<ByteBuffer> channel)
{
    ByteBuffer cumulation = null;
    try
    {
        while (await channel.WaitToReadAsync())
        {
            ByteBuffer buffer;
            if (!channel.TryRead(out buffer))
            {
                continue;
            }

            if (cumulation == null)
            {
                // Take ownership of the incoming buffer instead of copying it.
                cumulation = buffer;
            }
            else
            {
                try
                {
                    if (cumulation.WritableBytes < buffer.ReadableBytes)
                    {
                        // Can't fit: move the unread bytes into a buffer large enough for both.
                        int size = cumulation.ReadableBytes + buffer.ReadableBytes;
                        var grown = new ByteBuffer(size);
                        grown.Write(cumulation);
                        cumulation.Dispose();
                        cumulation = grown;
                    }

                    cumulation.Write(buffer);
                }
                finally
                {
                    // The incoming buffer is ours to release whether or not the copy succeeded.
                    buffer.Dispose();
                }
            }

            ConsumeBuffer(cumulation);

            if (cumulation.ReadableBytes == 0)
            {
                cumulation.Dispose();
                cumulation = null;
            }
        }
    }
    finally
    {
        // Previously leaked: unread data left over at completion was never returned to the pool.
        if (cumulation != null)
        {
            cumulation.Dispose();
        }
    }
}
/// <summary>
/// Scans the buffer's readable bytes for the first space character (0x20) and, if one is
/// found, advances the buffer's read position to just past it. Leaves the buffer untouched
/// when no space is present.
/// </summary>
/// <param name="buffer">Buffer whose readable region is scanned and possibly advanced.</param>
private static void ConsumeBuffer(ByteBuffer buffer)
{
    // ReadableData slices the underlying memory on every access, so take it once
    // instead of twice per loop iteration.
    var readable = buffer.ReadableData;

    for (int i = 0; i < readable.Length; i++)
    {
        if (readable[i] == ' ')
        {
            // Consume everything up to and including the space.
            buffer.Advance(i + 1);
            return;
        }
    }
}
/// <summary>
/// A forward-only byte buffer over memory rented from <see cref="NativeBufferPool.Shared"/>.
/// Bytes are appended at <see cref="End"/> and consumed from <see cref="Start"/>; the rented
/// memory goes back to the pool when the buffer is disposed.
/// </summary>
public class ByteBuffer : IDisposable
{
    private readonly BufferPool _pool = NativeBufferPool.Shared;
    private readonly OwnedMemory<byte> _data;

    // Guards against handing the same rented memory back to the pool more than once.
    private bool _disposed;

    /// <summary>Rents backing memory of the requested size from the pool.</summary>
    /// <param name="size">Size, in bytes, passed to the pool's <c>Rent</c> call.</param>
    public ByteBuffer(int size)
    {
        _data = _pool.Rent(size);
    }

    /// <summary>The bytes written but not yet consumed: the range [Start, End).</summary>
    public ReadOnlySpan<byte> ReadableData => _data.Span.Slice(Start, ReadableBytes);

    /// <summary>The free space after <see cref="End"/> that can still be written to.</summary>
    public Span<byte> WriteableData => _data.Span.Slice(End, WritableBytes);

    /// <summary>Offset of the next byte to read.</summary>
    public int Start { get; private set; }

    /// <summary>Offset at which the next byte will be written.</summary>
    public int End { get; private set; }

    /// <summary>Number of bytes available to read.</summary>
    public int ReadableBytes => End - Start;

    /// <summary>Number of bytes that can still be written before the buffer is full.</summary>
    public int WritableBytes => _data.Length - End;

    /// <summary>Appends the readable bytes of another buffer without consuming them there.</summary>
    /// <param name="buffer">Source buffer; its read position is not changed.</param>
    /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is null.</exception>
    /// <exception cref="IndexOutOfRangeException">The source does not fit in the free space.</exception>
    public void Write(ByteBuffer buffer)
    {
        if (buffer == null)
        {
            throw new ArgumentNullException(nameof(buffer));
        }

        int count = buffer.ReadableBytes;
        if (count > WritableBytes)
        {
            throw new IndexOutOfRangeException();
        }

        buffer.ReadableData.CopyTo(WriteableData);
        End += count;
    }

    /// <summary>Appends an entire byte array.</summary>
    /// <param name="data">Bytes to append.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    /// <exception cref="IndexOutOfRangeException">The array does not fit in the free space.</exception>
    public void Write(byte[] data)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        if (data.Length > WritableBytes)
        {
            throw new IndexOutOfRangeException();
        }

        data.Slice().CopyTo(WriteableData);
        End += data.Length;
    }

    /// <summary>Appends a single byte.</summary>
    /// <param name="b">Byte to append.</param>
    /// <exception cref="IndexOutOfRangeException">The buffer has no free space.</exception>
    public void WriteByte(byte b)
    {
        // Explicit capacity check, matching the Write overloads, rather than relying on
        // indexing into an empty span to fail.
        if (WritableBytes == 0)
        {
            throw new IndexOutOfRangeException();
        }

        var span = WriteableData;
        span[0] = b;
        End++;
    }

    /// <summary>Marks <paramref name="amount"/> readable bytes as consumed.</summary>
    /// <param name="amount">Number of bytes to skip; must be between 0 and <see cref="ReadableBytes"/>.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="amount"/> is negative or larger than <see cref="ReadableBytes"/>.
    /// </exception>
    public void Advance(int amount)
    {
        // Without this check Start could pass End and ReadableBytes would go negative.
        if (amount < 0 || amount > ReadableBytes)
        {
            throw new ArgumentOutOfRangeException(nameof(amount));
        }

        Start += amount;
    }

    /// <summary>
    /// Returns the rented memory to the pool. Calling this more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;
        _pool.Return(_data);
    }

    /// <summary>
    /// Renders the readable bytes as text by casting each byte directly to a <see cref="char"/>.
    /// </summary>
    public override string ToString()
    {
        var data = ReadableData;
        var sb = new StringBuilder(data.Length);
        for (int i = 0; i < data.Length; i++)
        {
            sb.Append((char)data[i]);
        }
        return sb.ToString();
    }
}
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment