Skip to content

Instantly share code, notes, and snippets.

@Rast1234
Last active January 6, 2019 23:06
Show Gist options
  • Save Rast1234/09ca717457526fb49cc53cb895b19afd to your computer and use it in GitHub Desktop.
Save Rast1234/09ca717457526fb49cc53cb895b19afd to your computer and use it in GitHub Desktop.
async parallel downloading
public async Task ProcessAudio(long id, Storage storage, CancellationToken token, ILog log)
{
var allPlaylists = await GetAllPlaylists(id, token, log);
var throttler = new Throttler();
foreach (var playlist in allPlaylists)
{
var playlistStorage = storage.Descend(playlist.Name, true);
var downloadTasks = playlist.Tracks
.Select(async track =>
{
var trackName = string.Join(" - ", new[]{ track.Artist, track.Name }.Where(x => !string.IsNullOrEmpty(x)));
if (track.Url != null)
{
var client = new RestClient($"{track.Url.Scheme}://{track.Url.Authority}");
var response = await client.ExecuteGetTaskAsync(new RestRequest(track.Url.PathAndQuery), token);
var file = playlistStorage.GetFile($"{trackName}.mp3").FullName;
log.Debug($"Saving {file}");
await File.WriteAllBytesAsync(file, response.RawBytes, token);
}
else
{
playlistStorage.GetFile($"{trackName}.mp3.deleted");
}
if (track.Lyrics != null)
{
var lyricsFile = playlistStorage.GetFile($"{trackName}.txt").FullName;
await File.WriteAllTextAsync(lyricsFile, track.Lyrics, token);
}
})
.ToArray();
await throttler.ProcessWithThrottling(downloadTasks, 3, token);
}
}
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace VOffline.Services
{
public class Throttler
{
public async Task<IReadOnlyList<T>> ProcessWithThrottling<T>(Task<T>[] tasks, int limit, CancellationToken token)
{
using (var semaphore = new SemaphoreSlim(limit))
{
var newTasks = tasks.Select(async t =>
{
await semaphore.WaitAsync(token);
try
{
var data = await t;
return data;
}
finally
{
semaphore.Release();
}
});
return await Task.WhenAll(newTasks);
}
}
public async Task ProcessWithThrottling(Task[] tasks, int limit, CancellationToken token)
{
using (var semaphore = new SemaphoreSlim(limit))
{
var newTasks = tasks.Select(async t =>
{
await semaphore.WaitAsync(token);
try
{
await t;
}
finally
{
semaphore.Release();
}
});
await Task.WhenAll(newTasks);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment