Last active
March 21, 2017 07:46
-
-
Save rdhabalia/475167af627aba7ce24dc3488c10242a to your computer and use it in GitHub Desktop.
load-test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
ZK-latency log at broker: | |
[pulsar-zk-session-watcher-10-1] WARN c.y.p.z.ZooKeeperSessionWatcher - zoo keeper disconnected, waiting to reconnect, time remaining = 25 seconds | |
ZooKeeper stats (collected via nc): | |
(1) before test | |
zk_version 3.4.6--1, built on 08/05/2015 23:17 GMT | |
zk_avg_latency 11 | |
zk_max_latency 3720 | |
zk_min_latency 0 | |
zk_packets_received 30146197 | |
zk_packets_sent 30222025 | |
zk_num_alive_connections 3 | |
zk_outstanding_requests 0 | |
zk_server_state follower | |
(2) after test | |
stat = zk_version 3.4.6--1, built on 08/05/2015 23:17 GMT | |
zk_avg_latency 31 | |
zk_max_latency 3720 | |
zk_min_latency 0 | |
zk_packets_received 52797270 | |
zk_packets_sent 53600852 | |
zk_num_alive_connections 5 | |
zk_outstanding_requests 53 | |
zk_server_state follower | |
Sample code: | |
package com.yahoo.cloud.messaging.load.test; | |
import java.io.FileInputStream; | |
import java.net.URL; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.apache.commons.lang3.StringUtils; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.beust.jcommander.JCommander; | |
import com.beust.jcommander.Parameter; | |
import com.beust.jcommander.ParameterException; | |
import com.google.common.collect.Lists; | |
import com.yahoo.pulsar.client.admin.PulsarAdmin; | |
import com.yahoo.pulsar.client.admin.PulsarAdminException; | |
import com.yahoo.pulsar.client.api.ClientConfiguration; | |
import com.yahoo.pulsar.client.api.ConsumerConfiguration; | |
import com.yahoo.pulsar.client.api.PulsarClient; | |
import com.yahoo.pulsar.client.api.PulsarClientException; | |
import com.yahoo.pulsar.client.api.SubscriptionType; | |
import com.yahoo.pulsar.client.impl.PulsarClientImpl; | |
import com.yahoo.pulsar.common.policies.data.BundlesData; | |
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus; | |
import io.netty.util.concurrent.DefaultThreadFactory; | |
public class BrokerBunldeUnloader { | |
private PulsarAdmin admin; | |
private final ScheduledExecutorService unloaderScheduler; | |
private final UnloaderTestConfig config; | |
private final PulsarClient client; | |
private final ConsumerConfiguration consumerConfig; | |
private List<String> namespaces; | |
private final ExecutorService executor; | |
public BrokerBunldeUnloader(UnloaderTestConfig config) throws Exception { | |
this.unloaderScheduler = Executors | |
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("cms-bundle-unloader")); | |
this.executor = Executors.newFixedThreadPool(16, new DefaultThreadFactory("unload-admin")); | |
this.admin = new PulsarAdmin(new URL(config.getAdminURL()), config.authPluginClassName, config.authParams); | |
this.config = config; | |
ClientConfiguration clientConf = new ClientConfiguration(); | |
clientConf.setConnectionsPerBroker(config.maxConnections); | |
clientConf.setIoThreads(this.config.ioThreads); | |
clientConf.setStatsInterval(0, TimeUnit.SECONDS); | |
if (StringUtils.isNotBlank(config.authPluginClassName)) { | |
clientConf.setAuthentication(config.authPluginClassName, config.authParams); | |
} | |
client = new PulsarClientImpl(config.brokerURL, clientConf); | |
consumerConfig = new ConsumerConfiguration(); | |
consumerConfig.setSubscriptionType(SubscriptionType.Shared); | |
this.namespaces = config.listNamespaces(); | |
} | |
public void scheduleBundleUnlaoding() { | |
unloaderScheduler.scheduleAtFixedRate(() -> unloadAllBrokersBundles(), config.unloadingDelaySecs, | |
config.unloadingInSecs, TimeUnit.SECONDS); | |
} | |
public void unloadAllBrokerBundles() { | |
this.namespaces.forEach(ns -> { | |
try { | |
admin.namespaces().unload(ns); | |
} catch (Exception e) { | |
log.error("Failed to unload namespace {}, {}", ns, e.getMessage()); | |
} | |
}); | |
} | |
public void unloadAllBrokersBundles() { | |
try { | |
admin.brokers().getActiveBrokers(config.clusterName).forEach(brokerUrl -> { | |
try { | |
final AtomicInteger totalBundles = new AtomicInteger(0); | |
Map<String, NamespaceOwnershipStatus> bundleMap = admin.brokers() | |
.getOwnedNamespaces(config.clusterName, brokerUrl); | |
bundleMap.forEach((bundle, stat) -> { | |
if (stat.is_active) { | |
if (bundle.startsWith(config.propertyName)) { | |
int bundleIndex = bundle.lastIndexOf("/"); | |
String ns = bundle.substring(0, bundleIndex); | |
String bundleName = bundle.substring(bundleIndex + 1); | |
executor.submit(() -> { | |
try { | |
admin.namespaces().unloadNamespaceBundle(ns, bundleName); | |
totalBundles.getAndIncrement(); | |
} catch (PulsarAdminException e) { | |
log.error("failed to unload bundle {} {}", bundle, e.getMessage()); | |
} | |
}); | |
} | |
log.info("unloaded bundle for broker {} {}", brokerUrl, bundle); | |
} | |
}); | |
log.info("Unloaded total {} bundles on {}", totalBundles.get(), brokerUrl); | |
} catch (Exception e) { | |
log.error("failed to get owned bundles brokers {}", brokerUrl, e.getMessage()); | |
} | |
}); | |
} catch (PulsarAdminException e) { | |
log.error("failed to get active brokers {}", e.getMessage()); | |
} | |
} | |
public void startTest() { | |
final AtomicInteger producerCount = new AtomicInteger(0); | |
final AtomicInteger consumerCount = new AtomicInteger(0); | |
// (1) create namespace with bundles | |
createNamespace(); | |
// (2) start producers and consumers | |
this.namespaces.forEach(ns -> { | |
for (int i = 0; i < config.numberOfTopics; i++) { | |
final String topic = "persistent://" + ns + "/" + i; | |
for (int p = 0; p < config.numberOfProduces; p++) { | |
try { | |
client.createProducer(topic); | |
log.info("Started {} producer on {}", producerCount.getAndIncrement(), topic); | |
} catch (PulsarClientException e) { | |
log.error("Failed to create producer {} {}", topic, e.getMessage()); | |
} | |
} | |
for (int c = 0; c < config.numberOfConsumers; c++) { | |
try { | |
client.subscribe(topic, "my-sub", consumerConfig); | |
log.info("Started {} consumer on {}", consumerCount.getAndIncrement(), topic); | |
} catch (PulsarClientException e) { | |
log.error("Failed to create consumer {} {}", topic, e.getMessage()); | |
} | |
} | |
log.info("created {} producers and {} consumers for {}", config.numberOfProduces, | |
config.numberOfConsumers, topic); | |
} | |
}); | |
// (3) schedule task to unload all namespaces | |
scheduleBundleUnlaoding(); | |
} | |
private void createNamespace() { | |
final BundlesData bundle = new BundlesData(config.totalNsBundle); | |
this.namespaces.forEach(ns -> { | |
if (config.deleteNamespace) { | |
try { | |
admin.namespaces().deleteNamespace(ns); | |
} catch (Exception e) { | |
// ok | |
} | |
} | |
try { | |
admin.namespaces().createNamespace(ns, bundle); | |
admin.namespaces().setNamespaceMessageTTL(ns, 60 * 60 * 24 * 1/* 2 days: in seconds */); | |
} catch (PulsarAdminException e) { | |
log.error("Failed to create namespace {} {}", ns, e.getMessage(), e); | |
} | |
}); | |
} | |
public static class UnloaderTestConfig { | |
@Parameter(names = { "-h", "--help" }, description = "Help message", help = true) | |
boolean help; | |
@Parameter(names = { "--conf-file" }, description = "Configuration file") | |
public String confFile; | |
@Parameter(names = { "-u", "--broker-url" }, description = "Pulsar broker URL") | |
public String brokerURL; | |
@Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL") | |
public String adminURL; | |
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name") | |
public String authPluginClassName; | |
@Parameter(names = { | |
"--auth_params" }, description = "Authentication parameters, e.g., \"key1:val1,key2:val2\"") | |
public String authParams; | |
@Parameter(names = { "-prop", "--property-name" }, description = "Property name for load-test testing") | |
public String propertyName = "cms-load-test"; | |
@Parameter(names = { "-cluster", "--cluster-name" }, description = "Cluster name for load-test testing") | |
public String clusterName = "perf1-gq1"; | |
@Parameter(names = { "-nsp", "--ns-prefix" }, description = "Namespace prefix") | |
public String nsPrefix = "load"; | |
@Parameter(names = { "-tns", "--total-ns" }, description = "Total number of namespaces") | |
public int totalNs = 2; | |
@Parameter(names = { "-dns", "--delete-ns" }, description = "Delete namespaces") | |
public boolean deleteNamespace = false; | |
@Parameter(names = { "-tnb", "--total-ns-bundle" }, description = "Total number of namespaces bundles per ns") | |
public int totalNsBundle = 1000; | |
@Parameter(names = { "-c", | |
"--max-connections" }, description = "Max number of TCP connections to a single broker") | |
public int maxConnections = 1; | |
@Parameter(names = { "-ni", | |
"--io-threads" }, description = "pulsar client io threads") | |
public int ioThreads = 16; | |
@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics") | |
public int numberOfTopics = 1000; | |
@Parameter(names = { "-np", "--num-producers" }, description = "Number of producers") | |
public int numberOfProduces = 1; | |
@Parameter(names = { "-nc", "--num-consumers" }, description = "Number of consumers") | |
public int numberOfConsumers = 1; | |
@Parameter(names = { "-ut", "--unload-time" }, description = "Unloading bundle time in sec") | |
public long unloadingInSecs = 90; | |
@Parameter(names = { "-ud", "--unload-delay" }, description = "Unloading bundle delay time in sec") | |
public long unloadingDelaySecs = 3 * 60; | |
public List<String> listNamespaces() { | |
List<String> namespaces = Lists.newArrayList(); | |
final String topic = propertyName + "/" + clusterName + "/%s"; | |
for (int i = 0; i < totalNs; i++) { | |
final String namespaceName = nsPrefix + "-" + i; | |
namespaces.add(String.format(topic, namespaceName)); | |
} | |
return namespaces; | |
} | |
public boolean isHelp() { | |
return help; | |
} | |
public void setHelp(boolean help) { | |
this.help = help; | |
} | |
public String getConfFile() { | |
return confFile; | |
} | |
public void setConfFile(String confFile) { | |
this.confFile = confFile; | |
} | |
public String getAdminURL() { | |
if (StringUtils.isEmpty(adminURL)) { | |
return this.brokerURL; | |
} | |
return adminURL; | |
} | |
public void setAdminURL(String adminURL) { | |
this.adminURL = adminURL; | |
} | |
public String getAuthPluginClassName() { | |
return authPluginClassName; | |
} | |
public void setAuthPluginClassName(String authPluginClassName) { | |
this.authPluginClassName = authPluginClassName; | |
} | |
public String getBrokerURL() { | |
return brokerURL; | |
} | |
public void setBrokerURL(String brokerURL) { | |
this.brokerURL = brokerURL; | |
} | |
public String getAuthParams() { | |
return authParams; | |
} | |
public void setAuthParams(String authParams) { | |
this.authParams = authParams; | |
} | |
public String getPropertyName() { | |
return propertyName; | |
} | |
public void setPropertyName(String propertyName) { | |
this.propertyName = propertyName; | |
} | |
public String getClusterName() { | |
return clusterName; | |
} | |
public void setClusterName(String clusterName) { | |
this.clusterName = clusterName; | |
} | |
public int getTotalNs() { | |
return totalNs; | |
} | |
public void setTotalNs(int totalNs) { | |
this.totalNs = totalNs; | |
} | |
public int getTotalNsBundle() { | |
return totalNsBundle; | |
} | |
public void setTotalNsBundle(int totalNsBundle) { | |
this.totalNsBundle = totalNsBundle; | |
} | |
public int getMaxConnections() { | |
return maxConnections; | |
} | |
public void setMaxConnections(int maxConnections) { | |
this.maxConnections = maxConnections; | |
} | |
public int getNumberOfTopics() { | |
return numberOfTopics; | |
} | |
public void setNumberOfTopics(int numberOfTopics) { | |
this.numberOfTopics = numberOfTopics; | |
} | |
public int getNumberOfProduces() { | |
return numberOfProduces; | |
} | |
public void setNumberOfProduces(int numberOfProduces) { | |
this.numberOfProduces = numberOfProduces; | |
} | |
public int getNumberOfConsumers() { | |
return numberOfConsumers; | |
} | |
public void setNumberOfConsumers(int numberOfConsumers) { | |
this.numberOfConsumers = numberOfConsumers; | |
} | |
public long getUnloadingInSecs() { | |
return unloadingInSecs; | |
} | |
public void setUnloadingInSecs(long unloadingInSecs) { | |
this.unloadingInSecs = unloadingInSecs; | |
} | |
public long getUnloadingDelaySecs() { | |
return unloadingDelaySecs; | |
} | |
public void setUnloadingDelaySecs(long unloadingDelaySecs) { | |
this.unloadingDelaySecs = unloadingDelaySecs; | |
} | |
@Override | |
public String toString() { | |
return "UnloaderTestConfig [help=" + help + ", confFile=" + confFile + ", brokerURL=" + brokerURL | |
+ ", adminURL=" + adminURL + ", authPluginClassName=" + authPluginClassName + ", authParams=" | |
+ authParams + ", propertyName=" + propertyName + ", clusterName=" + clusterName + ", nsPrefix=" | |
+ nsPrefix + ", totalNs=" + totalNs + ", deleteNamespace=" + deleteNamespace + ", totalNsBundle=" | |
+ totalNsBundle + ", maxConnections=" + maxConnections + ", ioThreads=" + ioThreads | |
+ ", numberOfTopics=" + numberOfTopics + ", numberOfProduces=" + numberOfProduces | |
+ ", numberOfConsumers=" + numberOfConsumers + ", unloadingInSecs=" + unloadingInSecs | |
+ ", unloadingDelaySecs=" + unloadingDelaySecs + "]"; | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
final UnloaderTestConfig arguments = new UnloaderTestConfig(); | |
JCommander jc = new JCommander(arguments); | |
jc.setProgramName("system-test"); | |
try { | |
jc.parse(args); | |
} catch (ParameterException e) { | |
System.out.println(e.getMessage()); | |
jc.usage(); | |
System.exit(-1); | |
} | |
if (arguments.help) { | |
jc.usage(); | |
System.exit(-1); | |
} | |
if (arguments.confFile != null) { | |
Properties prop = new Properties(System.getProperties()); | |
prop.load(new FileInputStream(arguments.confFile)); | |
if (arguments.brokerURL == null) { | |
arguments.brokerURL = prop.getProperty("serviceUrl", null); | |
} | |
if (arguments.authPluginClassName == null) { | |
arguments.authPluginClassName = prop.getProperty("authPlugin", null); | |
} | |
if (arguments.authParams == null) { | |
arguments.authParams = prop.getProperty("authParams", null); | |
} | |
} | |
long start = System.currentTimeMillis(); | |
log.info("Starting test with config " + arguments.toString()); | |
BrokerBunldeUnloader bundleTest = new BrokerBunldeUnloader(arguments); | |
bundleTest.startTest(); | |
long end = System.currentTimeMillis(); | |
log.info("Bundle unload load-test has started in {} sec", ((end - start) / 1000)); | |
} | |
private static final Logger log = LoggerFactory.getLogger(BrokerBunldeUnloader.class); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment