Skip to content

Instantly share code, notes, and snippets.

@rdhabalia
Last active March 21, 2017 07:46
Show Gist options
  • Save rdhabalia/475167af627aba7ce24dc3488c10242a to your computer and use it in GitHub Desktop.
Save rdhabalia/475167af627aba7ce24dc3488c10242a to your computer and use it in GitHub Desktop.
load-test
ZK-latency log at broker:
[pulsar-zk-session-watcher-10-1] WARN c.y.p.z.ZooKeeperSessionWatcher - zoo keeper disconnected, waiting to reconnect, time remaining = 25 seconds
ZK nc stat
(1) before test
zk_version 3.4.6--1, built on 08/05/2015 23:17 GMT
zk_avg_latency 11
zk_max_latency 3720
zk_min_latency 0
zk_packets_received 30146197
zk_packets_sent 30222025
zk_num_alive_connections 3
zk_outstanding_requests 0
zk_server_state follower
(2) after test
stat = zk_version 3.4.6--1, built on 08/05/2015 23:17 GMT
zk_avg_latency 31
zk_max_latency 3720
zk_min_latency 0
zk_packets_received 52797270
zk_packets_sent 53600852
zk_num_alive_connections 5
zk_outstanding_requests 53
zk_server_state follower
Sample code:
package com.yahoo.cloud.messaging.load.test;
import java.io.FileInputStream;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.google.common.collect.Lists;
import com.yahoo.pulsar.client.admin.PulsarAdmin;
import com.yahoo.pulsar.client.admin.PulsarAdminException;
import com.yahoo.pulsar.client.api.ClientConfiguration;
import com.yahoo.pulsar.client.api.ConsumerConfiguration;
import com.yahoo.pulsar.client.api.PulsarClient;
import com.yahoo.pulsar.client.api.PulsarClientException;
import com.yahoo.pulsar.client.api.SubscriptionType;
import com.yahoo.pulsar.client.impl.PulsarClientImpl;
import com.yahoo.pulsar.common.policies.data.BundlesData;
import com.yahoo.pulsar.common.policies.data.NamespaceOwnershipStatus;
import io.netty.util.concurrent.DefaultThreadFactory;
public class BrokerBunldeUnloader {
private PulsarAdmin admin;
private final ScheduledExecutorService unloaderScheduler;
private final UnloaderTestConfig config;
private final PulsarClient client;
private final ConsumerConfiguration consumerConfig;
private List<String> namespaces;
private final ExecutorService executor;
public BrokerBunldeUnloader(UnloaderTestConfig config) throws Exception {
this.unloaderScheduler = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("cms-bundle-unloader"));
this.executor = Executors.newFixedThreadPool(16, new DefaultThreadFactory("unload-admin"));
this.admin = new PulsarAdmin(new URL(config.getAdminURL()), config.authPluginClassName, config.authParams);
this.config = config;
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setConnectionsPerBroker(config.maxConnections);
clientConf.setIoThreads(this.config.ioThreads);
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
if (StringUtils.isNotBlank(config.authPluginClassName)) {
clientConf.setAuthentication(config.authPluginClassName, config.authParams);
}
client = new PulsarClientImpl(config.brokerURL, clientConf);
consumerConfig = new ConsumerConfiguration();
consumerConfig.setSubscriptionType(SubscriptionType.Shared);
this.namespaces = config.listNamespaces();
}
public void scheduleBundleUnlaoding() {
unloaderScheduler.scheduleAtFixedRate(() -> unloadAllBrokersBundles(), config.unloadingDelaySecs,
config.unloadingInSecs, TimeUnit.SECONDS);
}
public void unloadAllBrokerBundles() {
this.namespaces.forEach(ns -> {
try {
admin.namespaces().unload(ns);
} catch (Exception e) {
log.error("Failed to unload namespace {}, {}", ns, e.getMessage());
}
});
}
public void unloadAllBrokersBundles() {
try {
admin.brokers().getActiveBrokers(config.clusterName).forEach(brokerUrl -> {
try {
final AtomicInteger totalBundles = new AtomicInteger(0);
Map<String, NamespaceOwnershipStatus> bundleMap = admin.brokers()
.getOwnedNamespaces(config.clusterName, brokerUrl);
bundleMap.forEach((bundle, stat) -> {
if (stat.is_active) {
if (bundle.startsWith(config.propertyName)) {
int bundleIndex = bundle.lastIndexOf("/");
String ns = bundle.substring(0, bundleIndex);
String bundleName = bundle.substring(bundleIndex + 1);
executor.submit(() -> {
try {
admin.namespaces().unloadNamespaceBundle(ns, bundleName);
totalBundles.getAndIncrement();
} catch (PulsarAdminException e) {
log.error("failed to unload bundle {} {}", bundle, e.getMessage());
}
});
}
log.info("unloaded bundle for broker {} {}", brokerUrl, bundle);
}
});
log.info("Unloaded total {} bundles on {}", totalBundles.get(), brokerUrl);
} catch (Exception e) {
log.error("failed to get owned bundles brokers {}", brokerUrl, e.getMessage());
}
});
} catch (PulsarAdminException e) {
log.error("failed to get active brokers {}", e.getMessage());
}
}
public void startTest() {
final AtomicInteger producerCount = new AtomicInteger(0);
final AtomicInteger consumerCount = new AtomicInteger(0);
// (1) create namespace with bundles
createNamespace();
// (2) start producers and consumers
this.namespaces.forEach(ns -> {
for (int i = 0; i < config.numberOfTopics; i++) {
final String topic = "persistent://" + ns + "/" + i;
for (int p = 0; p < config.numberOfProduces; p++) {
try {
client.createProducer(topic);
log.info("Started {} producer on {}", producerCount.getAndIncrement(), topic);
} catch (PulsarClientException e) {
log.error("Failed to create producer {} {}", topic, e.getMessage());
}
}
for (int c = 0; c < config.numberOfConsumers; c++) {
try {
client.subscribe(topic, "my-sub", consumerConfig);
log.info("Started {} consumer on {}", consumerCount.getAndIncrement(), topic);
} catch (PulsarClientException e) {
log.error("Failed to create consumer {} {}", topic, e.getMessage());
}
}
log.info("created {} producers and {} consumers for {}", config.numberOfProduces,
config.numberOfConsumers, topic);
}
});
// (3) schedule task to unload all namespaces
scheduleBundleUnlaoding();
}
private void createNamespace() {
final BundlesData bundle = new BundlesData(config.totalNsBundle);
this.namespaces.forEach(ns -> {
if (config.deleteNamespace) {
try {
admin.namespaces().deleteNamespace(ns);
} catch (Exception e) {
// ok
}
}
try {
admin.namespaces().createNamespace(ns, bundle);
admin.namespaces().setNamespaceMessageTTL(ns, 60 * 60 * 24 * 1/* 2 days: in seconds */);
} catch (PulsarAdminException e) {
log.error("Failed to create namespace {} {}", ns, e.getMessage(), e);
}
});
}
public static class UnloaderTestConfig {
@Parameter(names = { "-h", "--help" }, description = "Help message", help = true)
boolean help;
@Parameter(names = { "--conf-file" }, description = "Configuration file")
public String confFile;
@Parameter(names = { "-u", "--broker-url" }, description = "Pulsar broker URL")
public String brokerURL;
@Parameter(names = { "-au", "--admin-url" }, description = "Pulsar Admin URL")
public String adminURL;
@Parameter(names = { "--auth_plugin" }, description = "Authentication plugin class name")
public String authPluginClassName;
@Parameter(names = {
"--auth_params" }, description = "Authentication parameters, e.g., \"key1:val1,key2:val2\"")
public String authParams;
@Parameter(names = { "-prop", "--property-name" }, description = "Property name for load-test testing")
public String propertyName = "cms-load-test";
@Parameter(names = { "-cluster", "--cluster-name" }, description = "Cluster name for load-test testing")
public String clusterName = "perf1-gq1";
@Parameter(names = { "-nsp", "--ns-prefix" }, description = "Namespace prefix")
public String nsPrefix = "load";
@Parameter(names = { "-tns", "--total-ns" }, description = "Total number of namespaces")
public int totalNs = 2;
@Parameter(names = { "-dns", "--delete-ns" }, description = "Delete namespaces")
public boolean deleteNamespace = false;
@Parameter(names = { "-tnb", "--total-ns-bundle" }, description = "Total number of namespaces bundles per ns")
public int totalNsBundle = 1000;
@Parameter(names = { "-c",
"--max-connections" }, description = "Max number of TCP connections to a single broker")
public int maxConnections = 1;
@Parameter(names = { "-ni",
"--io-threads" }, description = "pulsar client io threads")
public int ioThreads = 16;
@Parameter(names = { "-t", "--num-topics" }, description = "Number of topics")
public int numberOfTopics = 1000;
@Parameter(names = { "-np", "--num-producers" }, description = "Number of producers")
public int numberOfProduces = 1;
@Parameter(names = { "-nc", "--num-consumers" }, description = "Number of consumers")
public int numberOfConsumers = 1;
@Parameter(names = { "-ut", "--unload-time" }, description = "Unloading bundle time in sec")
public long unloadingInSecs = 90;
@Parameter(names = { "-ud", "--unload-delay" }, description = "Unloading bundle delay time in sec")
public long unloadingDelaySecs = 3 * 60;
public List<String> listNamespaces() {
List<String> namespaces = Lists.newArrayList();
final String topic = propertyName + "/" + clusterName + "/%s";
for (int i = 0; i < totalNs; i++) {
final String namespaceName = nsPrefix + "-" + i;
namespaces.add(String.format(topic, namespaceName));
}
return namespaces;
}
public boolean isHelp() {
return help;
}
public void setHelp(boolean help) {
this.help = help;
}
public String getConfFile() {
return confFile;
}
public void setConfFile(String confFile) {
this.confFile = confFile;
}
public String getAdminURL() {
if (StringUtils.isEmpty(adminURL)) {
return this.brokerURL;
}
return adminURL;
}
public void setAdminURL(String adminURL) {
this.adminURL = adminURL;
}
public String getAuthPluginClassName() {
return authPluginClassName;
}
public void setAuthPluginClassName(String authPluginClassName) {
this.authPluginClassName = authPluginClassName;
}
public String getBrokerURL() {
return brokerURL;
}
public void setBrokerURL(String brokerURL) {
this.brokerURL = brokerURL;
}
public String getAuthParams() {
return authParams;
}
public void setAuthParams(String authParams) {
this.authParams = authParams;
}
public String getPropertyName() {
return propertyName;
}
public void setPropertyName(String propertyName) {
this.propertyName = propertyName;
}
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public int getTotalNs() {
return totalNs;
}
public void setTotalNs(int totalNs) {
this.totalNs = totalNs;
}
public int getTotalNsBundle() {
return totalNsBundle;
}
public void setTotalNsBundle(int totalNsBundle) {
this.totalNsBundle = totalNsBundle;
}
public int getMaxConnections() {
return maxConnections;
}
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
public int getNumberOfTopics() {
return numberOfTopics;
}
public void setNumberOfTopics(int numberOfTopics) {
this.numberOfTopics = numberOfTopics;
}
public int getNumberOfProduces() {
return numberOfProduces;
}
public void setNumberOfProduces(int numberOfProduces) {
this.numberOfProduces = numberOfProduces;
}
public int getNumberOfConsumers() {
return numberOfConsumers;
}
public void setNumberOfConsumers(int numberOfConsumers) {
this.numberOfConsumers = numberOfConsumers;
}
public long getUnloadingInSecs() {
return unloadingInSecs;
}
public void setUnloadingInSecs(long unloadingInSecs) {
this.unloadingInSecs = unloadingInSecs;
}
public long getUnloadingDelaySecs() {
return unloadingDelaySecs;
}
public void setUnloadingDelaySecs(long unloadingDelaySecs) {
this.unloadingDelaySecs = unloadingDelaySecs;
}
@Override
public String toString() {
return "UnloaderTestConfig [help=" + help + ", confFile=" + confFile + ", brokerURL=" + brokerURL
+ ", adminURL=" + adminURL + ", authPluginClassName=" + authPluginClassName + ", authParams="
+ authParams + ", propertyName=" + propertyName + ", clusterName=" + clusterName + ", nsPrefix="
+ nsPrefix + ", totalNs=" + totalNs + ", deleteNamespace=" + deleteNamespace + ", totalNsBundle="
+ totalNsBundle + ", maxConnections=" + maxConnections + ", ioThreads=" + ioThreads
+ ", numberOfTopics=" + numberOfTopics + ", numberOfProduces=" + numberOfProduces
+ ", numberOfConsumers=" + numberOfConsumers + ", unloadingInSecs=" + unloadingInSecs
+ ", unloadingDelaySecs=" + unloadingDelaySecs + "]";
}
}
public static void main(String[] args) throws Exception {
final UnloaderTestConfig arguments = new UnloaderTestConfig();
JCommander jc = new JCommander(arguments);
jc.setProgramName("system-test");
try {
jc.parse(args);
} catch (ParameterException e) {
System.out.println(e.getMessage());
jc.usage();
System.exit(-1);
}
if (arguments.help) {
jc.usage();
System.exit(-1);
}
if (arguments.confFile != null) {
Properties prop = new Properties(System.getProperties());
prop.load(new FileInputStream(arguments.confFile));
if (arguments.brokerURL == null) {
arguments.brokerURL = prop.getProperty("serviceUrl", null);
}
if (arguments.authPluginClassName == null) {
arguments.authPluginClassName = prop.getProperty("authPlugin", null);
}
if (arguments.authParams == null) {
arguments.authParams = prop.getProperty("authParams", null);
}
}
long start = System.currentTimeMillis();
log.info("Starting test with config " + arguments.toString());
BrokerBunldeUnloader bundleTest = new BrokerBunldeUnloader(arguments);
bundleTest.startTest();
long end = System.currentTimeMillis();
log.info("Bundle unload load-test has started in {} sec", ((end - start) / 1000));
}
private static final Logger log = LoggerFactory.getLogger(BrokerBunldeUnloader.class);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment