Skip to content

Instantly share code, notes, and snippets.

@dbeattie71
Forked from itn3000/pipelinetest.cs
Created March 23, 2018 21:12
Show Gist options
  • Save dbeattie71/88726699d5cd8f3f2b010e57b5ee5c41 to your computer and use it in GitHub Desktop.
Save dbeattie71/88726699d5cd8f3f2b010e57b5ee5c41 to your computer and use it in GitHub Desktop.
System.IO.Pipelines tests
using System;
using System.Collections;
using System.Collections.Generic;
/// <summary>
/// Forward-only enumerator over the elements covered by an <see cref="ArraySegment{T}"/>.
/// </summary>
/// <typeparam name="T">Element type of the underlying array.</typeparam>
struct TestEnumerator<T> : IEnumerator<T>
{
    ArraySegment<T> m_Segment;

    // One-based cursor: 0 means "before the first element", 1..Count means
    // "on element (m_Position - 1)", Count + 1 means "past the end".
    // Using 0 for the initial state keeps default(TestEnumerator<T>) well-behaved.
    int m_Position;

    /// <summary>Creates an enumerator positioned before the first element of <paramref name="seg"/>.</summary>
    /// <param name="seg">Segment to enumerate; the struct is copied, the array is shared.</param>
    public TestEnumerator(ref ArraySegment<T> seg)
    {
        m_Segment = seg;
        m_Position = 0;
    }

    /// <summary>Element at the current position.</summary>
    /// <exception cref="InvalidOperationException">
    /// The enumerator is positioned before the first or after the last element.
    /// </exception>
    public T Current
    {
        get
        {
            if (m_Position < 1 || m_Position > m_Segment.Count)
            {
                throw new InvalidOperationException("Enumerator is not positioned on an element.");
            }
            return m_Segment.Array[m_Segment.Offset + m_Position - 1];
        }
    }

    object IEnumerator.Current => Current;

    /// <summary>Nothing to release; must not throw so that <c>foreach</c>/<c>using</c> can call it safely.</summary>
    public void Dispose()
    {
    }

    /// <summary>Advances to the next element.</summary>
    /// <returns><c>true</c> while an element is available; <c>false</c> once the end is reached.</returns>
    public bool MoveNext()
    {
        // Stop advancing once past the end so repeated calls cannot overflow the cursor.
        if (m_Position <= m_Segment.Count)
        {
            m_Position++;
        }
        return m_Position <= m_Segment.Count;
    }

    /// <summary>Moves the enumerator back to its initial position, before the first element.</summary>
    public void Reset()
    {
        m_Position = 0;
    }
}
namespace pipelinetest
{
using System.IO.Pipelines;
using System.Buffers;
using System.Threading.Tasks;
using System.Linq;
using System.Threading;
/// <summary>
/// Extension that exposes a struct-based enumerator for <see cref="ArraySegment{T}"/>.
/// </summary>
static class ArraySegmentExtension
{
    /// <summary>
    /// Enumerator that walks the elements of a segment from its first to its last element.
    /// </summary>
    public struct ArraySegmentEnumerator<T>
    {
        ArraySegment<T> m_Segment;
        int m_CurrentIndex;
        bool m_IsEnd;

        /// <summary>Creates an enumerator positioned before the first element of <paramref name="seg"/>.</summary>
        /// <param name="seg">Segment to enumerate.</param>
        public ArraySegmentEnumerator(ArraySegment<T> seg)
        {
            m_Segment = seg;
            // Start one step before the first element: MoveNext increments before
            // the bounds check, so starting at 0 would skip element 0.
            m_CurrentIndex = -1;
            m_IsEnd = false;
        }

        /// <summary>
        /// Element at the current position. Only meaningful after <see cref="MoveNext"/> returned <c>true</c>.
        /// </summary>
        public T Current => m_Segment.Array[m_Segment.Offset + m_CurrentIndex];

        /// <summary>Advances to the next element.</summary>
        /// <returns><c>true</c> while an element is available; <c>false</c> once the end is reached.</returns>
        public bool MoveNext()
        {
            if (m_IsEnd)
            {
                return false;
            }

            m_CurrentIndex++;
            if (m_CurrentIndex < m_Segment.Count)
            {
                return true;
            }

            // Latch the end state so later calls do not keep incrementing the index.
            m_IsEnd = true;
            return false;
        }
    }

    /// <summary>Returns an enumerator over the elements of <paramref name="seg"/>.</summary>
    /// <param name="seg">Segment to enumerate.</param>
    public static ArraySegmentEnumerator<T> GetEnumerator<T>(this ArraySegment<T> seg)
    {
        return new ArraySegmentEnumerator<T>(seg);
    }
}
class Program
{
/// <summary>
/// Runs one writer task and one reader task against a single <c>Pipe</c> and waits for both.
/// The writer commits 10 bytes per iteration (only the first byte of each chunk is set) and
/// completes the writer when done; the reader prints the length of every buffer it receives
/// and the first byte of each segment until the read result reports completion.
/// </summary>
static async Task PipelineTest()
{
    const int DataSize = 1000;
    const int MaxLoop = 1000;
    var pipe = new Pipe(new PipeOptions(minimumSegmentSize: DataSize));

    // Writer side: started first, exactly as the tasks are listed for WhenAll below.
    Task producer = Task.Run(() =>
    {
        int iteration = 0;
        while (iteration < MaxLoop)
        {
            var span = pipe.Writer.GetSpan(DataSize);
            span[0] = (byte)(iteration & 0xff);
            pipe.Writer.Advance(10);
            pipe.Writer.Commit();
            // await pipe.Writer.FlushAsync();
            iteration++;
        }
        pipe.Writer.Complete();
        Console.WriteLine("write done");
    });

    // Reader side: keeps reading until a result is flagged as completed.
    Task consumer = Task.Run(async () =>
    {
        bool finished;
        do
        {
            var result = await pipe.Reader.ReadAsync();
            Console.WriteLine($"read buffer length is {result.Buffer.Length}");
            foreach (var segment in result.Buffer)
            {
                Console.WriteLine($"{segment.Span[0]}");
            }
            // Everything that was handed out is treated as consumed.
            pipe.Reader.AdvanceTo(result.Buffer.End);
            finished = result.IsCompleted;
        }
        while (!finished);
        pipe.Reader.Complete();
        Console.WriteLine("read done");
    });

    await Task.WhenAll(producer, consumer).ConfigureAwait(false);

    // Earlier experiment written against the PipeFactory-based API, kept for reference:
    // PipeFactory is threadsafe?,and IPipe is not?
    // using (var factory = new PipeFactory(BufferPool.Default))
    // {
    // var opt = new PipeOptions();
    // var data = new byte[DataSize];
    // var rbuf = new byte[DataSize];
    // new Span<byte>(data).Fill(2);
    // var pipe = factory.Create();
    // Console.WriteLine($"{pipe.Reader.GetType()}, {pipe.Writer.GetType()}");
    // long total = 0;
    // for (int i = 0; i < MaxLoop; i++)
    // {
    // var wbuf = pipe.Writer.Alloc(DataSize);
    // wbuf.Write(data);
    // await wbuf.FlushAsync();
    // // await pipe.Reader.ReadAsync(new ArraySegment<byte>(rbuf));
    // var r = await pipe.Reader.ReadAsync();
    // foreach (var buf in r.Buffer)
    // {
    // ArraySegment<byte> seg;
    // if (buf.TryGetArray(out seg))
    // {
    // for (int j = seg.Offset; j < seg.Offset + seg.Count; j++)
    // {
    // total += seg[j];
    // }
    // }
    // }
    // pipe.Reader.Advance(r.Buffer.End);
    // }
    // Console.WriteLine($"{total}");
    // }
}
/// <summary>
/// Stub for a multi-writer experiment: at present it only creates a <c>Pipe</c> and
/// completes its reader. The experiment itself is kept below as commented-out code.
/// </summary>
static async Task MultiThreadPipeline()
{
    // LoopNum and TaskNum are only referenced by the commented-out experiment below.
    const int LoopNum = 1000;
    const int DataSize = 5000;
    const int TaskNum = 5;
    var pipe = new Pipe(new PipeOptions(minimumSegmentSize: DataSize));

    // No work is performed on the pipe yet; the reader is simply marked as completed.
    pipe.Reader.Complete();

    // using (var pfac = new PipeFactory(BufferPool.Default))
    // {
    // var data = new byte[DataSize];
    // new Span<byte>(data).Fill(2);
    // var pipe = pfac.Create(opt);
    // long total = 0;
    // // IPipe is threadsafe?
    // await Task.WhenAll(
    // Task.WhenAll(Enumerable.Range(0, TaskNum)
    // .Select(async (idx) =>
    // {
    // try
    // {
    // for (int i = 0; i < LoopNum / TaskNum; i++)
    // {
    // await Task.Yield();
    // var wbuf = pipe.Writer.Alloc(DataSize);
    // // wbuf.Ensure(DataSize);
    // wbuf.Buffer.Span.Slice(0, DataSize).Fill((byte)(i & 0xff));
    // wbuf.Advance(DataSize);
    // // wbuf.Write(data);
    // await wbuf.FlushAsync();
    // }
    // }
    // catch (Exception e)
    // {
    // Console.WriteLine($"{e}");
    // throw;
    // }
    // })).ContinueWith(t =>
    // {
    // Console.WriteLine($"write completed");
    // pipe.Writer.Complete();
    // }),
    // Task.Run(async () =>
    // {
    // bool isFirstRead = true;
    // // async pattern
    // while (true)
    // {
    // var res = await pipe.Reader.ReadAsync();
    // if (res.IsCancelled)
    // {
    // break;
    // }
    // foreach (var buf in res.Buffer)
    // {
    // for (int i = 0; i < buf.Length; i++)
    // {
    // total += buf.Span[i];
    // }
    // }
    // Console.WriteLine($"buffer length is {res.Buffer.Length}");
    // if (!res.Buffer.IsEmpty)
    // {
    // pipe.Reader.Advance(res.Buffer.End);
    // }
    // if (!isFirstRead && res.IsCompleted && res.Buffer.IsEmpty)
    // {
    // Console.WriteLine($"read done:{res.IsCompleted}, {res.Buffer.Length}");
    // break;
    // }
    // isFirstRead = false;
    // }
    // // sync pattern
    // // ReadResult res;
    // // while (pipe.Reader.TryRead(out res))
    // // {
    // // foreach (var buf in res.Buffer)
    // // {
    // // for (int i = 0; i < buf.Length; i++)
    // // {
    // // total += buf.Span[i];
    // // }
    // // }
    // // pipe.Reader.Advance(res.Buffer.End);
    // // if (res.IsCompleted || res.IsCancelled)
    // // {
    // // Console.WriteLine($"read completed");
    // // break;
    // // }
    // // }
    // })
    // );
    // Console.WriteLine($"{total}");
    // }
}
/// <summary>
/// Program entry point: starts <c>PipelineTest</c> and blocks until it has finished.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    // MultiThreadPipeline().Wait();
    Task run = PipelineTest();
    run.Wait();
}
}
}
<!-- Project file for the pipeline experiments: builds an executable targeting netcoreapp2.0. -->
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<!-- Pinned to a preview build of System.IO.Pipelines. -->
<!-- NOTE(review): the code presumably relies on this preview's API surface (e.g. Writer.Commit); confirm before changing the version. -->
<PackageReference Include="System.IO.Pipelines" Version="4.5.0-preview2-26216-01" />
</ItemGroup>
</Project>
<!-- NuGet source configuration for this project. -->
<!-- NOTE(review): presumably needed to restore the preview System.IO.Pipelines package; confirm it is not available from the default feed. -->
<configuration>
<packageSources>
<!-- Registers the dotnet-core MyGet v3 feed under the key "corefxlab". -->
<add key="corefxlab" value="https://dotnet.myget.org/F/dotnet-core/api/v3/index.json"/>
</packageSources>
</configuration>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment