Skip to content

Instantly share code, notes, and snippets.

@abdullin
Created March 4, 2013 06:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save abdullin/5080455 to your computer and use it in GitHub Desktop.
Save abdullin/5080455 to your computer and use it in GitHub Desktop.
Samples of commands from a test client, used for event store manipulation (fetching the store to local storage and searching it). They mostly leverage the Lokad.CQRS framework and the test client's interactive shell (see, for example, the beingtheworst.com source: https://github.com/beingtheworst/btw-gtd)
/// <summary>
/// Command with key "FETCH": synchronizes the blobs exposed by a remote blob reader
/// into a local folder under C:\LokadData.
/// </summary>
public class FetchEventStoreProcessor : ICommandProcessor
{
    /// <summary>Keyword that selects this command.</summary>
    public string Key { get { return "FETCH"; } }

    /// <summary>Usage line for this command.</summary>
    public string Usage { get { return "FETCH <remoteConfig> [<folderNameInLokadData>]"; } }

    /// <summary>
    /// Validates the arguments, builds a blob reader for the remote side and runs the sync.
    /// </summary>
    /// <param name="context">Provides the logger that receives all output.</param>
    /// <param name="token">Cancellation token; it is not observed by this command.</param>
    /// <param name="args">
    /// args[0] is the remote config: a string starting with "DefaultEndpointsProtocol=" or
    /// "UseDevelopmentStorage=true" selects an AzureBlobReader, anything else is treated as
    /// a file-system path. The optional args[1] is the sub-folder of C:\LokadData to sync
    /// into (defaults to "s2-store").
    /// </param>
    /// <returns>
    /// true when the sync finished; false when validation failed or any exception was
    /// raised (the exception message is logged).
    /// </returns>
    public bool Execute(CommandProcessorContext context, CancellationToken token, string[] args)
    {
        if (args == null || args.Length == 0)
        {
            context.Log.Error("No remoteConfig provided.");
            return false;
        }

        var config = args[0];
        var localSubPath = args.Length > 1 ? args[1] : "s2-store";
        var localPath = Path.Combine(@"C:\LokadData", localSubPath, Setup.TapesContainer);
        if (!Directory.Exists(localPath))
        {
            context.Log.Error(string.Format("ERROR: Directory \"{0}\" not found.", localPath));
            return false;
        }

        IBlobReader blobReader;
        try
        {
            const StringComparison icic = StringComparison.InvariantCultureIgnoreCase;
            if (config.StartsWith("DefaultEndpointsProtocol=", icic)
                || config.StartsWith("UseDevelopmentStorage=true", icic))
            {
                blobReader = new AzureBlobReader(config);
            }
            else
            {
                var path = Path.Combine(Path.GetFullPath(config), Setup.TapesContainer);
                blobReader = new FileBlobReader(path);
            }
        }
        catch (Exception e)
        {
            context.Log.Error("ERROR: {0}", e.Message);
            return false;
        }

        context.Log.Info("Sync \"{0}\" to local folder \"{1}\".", blobReader.GetConfig, localPath);
        if (localPath.Equals(blobReader.GetConfig, StringComparison.InvariantCultureIgnoreCase))
        {
            context.Log.Error("ERROR: {0}", "Can not sync to itself.");
            return false;
        }

        try
        {
            Sync(blobReader, localPath, context.Log);
        }
        catch (Exception e)
        {
            context.Log.Error("ERROR: {0}", e.Message);

            // Parallel.For wraps worker failures in an AggregateException; log each cause.
            var ae = e as AggregateException;
            if (ae != null)
            {
                foreach (var ie in ae.InnerExceptions)
                    context.Log.Error("ERROR: {0}", ie.Message);
            }
            return false;
        }
        return true;
    }

    /// <summary>
    /// Brings the local folder up to date with the remote blob list: verifies that the
    /// sorted local file names are a prefix of the sorted remote names, re-synchronizes
    /// the last local chunk, then downloads every remote chunk that is missing locally.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when a local name differs from the remote name at the same position, or
    /// when there are more local files than remote blobs.
    /// </exception>
    void Sync(IBlobReader blobReader, string localPath, ILogger log)
    {
        log.Info("Getting blob list");

        // Get all names of local files. Find latest
        var localNames = Directory.GetFiles(localPath).Select(Path.GetFileName).OrderBy(x => x).ToArray();
        var remoteNames = blobReader.ListBlobs()
            .AsParallel()
            .OrderBy(x => x).ToArray();

        // Check that local and remote are same streams
        var chunk = 0;
        for (; chunk < remoteNames.Length; ++chunk)
        {
            if (chunk >= localNames.Length)
                break;
            if (remoteNames[chunk] != localNames[chunk])
                throw new InvalidOperationException(string.Format("Set of blobs differ: local=\"{0}\", remote=\"{1}\"", localNames[chunk], remoteNames[chunk]));
        }
        if (chunk < localNames.Length)
            throw new InvalidOperationException(string.Format("Set of blobs differ: local=\"{0}\", remote not found", localNames[chunk]));

        // Synchronize last local chunk
        if (chunk > 0)
        {
            log.Info("Synchronizing last local chunk");
            SyncLastChunk(blobReader, localPath, localNames[chunk - 1]);
        }
        if (chunk >= remoteNames.Length)
            return;

        // Copy missing chunks
        log.Info("Copying missing chunks");
        Parallel.For(chunk, remoteNames.Length, chIndex =>
        {
            var chunkName = remoteNames[chIndex];
            var bytes = ReadWithRetry(blobReader, chunkName);
            File.WriteAllBytes(Path.Combine(localPath, chunkName), bytes);
        });
    }

    /// <summary>
    /// Compares the frames of one chunk on both sides. Does nothing when they are equal,
    /// replaces the local file with the remote bytes when the remote has additional
    /// frames after a common prefix, and throws in every other case.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the two sides diverge, or when the local chunk has more frames than
    /// the remote one.
    /// </exception>
    void SyncLastChunk(IBlobReader blobReader, string localPath, string blobName)
    {
        StorageFrameDecoded[] remoteFrames;
        var remoteBytes = blobReader.ReadAllBytes(blobName);
        using (var stream = new MemoryStream(remoteBytes))
        {
            remoteFrames = ReadAllFrames(stream);
        }

        StorageFrameDecoded[] localFrames;
        var localChunkName = Path.Combine(localPath, blobName);
        using (var stream = File.OpenRead(localChunkName))
        {
            localFrames = ReadAllFrames(stream);
        }

        // Compare local and remote from beginning
        var equalPairs = remoteFrames
            .Zip(localFrames, (r, l) => new {r, l})
            .TakeWhile(p => FramesAreEqual(p.r, p.l))
            .Count();

        // Exit if streams are equal
        if (equalPairs == remoteFrames.Length && equalPairs == localFrames.Length)
            return;

        // With no matching frame at all there is nothing to index; report "<none>"
        // instead of failing with an index-out-of-range error that hides the real cause.
        var lastCommonName = equalPairs > 0 ? remoteFrames[equalPairs - 1].Name : "<none>";

        if (equalPairs < remoteFrames.Length && equalPairs < localFrames.Length)
            throw new InvalidOperationException(string.Format(
                "Remote and local have different branches. Common parent is \"{0}\"",
                lastCommonName));
        if (equalPairs < localFrames.Length)
            throw new InvalidOperationException(string.Format(
                "Local has more events that remote. Last remote is \"{0}\"",
                lastCommonName));

        // Remote is strictly ahead of local: take the remote copy.
        File.Delete(localChunkName);
        File.WriteAllBytes(localChunkName, remoteBytes);
    }

    /// <summary>
    /// Two frames match when both are empty, or when both are non-empty and agree on
    /// name, stamp and payload bytes. (Comparing only the IsEmpty flags would treat any
    /// two non-empty frames as equal and hide diverged content.)
    /// </summary>
    static bool FramesAreEqual(StorageFrameDecoded remote, StorageFrameDecoded local)
    {
        if (remote.IsEmpty || local.IsEmpty)
            return remote.IsEmpty && local.IsEmpty;

        return remote.Name == local.Name
            && remote.Stamp == local.Stamp
            && remote.Bytes.SequenceEqual(local.Bytes);
    }

    /// <summary>
    /// Reads frames from the stream until StorageFramesEvil.TryReadFrame returns false.
    /// </summary>
    static StorageFrameDecoded[] ReadAllFrames(Stream stream)
    {
        var list = new List<StorageFrameDecoded>();
        StorageFrameDecoded result;
        while (StorageFramesEvil.TryReadFrame(stream, out result))
        {
            list.Add(result);
        }
        return list.ToArray();
    }

    /// <summary>
    /// Reads a blob, making up to four attempts with a 500 ms pause after each failure;
    /// the exception from the fourth failed attempt is rethrown.
    /// </summary>
    static byte[] ReadWithRetry(IBlobReader blobReader, string name)
    {
        var tryCount = 0;
        while (true)
        {
            try
            {
                tryCount++;
                return blobReader.ReadAllBytes(name);
            }
            catch (Exception)
            {
                if (tryCount == 4)
                    throw;
                Thread.Sleep(500);
            }
        }
    }
}
/// <summary>
/// Command with key "PRINT": enumerates the messages stored in a local folder under
/// C:\LokadData and logs those whose "TypeName json" text matches every supplied
/// regular expression.
/// </summary>
public class PrintMessagesProcessor : ICommandProcessor
{
    /// <summary>Keyword that selects this command.</summary>
    public string Key { get { return "PRINT"; } }

    /// <summary>Usage line for this command.</summary>
    public string Usage { get { return "PRINT <folderNameInLokadData> (<regexp> | '<regexp>' | \"<regexp>\") <regexp2> ..."; } }

    /// <summary>
    /// Runs the search and logs every matching message.
    /// </summary>
    /// <param name="context">Provides the logger that receives all output.</param>
    /// <param name="token">Cancellation token; it is not observed by this command.</param>
    /// <param name="args">
    /// args[0] is the sub-folder of C:\LokadData to read; args[1..] are regular
    /// expressions, each optionally wrapped in single or double quotes. A message is
    /// printed only when all expressions match.
    /// </param>
    /// <returns>
    /// true when the store was enumerated; false when arguments are missing, the folder
    /// does not exist, or an expression is not a valid regular expression.
    /// </returns>
    public bool Execute(CommandProcessorContext context, CancellationToken token, string[] args)
    {
        if (args == null || args.Length == 0)
        {
            context.Log.Error("No local folder provided.");
            return false;
        }
        if (args.Length < 2)
        {
            // The folder was given but there is nothing to search for.
            context.Log.Error("No regexp provided.");
            return false;
        }

        var localSubPath = args[0];
        var localPath = Path.Combine(@"C:\LokadData", localSubPath, Setup.TapesContainer);
        if (!Directory.Exists(localPath))
        {
            context.Log.Error(string.Format("ERROR: Directory \"{0}\" not found.", localPath));
            return false;
        }

        var patterns = args.Skip(1).ToArray();

        // unquote
        for (var i = 0; i < patterns.Length; i++)
        {
            if (patterns[i].Length > 2 && (
                (patterns[i].StartsWith("\'") && patterns[i].EndsWith("\'")) ||
                (patterns[i].StartsWith("\"") && patterns[i].EndsWith("\""))))
                patterns[i] = patterns[i].Substring(1, patterns[i].Length - 2);
        }

        Regex[] res;
        try
        {
            res = patterns.Select(pattern => new Regex(pattern)).ToArray();
        }
        catch (ArgumentException e)
        {
            context.Log.Error(string.Format("ERROR: Expression is not valid. {0}", e));
            return false;
        }

        using (var store = new FileAppendOnlyStore(new DirectoryInfo(localPath)))
        {
            store.Initialize();
            var messageStore = new MessageStore(store, Contracts.CreateStreamer().MessageSerializer);

            // JsConfig.DateHandler is a static setting; restore it even when enumeration
            // or serialization throws, so the override cannot leak into later commands.
            var origDateHandler = JsConfig.DateHandler;
            JsConfig.DateHandler = JsonDateHandler.ISO8601;
            try
            {
                foreach (var record in messageStore.EnumerateAllItems(0, int.MaxValue))
                {
                    foreach (var item in record.Items)
                    {
                        var typeName = item.GetType().Name;
                        var json = JsonSerializer.SerializeToString(item);

                        // Expressions are matched against the type name and the JSON together.
                        var message = string.Format("{0} {1}", typeName, json);
                        if (!res.All(re => re.IsMatch(message)))
                            continue;

                        var sb = new StringBuilder();
                        sb.AppendLine(typeName);
                        sb.AppendLine(JsvFormatter.Format(json));
                        context.Log.Info(sb.ToString());
                    }
                }
            }
            finally
            {
                JsConfig.DateHandler = origDateHandler;
            }
        }
        return true;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment