Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple stub for emitting Event Hub-style tuples to Storm context, for testing .NET Storm applications. Usage here: https://blog.sixeyed.com/a-stub-event-hub-spout-for-testing-storm-net/
using Microsoft.SCP;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
namespace My.Storm.App.Tests
{
/// <summary>
/// Simple stub for emitting Event Hub-style tuples to Storm context.
/// </summary>
/// <remarks>
/// Serializes events using Json.NET but emits them using the CustomizedInteropJSONSerializer so
/// the tuples have the same format as those emitted by the Java Event Hub Spout.
/// Each emitted JSON payload is cached under its sequence ID until it is acked, so that a
/// failed tuple can be re-emitted with the same payload and sequence ID.
/// </remarks>
/// <typeparam name="TEvent">Type of the event objects produced by the event factory.</typeparam>
public class StubEventHubSpout<TEvent> : ISCPSpout
{
    private readonly Context _context;
    private readonly Func<TEvent> _eventFactory;

    // JSON payloads that have been emitted but not yet acked, keyed by sequence ID.
    private readonly Dictionary<long, string> _cachedTuples = new Dictionary<long, string>();

    // Sequence ID assigned to the next emitted tuple; incremented after every emit.
    private long _sequenceId;

    /// <summary>
    /// Creates the spout, declares a single string field on the default output stream and
    /// registers the interop JSON serializer on the context.
    /// </summary>
    /// <param name="context">Storm context used to declare the schema and emit tuples.</param>
    /// <param name="eventFactory">Delegate invoked once per <see cref="NextTuple"/> call to produce the event to emit.</param>
    /// <param name="parms">Optional parameters; not used by this stub.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="context"/> or <paramref name="eventFactory"/> is null.
    /// </exception>
    public StubEventHubSpout(Context context, Func<TEvent> eventFactory, Dictionary<string, object> parms = null)
    {
        // Fail fast with a descriptive exception instead of a NullReferenceException
        // here (context) or later inside NextTuple (eventFactory).
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }
        if (eventFactory == null)
        {
            throw new ArgumentNullException("eventFactory");
        }

        _context = context;
        _eventFactory = eventFactory;

        // Output only: one string field (the event JSON) on the default stream; no input schema.
        var outputSchema = new Dictionary<string, List<Type>>
        {
            { Constants.DEFAULT_STREAM_ID, new List<Type> { typeof(string) } }
        };
        _context.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
        _context.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer());
    }

    /// <summary>
    /// Factory method that creates a new <see cref="StubEventHubSpout{TEvent}"/>.
    /// </summary>
    /// <param name="context">Storm context passed to the constructor.</param>
    /// <param name="eventFactory">Event factory passed to the constructor.</param>
    /// <param name="parms">Optional parameters passed to the constructor.</param>
    /// <returns>The newly constructed spout.</returns>
    public static StubEventHubSpout<TEvent> Get(Context context, Func<TEvent> eventFactory, Dictionary<string, Object> parms = null)
    {
        return new StubEventHubSpout<TEvent>(context, eventFactory, parms);
    }

    /// <summary>
    /// Creates one event via the factory, serializes it to JSON, caches it and emits it on the
    /// default stream tagged with the current sequence ID.
    /// </summary>
    /// <param name="parms">Not used by this stub.</param>
    public void NextTuple(Dictionary<string, object> parms)
    {
        var evt = _eventFactory();
        var json = JsonConvert.SerializeObject(evt);

        // Cache before emitting so the payload is available if the tuple is later failed.
        _cachedTuples.Add(_sequenceId, json);
        _context.Emit(Constants.DEFAULT_STREAM_ID, new List<object>() { json }, _sequenceId);
        Context.Logger.Info("Emitted event JSON. Sequence ID: {0}", _sequenceId);
        _sequenceId++;
    }

    /// <summary>
    /// Drops the cached payload for an acknowledged tuple. Unknown sequence IDs are ignored.
    /// </summary>
    /// <param name="sequenceId">Sequence ID of the acknowledged tuple.</param>
    /// <param name="parms">Not used by this stub.</param>
    public void Ack(long sequenceId, Dictionary<string, object> parms)
    {
        _cachedTuples.Remove(sequenceId);
    }

    /// <summary>
    /// Re-emits the cached payload of a failed tuple under the same sequence ID.
    /// </summary>
    /// <remarks>
    /// If no payload is cached for <paramref name="sequenceId"/> (for example it was already
    /// acked), a warning is logged and nothing is emitted.
    /// </remarks>
    /// <param name="sequenceId">Sequence ID of the failed tuple.</param>
    /// <param name="parms">Not used by this stub.</param>
    public void Fail(long sequenceId, Dictionary<string, object> parms)
    {
        string json;
        if (!_cachedTuples.TryGetValue(sequenceId, out json))
        {
            // Nothing to replay; indexing the dictionary here would throw KeyNotFoundException.
            Context.Logger.Warn("Fail: no cached tuple for Sequence ID: {0}", sequenceId);
            return;
        }

        _context.Emit(Constants.DEFAULT_STREAM_ID, new List<object>() { json }, sequenceId);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment