Created
January 5, 2017 09:41
-
-
Save eransharv/a3edef22de72458722083526f55f7aca to your computer and use it in GitHub Desktop.
Test scripts for the Redis pub/sub feature. You can configure the number of subscribers and channels, as well as other settings.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RedisPubSubMulti; | |
import java.util.concurrent.TimeUnit; | |
import redis.clients.jedis.Jedis; | |
public class Publisher extends Thread{ | |
private Thread t; | |
private String h; | |
private int p; | |
private String pass; | |
private String[] channelsToPublish = null; | |
private String msg = null; | |
private long pausa = 0; //in milisec | |
private long duration = 0; //in milisec | |
private Jedis jedis; | |
public Publisher(String message, String[] totalChannels, long sleepBetweenMsg, long elapsedTime, String hostName, int port, String password) { | |
System.out.println("Creating Publisher"); | |
channelsToPublish = totalChannels; | |
msg = message; | |
pausa = sleepBetweenMsg; | |
duration = elapsedTime; | |
h = hostName; | |
p = port; | |
pass = password; | |
} | |
public void run(){ | |
System.out.println("Running Publisher"); | |
System.out.println("Publish to all channels the message: " + msg); | |
jedis = new Jedis(h, p); | |
jedis.auth(pass); | |
if(duration == 0) | |
{ | |
while(true) { | |
publish(); | |
} | |
} | |
else { | |
for (long stop=System.nanoTime()+TimeUnit.MILLISECONDS.toNanos(duration);stop>System.nanoTime();) { | |
publish(); | |
} | |
} | |
jedis.close(); | |
System.out.println("exiting Publisher"); | |
} | |
public void start() { | |
//Starts the thread in a separate path of execution, then invokes the run() method on this Thread object. | |
System.out.println("Publisher started"); | |
if (t == null) { | |
t = new Thread (this, "Publisher"); | |
t.start (); | |
} | |
} | |
public void publish() { | |
for(int i=0; i < channelsToPublish.length; i++) { | |
jedis.publish(channelsToPublish[i], msg); | |
try { | |
Thread.sleep(pausa); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RedisPubSubMulti; | |
import redis.clients.jedis.Jedis; | |
import redis.clients.jedis.JedisPubSub; | |
public class Subscriber extends Thread { | |
private Thread t; | |
private String subscriberName; | |
private String h; | |
private int p; | |
private String pass; | |
private String[] channelsToListen = null; | |
public Subscriber(String name, String[] channels, String hostName, int port, String password) { | |
subscriberName = name; | |
channelsToListen = channels; | |
h = hostName; | |
p = port; | |
pass = password; | |
System.out.println("Creating " + subscriberName ); | |
} | |
public void run(){ | |
System.out.println("Running " + subscriberName ); | |
Jedis jedis = new Jedis(h, p); | |
jedis.auth(pass); | |
jedis.subscribe((new JedisPubSub(){ | |
@Override | |
public void onMessage(String channel, String message) { | |
System.out.println(subscriberName + " Receiving On: " + channel + " message: " + message); | |
} | |
@Override | |
public void onPMessage(String arg0, String arg1, String arg2) { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public void onPSubscribe(String arg0, int arg1) { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public void onPUnsubscribe(String arg0, int arg1) { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public void onSubscribe(String arg0, int arg1) { | |
// TODO Auto-generated method stub | |
} | |
@Override | |
public void onUnsubscribe(String arg0, int arg1) { | |
// TODO Auto-generated method stub | |
} | |
}), channelsToListen); | |
jedis.close(); | |
System.out.println("exiting " + subscriberName); | |
} | |
public void start() { | |
//Starts the thread in a separate path of execution, then invokes the run() method on this Thread object. | |
System.out.println("Starting " + subscriberName ); | |
if (t == null) { | |
t = new Thread (this, subscriberName); | |
t.start (); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package RedisPubSubMulti; | |
import java.util.Arrays; | |
public class TestPubSub { | |
public static int subscribersNum = 1; //numbers of subscribers to create | |
public static int channelsPerSubscriber = 50000; //how many channels to create for each subscriber (each subscriber not sharing his channels with others). | |
public static String message = "Abra Kadabra"; //the message to publish for all the subscribers | |
public static long sleepBetweenMsg = 0; //sleep between each message. in milisec. | |
public static long elapsedTime = 0; //the duration time for the publish loop (that runs on the subscribers) in milisec. 0 means infinite loop. | |
public static String hostName = "pub-redis-12010.us-east-mz.1.ec2.garantiadata.com"; | |
public static int port = 12010; | |
public static String password = "XXX"; | |
public static void main(String[] args) { | |
System.out.println("Creating " + subscribersNum + " subscribers"); | |
int totalChannelsNum = subscribersNum * channelsPerSubscriber; | |
String[] totalChannels = new String[totalChannelsNum]; //holds the channels names to subscribe | |
//creating all the channels | |
for(int i=0; i<totalChannelsNum; i++) { | |
totalChannels[i] = "channel-" + String.valueOf(i + 1); | |
} | |
int channelsCtr = 0; | |
//creating the subscribers | |
for(int i = 0; i < subscribersNum; i++) { | |
String subName = "subscriber-" + String.valueOf(i+1); | |
String[] channelsForClient = new String[channelsPerSubscriber]; | |
//channelsForClient = Arrays.copyOfRange(totalChannels,channelsCtr,channelsCtr+channelsPerClient); | |
//channelsCtr += channelsPerClient; | |
//prepare just the relevant channels for this subscriber | |
for(int j = 0; j < channelsPerSubscriber; j++,channelsCtr++) { | |
channelsForClient[j] = totalChannels[channelsCtr]; | |
} | |
//create the current subscriber with it channels. each subscriber is created in different thread with it's own connection | |
Subscriber sub = new Subscriber(subName, channelsForClient, hostName, port, password); //one connection per one subscriber | |
sub.start(); | |
} | |
//creating the publisher in different thread. the publisher will publish the message to all the channels in a loop, or in the duration that defined in elapsedTime variable. | |
Publisher pub = new Publisher(message, totalChannels, sleepBetweenMsg, elapsedTime, hostName, port, password); | |
pub.start(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment