Skip to content

Instantly share code, notes, and snippets.

@codyaray
Created May 22, 2014 23:52
Show Gist options
  • Save codyaray/ac2eceb3ff92fa0eaf6b to your computer and use it in GitHub Desktop.
Save codyaray/ac2eceb3ff92fa0eaf6b to your computer and use it in GitHub Desktop.
The Big-4 Rules of Storm Tuning
// Copyright 2014 BrightTag, Inc. All rights reserved.
package com.brighttag.storm.utils;
import static com.google.common.base.Preconditions.checkArgument;
/**
 * Computes the parallelism for a particular topology and machine configuration.
 *
 * <p>All values are derived once, at construction time, from the inputs given to the
 * {@link Builder}:
 * <ul>
 *   <li>workers = hosts * workersPerHost</li>
 *   <li>spout parallelism = kafkaPartitions / kafkaPartitionsPerSpout</li>
 *   <li>transform parallelism = ((executorsPerCore * coresPerHost) / workersPerHost) * workers</li>
 *   <li>persistence parallelism = workers</li>
 * </ul>
 * The divisions are integer divisions, so inputs that are not exact multiples round down.
 * Instances are immutable.
 *
 * @author codyaray
 * @since 4/21/2014
 */
public class StormParallelism {

  private final int numberHosts;
  private final int numberWorkersPerHost;
  private final int numberWorkers;

  private final int numberKafkaPartitions;
  private final int numberKafkaPartitionsPerSpout;
  private final int spoutParallelism;

  private final int numberExecutorsPerCore;
  private final int numberCoresPerHost;
  private final int numberExecutorsPerHost;
  private final int numberExecutorsPerWorker;
  private final int transformParallelism;

  private final int persistenceParallelism;

  /**
   * Derives every parallelism value from the builder's inputs.
   *
   * <p>Only {@link Builder#build()} calls this, after validating that both divisors used
   * below are positive and that no derived value truncates to zero or overflows.
   */
  private StormParallelism(Builder builder) {
    // Number of workers should be a multiple of number of machines
    this.numberHosts = builder.numberHosts;
    this.numberWorkersPerHost = builder.numberWorkersPerHost;
    this.numberWorkers = numberHosts * numberWorkersPerHost;

    // Number of partitions should be a multiple of spout parallelism
    this.numberKafkaPartitions = builder.numberKafkaPartitions;
    this.numberKafkaPartitionsPerSpout = builder.numberKafkaPartitionsPerSpout;
    this.spoutParallelism = numberKafkaPartitions / numberKafkaPartitionsPerSpout;

    // Parallelism should be a multiple of number of workers
    this.numberExecutorsPerCore = builder.numberExecutorsPerCore;
    this.numberCoresPerHost = builder.numberCoresPerHost;
    this.numberExecutorsPerHost = numberExecutorsPerCore * numberCoresPerHost;
    this.numberExecutorsPerWorker = numberExecutorsPerHost / numberWorkersPerHost;
    this.transformParallelism = numberExecutorsPerWorker * numberWorkers;

    // Reduce parallelism in persistence for best cache efficiency and lowest bulk-request overhead
    this.persistenceParallelism = numberWorkers;
  }

  /** Returns the total number of workers: hosts multiplied by workers per host. */
  public int getNumberWorkers() {
    return numberWorkers;
  }

  /** Returns the spout parallelism: Kafka partitions divided by partitions per spout. */
  public int forSpoutLayer() {
    return spoutParallelism;
  }

  /** Returns the transform parallelism: executors per worker multiplied by total workers. */
  public int forTransformLayer() {
    return transformParallelism;
  }

  /** Returns the persistence parallelism, which equals the total number of workers. */
  public int forPersistenceLayer() {
    return persistenceParallelism;
  }

  /** Returns a new {@link Builder} with every "per" ratio defaulted to 1. */
  public static Builder builder() {
    return new Builder();
  }

  /**
   * Builder of {@link StormParallelism} configurations.
   *
   * <p>{@code numberHosts}, {@code numberKafkaPartitions} and {@code numberCoresPerHost} have
   * no usable default and must be set; the remaining values default to 1.
   */
  public static class Builder {

    private int numberHosts;
    private int numberWorkersPerHost = 1;
    private int numberKafkaPartitions;
    private int numberKafkaPartitionsPerSpout = 1;
    private int numberCoresPerHost;
    private int numberExecutorsPerCore = 1;

    /** Sets the number of hosts (required, must be positive). */
    public Builder numberHosts(int numberHosts) {
      this.numberHosts = numberHosts;
      return this;
    }

    /** Sets the number of workers per host (must be positive; defaults to 1). */
    public Builder numberWorkersPerHost(int numberWorkersPerHost) {
      this.numberWorkersPerHost = numberWorkersPerHost;
      return this;
    }

    /** Sets the number of Kafka partitions (required, must be positive). */
    public Builder numberKafkaPartitions(int numberKafkaPartitions) {
      this.numberKafkaPartitions = numberKafkaPartitions;
      return this;
    }

    /** Sets the number of Kafka partitions per spout (must be positive; defaults to 1). */
    public Builder numberKafkaPartitionsPerSpout(int numberKafkaPartitionsPerSpout) {
      this.numberKafkaPartitionsPerSpout = numberKafkaPartitionsPerSpout;
      return this;
    }

    /** Sets the number of cores per host (required, must be positive). */
    public Builder numberCoresPerHost(int numberCoresPerHost) {
      this.numberCoresPerHost = numberCoresPerHost;
      return this;
    }

    /** Sets the number of executors per core (must be positive; defaults to 1). */
    public Builder numberExecutorsPerCore(int numberExecutorsPerCore) {
      this.numberExecutorsPerCore = numberExecutorsPerCore;
      return this;
    }

    /**
     * Validates the configuration and builds the {@link StormParallelism}.
     *
     * @return the immutable parallelism configuration
     * @throws IllegalArgumentException if any input is not positive, if a derived
     *     parallelism would truncate to zero, or if a derived value overflows an {@code int}
     */
    public StormParallelism build() {
      // Every input is either a multiplicand or a divisor; zero or negative values would
      // produce a division by zero or a meaningless (non-positive) parallelism.
      checkPositive(numberHosts, "numberHosts");
      checkPositive(numberWorkersPerHost, "numberWorkersPerHost");
      checkPositive(numberKafkaPartitions, "numberKafkaPartitions");
      checkPositive(numberKafkaPartitionsPerSpout, "numberKafkaPartitionsPerSpout");
      checkPositive(numberCoresPerHost, "numberCoresPerHost");
      checkPositive(numberExecutorsPerCore, "numberExecutorsPerCore");

      // Integer division would otherwise silently yield a spout parallelism of zero.
      require(numberKafkaPartitionsPerSpout <= numberKafkaPartitions,
          "numberKafkaPartitionsPerSpout (" + numberKafkaPartitionsPerSpout
              + ") must not exceed numberKafkaPartitions (" + numberKafkaPartitions + ")");

      // Mirror the constructor's arithmetic in long so overflow is detected, not wrapped.
      long workers = (long) numberHosts * numberWorkersPerHost;
      long executorsPerHost = (long) numberExecutorsPerCore * numberCoresPerHost;
      require(workers <= Integer.MAX_VALUE,
          "numberHosts * numberWorkersPerHost overflows int: " + workers);
      require(executorsPerHost <= Integer.MAX_VALUE,
          "numberExecutorsPerCore * numberCoresPerHost overflows int: " + executorsPerHost);

      // Integer division would otherwise silently yield zero executors per worker.
      require(numberWorkersPerHost <= executorsPerHost,
          "numberWorkersPerHost (" + numberWorkersPerHost
              + ") must not exceed executors per host (" + executorsPerHost + ")");

      long transform = (executorsPerHost / numberWorkersPerHost) * workers;
      require(transform <= Integer.MAX_VALUE,
          "transform parallelism overflows int: " + transform);

      return new StormParallelism(this);
    }

    /** Rejects a non-positive {@code value}, naming the offending parameter. */
    private static void checkPositive(int value, String name) {
      require(value > 0, name + " must be positive, but was " + value);
    }

    /** Throws {@link IllegalArgumentException} with {@code message} unless {@code condition} holds. */
    private static void require(boolean condition, String message) {
      if (!condition) {
        throw new IllegalArgumentException(message);
      }
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment