Skip to content

Instantly share code, notes, and snippets.

@schmitch
Created October 15, 2021 13:09
Show Gist options
  • Save schmitch/798525b13ca14e9970288d1392412575 to your computer and use it in GitHub Desktop.
Save schmitch/798525b13ca14e9970288d1392412575 to your computer and use it in GitHub Desktop.
using System;
using System.Diagnostics;
using System.IO;
namespace ConsoleApp1
{
/// <summary>
/// Read-only, forward-only view over a contiguous byte range (a "slice") of a larger stream.
/// Lets a caller hand out one part of a big stream without copying that part into a buffer first.
/// Inspired by the sub-stream helper in System.IO.Compression (.NET Core); reference implementation:
/// https://github.com/dotnet/corefx/blob/d59f6e5a1bdabdd05317fd727efb59345e328b80/src/System.IO.Compression/src/System/IO/Compression/ZipCustomStreams.cs#L147
/// </summary>
/// <remarks>
/// Every read queries <see cref="Stream.Position"/> on the underlying stream and seeks it back to
/// the slice position when the two differ, so the underlying stream has to support both.
/// Disposing the sub-stream never disposes the underlying stream.
/// </remarks>
internal class ReadOnlySubStream : Stream
{
    private readonly long _startInSuperStream;
    private readonly long _endInSuperStream;
    private readonly Stream _superStream;
    private long _positionInSuperStream;
    private bool _canRead;
    private bool _isDisposed;

    /// <summary>
    /// Creates a view over <paramref name="maxLength"/> bytes of <paramref name="superStream"/>,
    /// starting at absolute offset <paramref name="startPosition"/>.
    /// </summary>
    /// <param name="superStream">Stream the slice is read from. It is not owned by this instance.</param>
    /// <param name="startPosition">Absolute offset in <paramref name="superStream"/> of the first byte of the slice.</param>
    /// <param name="maxLength">Upper bound on the number of bytes the slice exposes.</param>
    /// <exception cref="ArgumentNullException"><paramref name="superStream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="startPosition"/> or <paramref name="maxLength"/> is negative, or their sum overflows.
    /// </exception>
    public ReadOnlySubStream(Stream superStream, long startPosition, long maxLength)
    {
        if (superStream is null)
        {
            throw new ArgumentNullException(nameof(superStream));
        }

        if (startPosition < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(startPosition), startPosition, "Value must not be negative.");
        }

        if (maxLength < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxLength), maxLength, "Value must not be negative.");
        }

        // Guard the addition below; long arithmetic wraps silently in an unchecked context.
        if (maxLength > long.MaxValue - startPosition)
        {
            throw new ArgumentOutOfRangeException(nameof(maxLength), maxLength, "Slice end exceeds the addressable range.");
        }

        _superStream = superStream;
        _startInSuperStream = startPosition;
        _positionInSuperStream = startPosition;
        _endInSuperStream = startPosition + maxLength;
        _canRead = true;
        _isDisposed = false;
    }

    /// <summary>
    /// Size of the slice as requested at construction time. It is not clamped to the real length of
    /// the underlying stream, so fewer bytes may actually be readable.
    /// </summary>
    public override long Length
    {
        get
        {
            ThrowIfDisposed();
            return _endInSuperStream - _startInSuperStream;
        }
    }

    /// <summary>
    /// Number of bytes consumed from the slice so far. Setting the position is not supported.
    /// </summary>
    public override long Position
    {
        get
        {
            ThrowIfDisposed();
            return _positionInSuperStream - _startInSuperStream;
        }
        set
        {
            ThrowIfDisposed();
            throw new NotSupportedException("seek not support");
        }
    }

    /// <summary>True while neither this instance nor the underlying stream has stopped being readable.</summary>
    public override bool CanRead => _superStream.CanRead && _canRead;

    /// <summary>Always false: the slice can only be consumed front to back.</summary>
    public override bool CanSeek => false;

    /// <summary>Always false.</summary>
    public override bool CanWrite => false;

    /// <summary>
    /// Reads up to <paramref name="count"/> bytes of the slice, never past its end.
    /// Buffer/offset/count validation is delegated to the underlying stream.
    /// </summary>
    /// <returns>Number of bytes read; 0 once the slice (or the underlying stream) is exhausted.</returns>
    /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
    /// <exception cref="NotSupportedException">The underlying stream is no longer readable.</exception>
    public override int Read(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        ThrowIfCantRead();

        int clampedCount = PrepareRead(count);
        int bytesRead = _superStream.Read(buffer, offset, clampedCount);
        _positionInSuperStream += bytesRead;
        return bytesRead;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Read(byte[], int, int)"/>. Forwards to the underlying
    /// stream's own asynchronous read instead of the base-class fallback, which would run the
    /// synchronous <see cref="Read(byte[], int, int)"/> on a pool thread.
    /// </summary>
    public override async System.Threading.Tasks.Task<int> ReadAsync(
        byte[] buffer,
        int offset,
        int count,
        System.Threading.CancellationToken cancellationToken)
    {
        ThrowIfDisposed();
        ThrowIfCantRead();

        int clampedCount = PrepareRead(count);
        int bytesRead = await _superStream
            .ReadAsync(buffer, offset, clampedCount, cancellationToken)
            .ConfigureAwait(false);
        _positionInSuperStream += bytesRead;
        return bytesRead;
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override long Seek(long offset, SeekOrigin origin)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("seek not support");
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void SetLength(long value)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("seek and write not support");
    }

    /// <summary>Not supported; always throws <see cref="NotSupportedException"/>.</summary>
    public override void Write(byte[] buffer, int offset, int count)
    {
        ThrowIfDisposed();
        throw new NotSupportedException("write not support");
    }

    /// <summary>
    /// No-op. Flushing a read-only stream is valid and there is nothing buffered to write, so generic
    /// stream-handling code can call this without special-casing the type.
    /// </summary>
    public override void Flush()
    {
        ThrowIfDisposed();
    }

    // Closes the sub-stream for reading. This deliberately does NOT close the super stream:
    // the sub-stream is only a window onto it and other windows may still be in use.
    protected override void Dispose(bool disposing)
    {
        if (disposing && !_isDisposed)
        {
            _canRead = false;
            _isDisposed = true;
        }

        base.Dispose(disposing);
    }

    // Realigns the super stream with the slice position (another consumer may have moved it) and
    // returns the requested count limited to the bytes left in the slice.
    private int PrepareRead(int count)
    {
        if (_superStream.Position != _positionInSuperStream)
        {
            _superStream.Seek(_positionInSuperStream, SeekOrigin.Begin);
        }

        long remaining = _endInSuperStream - _positionInSuperStream;
        Debug.Assert(remaining >= 0);

        // When the branch is taken, remaining < count <= int.MaxValue, so the cast cannot truncate.
        return count > remaining ? (int)remaining : count;
    }

    private void ThrowIfDisposed()
    {
        if (_isDisposed)
        {
            // Single-argument overload: the argument is the object name and the framework
            // supplies the standard "disposed object" message.
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    private void ThrowIfCantRead()
    {
        if (!CanRead)
        {
            throw new NotSupportedException("read not support");
        }
    }
}
}
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Xml.Serialization;
namespace ConsoleApp1
{
/// <summary>
/// Deserialization target for the XML body returned by the "initiate multipart upload" request.
/// ProgramSeaweedFs.InitiateMultipartUploadResult fills it via XmlSerializer using the
/// "http://s3.amazonaws.com/doc/2006-03-01/" default namespace.
/// </summary>
public class InitiateMultipartUploadResult
{
/// <summary>Bucket value from the response. Not read anywhere else in this file.</summary>
public string Bucket { get; set; }
/// <summary>Object key from the response; used to build the part-upload and completion URLs.</summary>
public string Key { get; set; }
/// <summary>Upload identifier from the response; sent as the uploadId query parameter on later requests.</summary>
public string UploadId { get; set; }
}
/// <summary>
/// Console sample that uploads a local file in fixed-size parts to an S3-style HTTP endpoint on
/// localhost:8333 (SeaweedFS, going by the class name): initiate the upload, PUT every part,
/// then complete the upload.
/// </summary>
public class ProgramSeaweedFs
{
    /// <summary>Largest part size in bytes (5 MiB).</summary>
    private const int DefaultMaxSliceSize = 5 * 1024 * 1024;

    /// <summary>Base URL of the bucket all object requests go to.</summary>
    private const string BucketUrl = "http://localhost:8333/hase";

    /// <summary>Object name used when the multipart upload is initiated.</summary>
    private const string ObjectName = "hase.txt";

    /// <summary>File uploaded when no path is given on the command line.</summary>
    private const string DefaultUploadPath = "/Users/schmitch/Downloads/weedfs/myfile.txt";

    /// <summary>
    /// Entry point. Uploads the file named by <c>args[0]</c>, or <see cref="DefaultUploadPath"/>
    /// when no argument is given.
    /// </summary>
    /// <param name="args">Optional: path of the file to upload as the first element.</param>
    public static async Task Main(string[] args)
    {
        var uploadPath = args != null && args.Length > 0 ? args[0] : DefaultUploadPath;

        using var client = new HttpClient();

        // Call CreateBucket(client) here first if the bucket does not exist yet.
        var obj = await InitiateMultipartUploadResult(client);

        await using var uploadStream = File.OpenRead(uploadPath);

        var partNumber = 0;
        foreach (var (rangeBegin, rangeLength, _) in GetUploadSliceRequests(uploadStream))
        {
            partNumber++;
            var sw = Stopwatch.StartNew();

            // No explicit Seek needed: ReadOnlySubStream repositions the file stream before reading.
            await using var requestBodyStream = new ReadOnlySubStream(uploadStream, rangeBegin, rangeLength);

            // Log the inclusive byte range, as before ("0-0" for the single empty part of an empty file).
            var rangeEnd = rangeLength > 0 ? rangeBegin + rangeLength - 1 : rangeBegin;
            Console.WriteLine($"Uploading Part: {partNumber}, {rangeBegin}-{rangeEnd}");

            await UploadPart(client, obj.Key, obj.UploadId, partNumber, requestBodyStream);

            sw.Stop();
            Console.WriteLine($"Uploading Part Finished: {partNumber}, {sw.ElapsedMilliseconds}ms");
        }

        await CompleteMultipartUpload(client, obj.Key, obj.UploadId);
    }

    /// <summary>PUTs one part of the upload; the request body is streamed from <paramref name="stream"/>.</summary>
    /// <param name="client">HTTP client used to send the request.</param>
    /// <param name="key">Object key returned when the upload was initiated.</param>
    /// <param name="uploadId">Upload identifier returned when the upload was initiated.</param>
    /// <param name="partNumber">1-based index of this part.</param>
    /// <param name="stream">Content of the part. Disposed together with the request.</param>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    private static async Task UploadPart(
        HttpClient client,
        string key,
        string uploadId,
        int partNumber,
        Stream stream)
    {
        using var request = new HttpRequestMessage(
            HttpMethod.Put,
            $"{BucketUrl}/{key}?partNumber={partNumber}&uploadId={uploadId}")
        {
            Content = new StreamContent(stream),
        };
        using var response = await client.SendAsync(request);
        response.EnsureSuccessStatusCode();
    }

    /// <summary>
    /// POSTs the completion request for the upload, prints the response body and fails if the
    /// server reports a non-success status.
    /// </summary>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    private static async Task CompleteMultipartUpload(HttpClient client, string key, string uploadId)
    {
        // NOTE(review): the body lists no <Part> elements. The S3 API documents them as required;
        // presumably the target server accepts an empty list - confirm before pointing this at another backend.
        using var request = new HttpRequestMessage(HttpMethod.Post, $"{BucketUrl}/{key}?uploadId={uploadId}")
        {
            Content = new StringContent(
                @"<?xml version=""1.0"" encoding=""UTF-8""?><CompleteMultipartUpload xmlns=""http://s3.amazonaws.com/doc/2006-03-01/""></CompleteMultipartUpload>"),
        };
        using var response = await client.SendAsync(request);

        // Print first so the server's error document stays visible when the status check throws.
        Console.WriteLine($"Text: {await response.Content.ReadAsStringAsync()}");
        response.EnsureSuccessStatusCode();
    }

    /// <summary>Starts a multipart upload for <see cref="ObjectName"/> and returns the parsed XML response.</summary>
    /// <exception cref="HttpRequestException">The server answered with a non-success status code.</exception>
    private static async Task<InitiateMultipartUploadResult> InitiateMultipartUploadResult(HttpClient client)
    {
        using var request = new HttpRequestMessage(HttpMethod.Post, $"{BucketUrl}/{ObjectName}?uploads")
        {
            Content = new MultipartFormDataContent(),
        };
        using var response = await client.SendAsync(request);

        // Fail here instead of trying to deserialize an error document into the result type.
        response.EnsureSuccessStatusCode();

        await using var responseStream = await response.Content.ReadAsStreamAsync();
        return (InitiateMultipartUploadResult)new XmlSerializer(
            typeof(InitiateMultipartUploadResult),
            "http://s3.amazonaws.com/doc/2006-03-01/").Deserialize(responseStream);
    }

    /// <summary>Sends a bucket-creation request and prints the response body. Not called by <see cref="Main"/>.</summary>
    private static async Task CreateBucket(HttpClient client)
    {
        // NOTE(review): the request targets the endpoint root rather than a bucket path - confirm this is intended.
        using var request = new HttpRequestMessage(HttpMethod.Put, $"http://localhost:8333/")
        {
            // The continuation lines of the verbatim string stay at column 0 so the payload is unchanged.
            Content = new StringContent(
                @"<CreateBucketConfiguration xmlns=""http://s3.amazonaws.com/doc/2006-03-01/"">
<LocationConstraint>string</LocationConstraint>
</CreateBucketConfiguration>", Encoding.UTF8, "text/xml"),
        };
        using var response = await client.SendAsync(request);
        var responseText = await response.Content.ReadAsStringAsync();
        Console.WriteLine($"ResponseText: {responseText}");
    }

    /// <summary>
    /// Splits <paramref name="stream"/> into consecutive, non-overlapping slices of at most
    /// <see cref="DefaultMaxSliceSize"/> bytes that together cover the stream exactly once.
    /// </summary>
    /// <returns>
    /// One tuple per slice: offset of its first byte, its length in bytes, and the total stream length.
    /// An empty stream yields a single zero-length slice so that one (empty) part is still uploaded.
    /// </returns>
    private static IEnumerable<(long RangeBegin, long RangeLength, long RangeTotal)> GetUploadSliceRequests(Stream stream)
    {
        var totalLength = stream.Length;
        if (totalLength == 0)
        {
            yield return (0L, 0L, 0L);
            yield break;
        }

        // Inclusive index of the last byte. Comparing against the length itself would produce a
        // trailing empty slice whenever the length is an exact multiple of the slice size.
        var lastByteIndex = totalLength - 1;
        var currentRangeBegins = 0L;
        while (currentRangeBegins <= lastByteIndex)
        {
            var nextSliceSize = NextSliceSize(currentRangeBegins, lastByteIndex);

            // The second element is a LENGTH, because the consumer passes it on as maxLength.
            yield return (currentRangeBegins, nextSliceSize, totalLength);
            currentRangeBegins += nextSliceSize;
        }
    }

    /// <summary>
    /// Size of the next slice for the inclusive byte range
    /// [<paramref name="rangeBegin"/>, <paramref name="rangeEnd"/>], capped at <see cref="DefaultMaxSliceSize"/>.
    /// </summary>
    private static long NextSliceSize(long rangeBegin, long rangeEnd)
    {
        var sizeBasedOnRange = rangeEnd - rangeBegin + 1;
        return Math.Min(sizeBasedOnRange, DefaultMaxSliceSize);
    }

    /// <summary>
    /// Parses "begin-end" range strings into inclusive (begin, end) tuples. A blank end means
    /// "until the last byte of <paramref name="uploadStream"/>". Currently fed the fixed list
    /// <c>{ "0-" }</c> and not called from anywhere in this class.
    /// </summary>
    private List<Tuple<long, long>> GetRangesRemaining(Stream uploadStream)
    {
        // nextExpectedRanges: https://dev.onedrive.com/items/upload_large_files.htm
        // Sample: ["12345-55232","77829-99375"]
        // Also, the second number in a range can be blank, which means 'until the end'.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        var newRangesRemaining = new List<Tuple<long, long>>();
        foreach (var range in new[] { "0-" })
        {
            var rangeSpecifiers = range.Split('-');
            var begin = long.Parse(rangeSpecifiers[0], invariant);
            var end = string.IsNullOrEmpty(rangeSpecifiers[1])
                ? uploadStream.Length - 1
                : long.Parse(rangeSpecifiers[1], invariant);
            newRangesRemaining.Add(new Tuple<long, long>(begin, end));
        }

        return newRangesRemaining;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment