Skip to content

Instantly share code, notes, and snippets.

@dj-nitehawk
Last active July 9, 2021 06:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dj-nitehawk/dc87f368746cb8666b18cc00dd5ecf88 to your computer and use it in GitHub Desktop.
Save dj-nitehawk/dc87f368746cb8666b18cc00dd5ecf88 to your computer and use it in GitHub Desktop.
full watcher example
using MongoDB.Bson;
using MongoDB.Entities;
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
namespace TestApplication
{
/// <summary>
/// Console sample: watches the <see cref="Book"/> collection for newly created documents,
/// stops the watcher from inside the batch handler, and persists the watcher's resume token
/// to disk so that a later run can continue from where this one stopped.
/// </summary>
public static class Program
{
    /// <summary>File the resume token is written to when the watcher ends and read from on startup.</summary>
    private const string ResumeTokenPath = "resume.token";

    /// <summary>Entity type being watched; this sample only reads its title.</summary>
    public class Book : Entity
    {
        public string Title { get; set; }
    }

    /// <summary>
    /// Connects to the "test" database on localhost, wires up the watcher's event handlers,
    /// then either resumes from a previously saved token or starts fresh and inserts 20 books.
    /// </summary>
    private static async Task Main()
    {
        await DB.InitAsync("test", "localhost");

        var watcher = DB.Watcher<Book>("book-watcher");
        var cts = new CancellationTokenSource();

        // Single place that persists the resume token; used by both OnError and OnStop.
        async Task SaveResumeTokenAsync()
        {
            var token = watcher.ResumeToken;

            // NOTE(review): presumably the token is still null when the watcher ends before
            // any batch was delivered — confirm against the library. Writing a null token
            // would leave a file that cannot be parsed back on the next start, so skip it.
            if (token is null)
            {
                Console.WriteLine("no resume token available yet; nothing was saved");
                return;
            }

            await File.WriteAllTextAsync(ResumeTokenPath, token.ToJson());
        }

        watcher.OnChangesAsync += async books =>
        {
            Console.WriteLine(">>> received a batch!");

            foreach (var book in books)
            {
                Console.WriteLine($"--> {book.Title}");
                await Task.Delay(500);

                if (book.Title == "book 2") //stop the watcher after 2nd book is received.
                {
                    //this will cause the OnStop event to fire enabling us to store the resume token there.
                    cts.Cancel();

                    //NOTE: OnStop is fired only at the end of the current batch!!!
                    //      so it will resume starting at book 6 when app restarts,
                    //      because our batchSize is 5.
                }
            }
        };

        watcher.OnError += async exception =>
        {
            Console.WriteLine($"error: {exception.Message}");

            if (watcher.CanRestart)
            {
                watcher.ReStart();
                return;
            }

            Console.WriteLine("your watch has ended (due to an error)");
            await SaveResumeTokenAsync();
        };

        watcher.OnStop += async () =>
        {
            if (watcher.CanRestart)
            {
                watcher.ReStart();
                return;
            }

            Console.WriteLine("your watch has ended (due to stopping)");
            Console.WriteLine("restart the app to resume from where it stopped...");
            await SaveResumeTokenAsync();
        };

        if (File.Exists(ResumeTokenPath))
        {
            // A token from an earlier run exists: continue from it instead of starting over.
            // Read asynchronously so the async entry point never blocks on file I/O.
            var resumeToken = BsonDocument.Parse(await File.ReadAllTextAsync(ResumeTokenPath));

            watcher.StartWithToken(
                resumeToken: resumeToken,
                eventTypes: EventType.Created,
                batchSize: 5,
                cancellation: cts.Token);

            Console.WriteLine("watcher resumed with last saved token...");
        }
        else
        {
            watcher.Start(
                eventTypes: EventType.Created,
                batchSize: 5,
                cancellation: cts.Token);

            Console.WriteLine("watcher started...");

            // Fresh start: create 20 books so the watcher has change events to report.
            for (int i = 1; i <= 20; i++)
            {
                await new Book { Title = $"book {i}" }.SaveAsync();
            }
        }

        // Presumably gives the watcher time to deliver its batches before the exit prompt appears.
        await Task.Delay(10000);

        Console.WriteLine("press a key to exit!");
        Console.ReadLine();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment