// rankedHashtag subscription definition
// Registers the "rankedHashtag" field. GetHashtagAddedResolver supplies the event
// stream and ResolveHashtag maps each streamed event to the field's value.
AddField(new FieldType
{
    Name = "rankedHashtag",
    Description = "Hashtags are published with their new rank",
    Type = typeof(RankedHashtagTypeQl),
    Arguments = new QueryArguments(
        // Sampling interval argument, read back by name in GetHashtagAddedResolver.
        new QueryArgument<FloatGraphType>
        {
            Name = "sampleIntervalSec",
            DefaultValue = 0,
            Description = "The sampling interval expressed in seconds."
        }),
    Resolver = new FuncFieldResolver<RankedHashtag>(ResolveHashtag),
    StreamResolver = new SourceStreamResolver<RankedHashtag>(GetHashtagAddedResolver)
});

/// <summary>
/// Field resolver for the <c>rankedHashtag</c> field: returns the object carried in
/// the context's <c>Source</c> as a <see cref="RankedHashtag"/>.
/// </summary>
/// <param name="context">Resolution context whose <c>Source</c> is read.</param>
/// <returns>
/// The source cast to <see cref="RankedHashtag"/>, or <c>null</c> when the source is
/// not a <see cref="RankedHashtag"/>.
/// </returns>
private RankedHashtag ResolveHashtag(IResolveFieldContext context)
    => context.Source as RankedHashtag;

/// <summary>
/// Stream resolver for the <c>rankedHashtag</c> field: reads the
/// <c>sampleIntervalSec</c> argument and asks the hashtag service for the matching
/// observable.
/// </summary>
/// <param name="context">Resolution context from which the argument is read.</param>
/// <returns>The observable returned by the hashtag service for that interval.</returns>
private IObservable<RankedHashtag> GetHashtagAddedResolver(IResolveFieldContext context)
    => _tweetHashtagService.GetRankedHashtagObservable(
        context.GetArgument<double>("sampleIntervalSec"));

// In another class
/// <summary>
/// Exposes the ranked-hashtag stream, optionally sampled at a fixed interval.
/// </summary>
/// <param name="samplingIntervalSeconds">
/// Sampling interval in seconds. A value that converts to <see cref="TimeSpan.Zero"/>
/// disables sampling, so every published item is forwarded.
/// </param>
/// <returns>
/// The underlying stream as-is when the interval is zero; otherwise the stream sampled
/// at the requested interval.
/// </returns>
/// <exception cref="ArgumentException"><paramref name="samplingIntervalSeconds"/> is NaN.</exception>
/// <exception cref="OverflowException"><paramref name="samplingIntervalSeconds"/> cannot be represented as a <see cref="TimeSpan"/>.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="samplingIntervalSeconds"/> is negative.</exception>
public IObservable<RankedHashtag> GetRankedHashtagObservable(double samplingIntervalSeconds)
{
    var hashtagStream = _rankedHashtagStream.AsObservable();

    // Convert first and compare the TimeSpan, so that tiny positive values that
    // round down to zero ticks are treated the same as an explicit 0.
    var samplingInterval = TimeSpan.FromSeconds(samplingIntervalSeconds);

    // A zero interval means "no sampling". Passing TimeSpan.Zero to Sample would
    // schedule a zero-period timer that ticks continuously and, per the Rx
    // documentation, still does not guarantee that every element is preserved.
    if (samplingInterval == TimeSpan.Zero)
    {
        return hashtagStream;
    }

    // Negative intervals are still rejected by Sample, exactly as before.
    return hashtagStream.Sample(samplingInterval);
}