// rankedHashtag subscription definition AddField(new FieldType { Name = "rankedHashtag", Description = "Hashtags are published with their new rank", Type = typeof(RankedHashtagTypeQl), Arguments = new QueryArguments(new QueryArgument<FloatGraphType>() {Name = "sampleIntervalSec", DefaultValue = 0, Description = "The sampling interval expressed in seconds."}), Resolver = new FuncFieldResolver<RankedHashtag>(ResolveHashtag), StreamResolver = new SourceStreamResolver<RankedHashtag>(GetHashtagAddedResolver) }); private RankedHashtag ResolveHashtag(IResolveFieldContext context) { var message = context.Source as RankedHashtag; return message; } private IObservable<RankedHashtag> GetHashtagAddedResolver(IResolveFieldContext context) { var samplingIntervalSeconds = context.GetArgument<double>("sampleIntervalSec"); return _tweetHashtagService.GetRankedHashtagObservable(samplingIntervalSeconds); } // In another class public IObservable<RankedHashtag> GetRankedHashtagObservable(double samplingIntervalSeconds) { return _rankedHashtagStream.AsObservable().Sample(TimeSpan.FromSeconds(samplingIntervalSeconds)); }