Last active
September 7, 2015 13:42
-
-
Save sixeyed/a72335b37681db69f9d4 to your computer and use it in GitHub Desktop.
Simple stub for emitting Event Hub-style tuples to Storm context, for testing .NET Storm applications. Usage here: https://blog.sixeyed.com/a-stub-event-hub-spout-for-testing-storm-net/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Microsoft.SCP; | |
using Newtonsoft.Json; | |
using System; | |
using System.Collections.Generic; | |
namespace My.Storm.App.Tests
{
    /// <summary>
    /// Simple stub for emitting Event Hub-style tuples to Storm context.
    /// </summary>
    /// <remarks>
    /// Serializes events using Json.NET but emits them using the CustomizedInteropJSONSerializer so
    /// the tuples have the same format as those emitted by the Java Event Hub Spout.
    /// Each emitted JSON payload is cached by sequence ID until it is acked, so that a failed
    /// tuple can be emitted again with the same payload.
    /// </remarks>
    /// <typeparam name="TEvent">Type of the event produced by the factory and serialized to JSON.</typeparam>
    public class StubEventHubSpout<TEvent> : ISCPSpout
    {
        private readonly Context _context;
        private readonly Func<TEvent> _eventFactory;

        // JSON payloads that have been emitted but not yet acked, keyed by sequence ID.
        private readonly Dictionary<long, string> _cachedTuples = new Dictionary<long, string>();

        // Sequence ID assigned to the next emitted tuple; incremented after every emit.
        private long _sequenceId;

        /// <summary>
        /// Creates the spout and declares its output schema (a single string field on the
        /// default stream) and the interop JSON serializer on the supplied context.
        /// </summary>
        /// <param name="context">Storm context used to declare the schema and emit tuples.</param>
        /// <param name="eventFactory">Factory invoked once per <see cref="NextTuple"/> call to build the event to emit.</param>
        /// <param name="parms">Not used by this stub.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="context"/> or <paramref name="eventFactory"/> is null.
        /// </exception>
        public StubEventHubSpout(Context context, Func<TEvent> eventFactory, Dictionary<string, object> parms = null)
        {
            // Fail fast: a null factory would otherwise only surface later, inside NextTuple.
            if (context == null)
            {
                throw new ArgumentNullException("context");
            }
            if (eventFactory == null)
            {
                throw new ArgumentNullException("eventFactory");
            }

            _context = context;
            _eventFactory = eventFactory;

            Dictionary<string, List<Type>> outputSchema = new Dictionary<string, List<Type>>();
            outputSchema.Add(Constants.DEFAULT_STREAM_ID, new List<Type>() { typeof(string) });
            _context.DeclareComponentSchema(new ComponentStreamSchema(null, outputSchema));
            _context.DeclareCustomizedSerializer(new CustomizedInteropJSONSerializer());
        }

        /// <summary>
        /// Factory method that creates a new spout instance with the given arguments.
        /// </summary>
        /// <param name="context">Storm context passed to the constructor.</param>
        /// <param name="eventFactory">Event factory passed to the constructor.</param>
        /// <param name="parms">Passed to the constructor, which does not use it.</param>
        /// <returns>A new <see cref="StubEventHubSpout{TEvent}"/>.</returns>
        public static StubEventHubSpout<TEvent> Get(Context context, Func<TEvent> eventFactory, Dictionary<string, Object> parms = null)
        {
            return new StubEventHubSpout<TEvent>(context, eventFactory, parms);
        }

        /// <summary>
        /// Builds one event from the factory, serializes it to JSON, caches it under the
        /// current sequence ID and emits it on the default stream with that sequence ID.
        /// </summary>
        /// <param name="parms">Not used by this stub.</param>
        public void NextTuple(Dictionary<string, object> parms)
        {
            var evt = _eventFactory();
            var json = JsonConvert.SerializeObject(evt);

            // Cache before emitting so the payload is available if the tuple is failed.
            _cachedTuples.Add(_sequenceId, json);
            _context.Emit(Constants.DEFAULT_STREAM_ID, new List<object>() { json }, _sequenceId);
            Context.Logger.Info("Emitted event JSON. Sequence ID: {0}", _sequenceId);
            _sequenceId++;
        }

        /// <summary>
        /// Drops the cached payload for an acked tuple. Unknown sequence IDs are ignored.
        /// </summary>
        /// <param name="sequenceId">Sequence ID of the acked tuple.</param>
        /// <param name="parms">Not used by this stub.</param>
        public void Ack(long sequenceId, Dictionary<string, object> parms)
        {
            _cachedTuples.Remove(sequenceId);
        }

        /// <summary>
        /// Emits the cached payload for a failed tuple again, using the same sequence ID.
        /// If no payload is cached for the ID (for example it was already acked), nothing is
        /// emitted and the condition is logged instead of throwing.
        /// </summary>
        /// <param name="sequenceId">Sequence ID of the failed tuple.</param>
        /// <param name="parms">Not used by this stub.</param>
        public void Fail(long sequenceId, Dictionary<string, object> parms)
        {
            string json;
            if (!_cachedTuples.TryGetValue(sequenceId, out json))
            {
                // The indexer would throw KeyNotFoundException here; there is nothing to replay.
                Context.Logger.Info("Cannot replay failed event, no cached tuple. Sequence ID: {0}", sequenceId);
                return;
            }

            _context.Emit(Constants.DEFAULT_STREAM_ID, new List<object>() { json }, sequenceId);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment