Skip to content

Instantly share code, notes, and snippets.

@eransharv
Created January 5, 2017 09:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eransharv/a3edef22de72458722083526f55f7aca to your computer and use it in GitHub Desktop.
Save eransharv/a3edef22de72458722083526f55f7aca to your computer and use it in GitHub Desktop.
Test scripts for redis pubsub feature. You can define the numbers of subscribers and channels, as well as other configurations.
package RedisPubSubMulti;
import java.util.concurrent.TimeUnit;
import redis.clients.jedis.Jedis;
public class Publisher extends Thread{
private Thread t;
private String h;
private int p;
private String pass;
private String[] channelsToPublish = null;
private String msg = null;
private long pausa = 0; //in milisec
private long duration = 0; //in milisec
private Jedis jedis;
public Publisher(String message, String[] totalChannels, long sleepBetweenMsg, long elapsedTime, String hostName, int port, String password) {
System.out.println("Creating Publisher");
channelsToPublish = totalChannels;
msg = message;
pausa = sleepBetweenMsg;
duration = elapsedTime;
h = hostName;
p = port;
pass = password;
}
public void run(){
System.out.println("Running Publisher");
System.out.println("Publish to all channels the message: " + msg);
jedis = new Jedis(h, p);
jedis.auth(pass);
if(duration == 0)
{
while(true) {
publish();
}
}
else {
for (long stop=System.nanoTime()+TimeUnit.MILLISECONDS.toNanos(duration);stop>System.nanoTime();) {
publish();
}
}
jedis.close();
System.out.println("exiting Publisher");
}
public void start() {
//Starts the thread in a separate path of execution, then invokes the run() method on this Thread object.
System.out.println("Publisher started");
if (t == null) {
t = new Thread (this, "Publisher");
t.start ();
}
}
public void publish() {
for(int i=0; i < channelsToPublish.length; i++) {
jedis.publish(channelsToPublish[i], msg);
try {
Thread.sleep(pausa);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package RedisPubSubMulti;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
public class Subscriber extends Thread {
private Thread t;
private String subscriberName;
private String h;
private int p;
private String pass;
private String[] channelsToListen = null;
public Subscriber(String name, String[] channels, String hostName, int port, String password) {
subscriberName = name;
channelsToListen = channels;
h = hostName;
p = port;
pass = password;
System.out.println("Creating " + subscriberName );
}
public void run(){
System.out.println("Running " + subscriberName );
Jedis jedis = new Jedis(h, p);
jedis.auth(pass);
jedis.subscribe((new JedisPubSub(){
@Override
public void onMessage(String channel, String message) {
System.out.println(subscriberName + " Receiving On: " + channel + " message: " + message);
}
@Override
public void onPMessage(String arg0, String arg1, String arg2) {
// TODO Auto-generated method stub
}
@Override
public void onPSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onPUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onSubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
@Override
public void onUnsubscribe(String arg0, int arg1) {
// TODO Auto-generated method stub
}
}), channelsToListen);
jedis.close();
System.out.println("exiting " + subscriberName);
}
public void start() {
//Starts the thread in a separate path of execution, then invokes the run() method on this Thread object.
System.out.println("Starting " + subscriberName );
if (t == null) {
t = new Thread (this, subscriberName);
t.start ();
}
}
}
package RedisPubSubMulti;
import java.util.Arrays;
public class TestPubSub {
public static int subscribersNum = 1; //numbers of subscribers to create
public static int channelsPerSubscriber = 50000; //how many channels to create for each subscriber (each subscriber not sharing his channels with others).
public static String message = "Abra Kadabra"; //the message to publish for all the subscribers
public static long sleepBetweenMsg = 0; //sleep between each message. in milisec.
public static long elapsedTime = 0; //the duration time for the publish loop (that runs on the subscribers) in milisec. 0 means infinite loop.
public static String hostName = "pub-redis-12010.us-east-mz.1.ec2.garantiadata.com";
public static int port = 12010;
public static String password = "XXX";
public static void main(String[] args) {
System.out.println("Creating " + subscribersNum + " subscribers");
int totalChannelsNum = subscribersNum * channelsPerSubscriber;
String[] totalChannels = new String[totalChannelsNum]; //holds the channels names to subscribe
//creating all the channels
for(int i=0; i<totalChannelsNum; i++) {
totalChannels[i] = "channel-" + String.valueOf(i + 1);
}
int channelsCtr = 0;
//creating the subscribers
for(int i = 0; i < subscribersNum; i++) {
String subName = "subscriber-" + String.valueOf(i+1);
String[] channelsForClient = new String[channelsPerSubscriber];
//channelsForClient = Arrays.copyOfRange(totalChannels,channelsCtr,channelsCtr+channelsPerClient);
//channelsCtr += channelsPerClient;
//prepare just the relevant channels for this subscriber
for(int j = 0; j < channelsPerSubscriber; j++,channelsCtr++) {
channelsForClient[j] = totalChannels[channelsCtr];
}
//create the current subscriber with it channels. each subscriber is created in different thread with it's own connection
Subscriber sub = new Subscriber(subName, channelsForClient, hostName, port, password); //one connection per one subscriber
sub.start();
}
//creating the publisher in different thread. the publisher will publish the message to all the channels in a loop, or in the duration that defined in elapsedTime variable.
Publisher pub = new Publisher(message, totalChannels, sleepBetweenMsg, elapsedTime, hostName, port, password);
pub.start();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment