Skip to content

Instantly share code, notes, and snippets.

@amedina
Created March 27, 2014 21:51
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save amedina/9819834 to your computer and use it in GitHub Desktop.
Save amedina/9819834 to your computer and use it in GitHub Desktop.
QueryStatsBolt
package com.twitter.ads.adsquerylistener.storm;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.NotThreadSafe;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import com.google.inject.name.Named;
import net.lag.configgy.Config;
import org.slf4j.Logger;
import com.twitter.ads.adsquerylistener.DeployMode;
import com.twitter.ads.adsquerylistener.StatsConstants;
import com.twitter.ads.adsquerylistener.config.ConfigConstants;
import com.twitter.ads.adsquerylistener.humaneval.HumanEvalDispatcher;
import com.twitter.ads.adsquerylistener.topqueries.QueryStats;
import com.twitter.ads.adsquerylistener.topqueries.TrendingQueryDecider;
import com.twitter.ads.common.guice.Conf;
import com.twitter.ads.common.logging.LoggerFactory;
import com.twitter.ads.human_eval.HumanEvalException;
import backtype.storm.metric.api.AssignableMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
/**
* Handles logic to select queries to be sent for human eval.
*/
@NotThreadSafe
public class QueryStatsBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.get();
private static final Object WORKER_LOCK = new Object();
private static HumanEvalDispatcher humanEvalDispatcher = null;
private static Injector injector;
private OutputCollector outputCollector;
private Cache<String, QueryStats> queryStatsCache;
private TrendingQueryDecider trendingQueryDecider;
private long halfLifeSeconds = 0L;
private String configString;
private DeployMode deployMode;
// Stats reporting-related members
private CountMetric numQueriesStats;
private AssignableMetric queryStatsCacheSizeStats;
/**
* Constructor to build a top queries bolt.
*/
@Inject
public QueryStatsBolt(
@Conf(ConfigConstants.QUERY_CACHE_NUM_KEYS) int cacheNumKeys,
@Conf(ConfigConstants.QUERY_CACHE_EXPIRATION_MINS) int cacheExpirationMinutes,
@Conf(ConfigConstants.QUERY_CACHE_HALF_LIFE_SECS) long halfLifeSeconds,
final TrendingQueryDecider trendingQueryDecider,
DeployMode deployMode,
@Named(ConfigConstants.CONF_STRING) String configString) {
queryStatsCache = CacheBuilder.newBuilder()
.maximumSize(cacheNumKeys)
.expireAfterAccess(cacheExpirationMinutes, TimeUnit.MINUTES)
.build();
this.configString = configString;
this.trendingQueryDecider = trendingQueryDecider;
this.halfLifeSeconds = halfLifeSeconds;
this.deployMode = deployMode;
}
/**
* Query stats bolt for testing purpose.
*/
@VisibleForTesting
public QueryStatsBolt(int cacheNumKeys, int cacheExpiration, long halfLifeSecs,
TrendingQueryDecider trendingQueryDecider,
HumanEvalDispatcher dispatcher, OutputCollector collector) {
queryStatsCache = CacheBuilder.newBuilder()
.maximumSize(cacheNumKeys)
.expireAfterAccess(cacheExpiration, TimeUnit.MINUTES)
.build();
this.trendingQueryDecider = trendingQueryDecider;
this.halfLifeSeconds = halfLifeSecs;
this.humanEvalDispatcher = dispatcher;
this.outputCollector = collector;
}
@VisibleForTesting
protected void initStats(TopologyContext topologyContext) {
this.numQueriesStats = (CountMetric) topologyContext.registerMetric(
StatsConstants.NUM_QUERIES,
new CountMetric(),
StatsConstants.SECONDS_TO_POLL);
this.queryStatsCacheSizeStats = (AssignableMetric) topologyContext.registerMetric(
StatsConstants.QUERY_BOLT_STATS_CACHE_SIZE,
new AssignableMetric(queryStatsCache.size()),
StatsConstants.SECONDS_TO_POLL);
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector) {
this.outputCollector = collector;
initStats(topologyContext);
// We want to build objects and their dependency tree on the storm cluster. If we
// don't delay this guice injection, storm will require us to serialize every class
// that is being injected.
synchronized (WORKER_LOCK) {
if (null == injector) {
injector = Guice.createInjector(
deployMode.getGuiceModule(Config.fromString(configString)));
humanEvalDispatcher = injector.getInstance(HumanEvalDispatcher.class);
humanEvalDispatcher.initialize(topologyContext);
}
}
}
// Note this is not thread-safe: If 2 threads provide the same query,
// this code may behave incorrectly.
@Override
public void execute(Tuple tuple) {
String query = (String) tuple.getValue(0);
int auctionDepth = ((Integer) tuple.getValue(1)).intValue();
long currentTimeInMillis = System.currentTimeMillis();
// Increment stats for number of queries observed and query stats cache size.
numQueriesStats.incr();
queryStatsCacheSizeStats.setValue(queryStatsCache.size());
// Insert into cache if not present
QueryStats queryStats = queryStatsCache.getIfPresent(query);
if (null == queryStats) {
queryStats = new QueryStats();
}
queryStats.updateStats(1, auctionDepth, currentTimeInMillis, halfLifeSeconds);
queryStatsCache.put(query, queryStats);
if (trendingQueryDecider.isTrendingQuery(queryStats)) {
try {
humanEvalDispatcher.dispatchQuery(query, queryStats.getRequests());
} catch (HumanEvalException e) {
LOG.warn("Error when dispatching query.");
}
outputCollector.emit(ImmutableList.<Object>of(query));
}
outputCollector.ack(tuple);
}
public Optional<QueryStats> getQueryStatsForQuery(String query) {
return Optional.fromNullable(queryStatsCache.getIfPresent(query));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields(StormConstants.QUERY_FIELD));
}
public long getQueryStatsCacheSize() {
return queryStatsCache.size();
}
}
@markdav
Copy link

markdav commented May 20, 2015

Hey, cool stuff! Do you have an example of the topology that calls this and builds the modules, or of the DeployMode class? I am trying to do something similar, creating the modules in prepare(), but I am seeing some issues when deploying to a cluster.

@bdine
Copy link

bdine commented Feb 3, 2017

Hey, this seems nice.
@markdav, have you figured out how to implement the topology and create the Guice modules in prepare()?

Regards

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment