Skip to content

Instantly share code, notes, and snippets.

@fehmicansaglam
Created April 3, 2012 14:44
Show Gist options
  • Save fehmicansaglam/2292578 to your computer and use it in GitHub Desktop.
Save fehmicansaglam/2292578 to your computer and use it in GitHub Desktop.
Akka ya da Java ExecutorService ve Hazelcast ile kullanıcı bildirim kuyruğu
/**
 * Akka actor that writes a batch of randomly generated notifications into Hazelcast.
 *
 * <p>An {@link Integer} message is interpreted as the number of notifications to publish.
 * When the batch is done the actor replies {@code "finished"} to the sender. Any other
 * message is passed to {@code unhandled}.
 */
public class Producer extends UntypedActor {
    private static final Gson gson = new GsonBuilder().create();
    private final String[] keys = {"user1", "user2", "user3", "user4", "user5", "user6"};

    /**
     * Publishes {@code message} notifications when {@code message} is an {@link Integer}.
     *
     * @param message the number of notifications to publish, or an unsupported message
     * @throws Exception if a Hazelcast or JSON operation fails; held locks are released first
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof Integer) {
            Integer opCount = (Integer) message;
            final Random random = new Random();
            final MultiMap<String, String> notificationMap = HazelcastManager.getNotificationMap();
            final IMap<String, String> notificationMetadataMap = HazelcastManager.getNotificationMetadataMap();
            for (int i = 0; i < opCount; ++i) {
                final String key = keys[random.nextInt(keys.length)];
                // Generated outside the locked region so the locks are held as briefly as possible.
                final String text = UUID.randomUUID().toString();
                publish(notificationMap, notificationMetadataMap, key, text);
            }
            getSender().tell("finished", getSelf());
        } else {
            unhandled(message);
        }
    }

    /**
     * Appends one notification for {@code key} and updates that key's queue metadata
     * while holding the per-key lock of both maps.
     */
    private void publish(MultiMap<String, String> notificationMap,
                         IMap<String, String> notificationMetadataMap,
                         String key,
                         String text) {
        notificationMap.lock(key);
        try {
            notificationMetadataMap.lock(key);
            try {
                // Taken under the lock (as the ExecutorService-based producer does) so the
                // timestamp stored for the first entry reflects insertion order.
                final long timestamp = System.currentTimeMillis();
                final Notification notification = new Notification(timestamp, key, text);
                notificationMap.put(key, gson.toJson(notification));
                QueueMetadata metadata = gson.fromJson(notificationMetadataMap.get(key), QueueMetadata.class);
                if (metadata == null) {
                    // First notification for this key.
                    metadata = new QueueMetadata(1, timestamp);
                } else {
                    metadata = metadata.increment();
                }
                notificationMetadataMap.put(key, gson.toJson(metadata));
            } finally {
                // Always release, otherwise a failure would leave the key locked for every producer.
                notificationMetadataMap.unlock(key);
            }
        } finally {
            notificationMap.unlock(key);
        }
    }
}
/**
 * Latch-driven worker that writes {@code opCount} randomly generated notifications
 * into Hazelcast, updating the per-user queue metadata for each one.
 */
public final class Producer extends LatchWorker {
    private final int opCount;
    private final String[] keys = {"user1", "user2", "user3", "user4", "user5", "user6"};
    private final Gson gson = new GsonBuilder().create();

    /**
     * @param opCount   number of notifications this worker publishes
     * @param startGate latch forwarded to {@link LatchWorker}
     * @param endGate   latch forwarded to {@link LatchWorker}
     * @param id        worker id forwarded to {@link LatchWorker}
     */
    public Producer(int opCount, CountDownLatch startGate, CountDownLatch endGate, int id) {
        super(startGate, endGate, id);
        this.opCount = opCount;
    }

    /** Publishes {@code opCount} notifications, each for a randomly chosen user key. */
    @Override
    void doJob() {
        final MultiMap<String, String> notificationMap = HazelcastManager.getNotificationMap();
        final IMap<String, String> notificationMetadataMap = HazelcastManager.getNotificationMetadataMap();
        final Random random = new Random();
        for (int i = 0; i < opCount; ++i) {
            final String key = keys[random.nextInt(keys.length)];
            // Generated outside the locked region so the locks are held as briefly as possible.
            final String text = UUID.randomUUID().toString();
            publish(notificationMap, notificationMetadataMap, key, text);
        }
    }

    /**
     * Appends one notification for {@code key} and updates that key's queue metadata
     * while holding the per-key lock of both maps.
     */
    private void publish(MultiMap<String, String> notificationMap,
                         IMap<String, String> notificationMetadataMap,
                         String key,
                         String text) {
        notificationMap.lock(key);
        try {
            notificationMetadataMap.lock(key);
            try {
                final long timestamp = System.currentTimeMillis();
                final Notification notification = new Notification(timestamp, key, text);
                notificationMap.put(key, gson.toJson(notification));
                QueueMetadata metadata = gson.fromJson(notificationMetadataMap.get(key), QueueMetadata.class);
                if (metadata == null) {
                    // First notification for this key.
                    metadata = new QueueMetadata(1, timestamp);
                } else {
                    metadata = metadata.increment();
                }
                notificationMetadataMap.put(key, gson.toJson(metadata));
            } finally {
                // Always release, otherwise a failure would leave the key locked for every producer.
                notificationMetadataMap.unlock(key);
            }
        } finally {
            notificationMap.unlock(key);
        }
    }
}
/**
 * Static holder for the single Hazelcast client used by the producers.
 *
 * <p>Loading this class connects to the cluster and clears both notification maps.
 */
public final class HazelcastManager {
    private static final String NOTIFICATION_MAP = "notification-map";
    private static final String NOTIFICATION_METADATA_MAP = "notification-metadata-map";

    private static final HazelcastInstance client;

    static {
        client = connect();
        // Start from empty maps every time the class is loaded.
        getNotificationMap().clear();
        getNotificationMetadataMap().clear();
    }

    /** Not instantiable; static access only. */
    private HazelcastManager() {
    }

    /**
     * Creates the client from a hard-coded group name, password and member address.
     * NOTE(review): credentials and address are embedded in source; consider externalising them.
     */
    private static HazelcastInstance connect() {
        final ClientConfig config = new ClientConfig();
        config.getGroupConfig().setName("dev").setPassword("dev-pass");
        config.addAddress("10.35.1.43:5701");
        return HazelcastClient.newHazelcastClient(config);
    }

    /** @return the distributed multimap named {@code "notification-map"} */
    public static MultiMap<String, String> getNotificationMap() {
        return client.getMultiMap(NOTIFICATION_MAP);
    }

    /** @return the distributed map named {@code "notification-metadata-map"} */
    public static IMap<String, String> getNotificationMetadataMap() {
        return client.getMap(NOTIFICATION_METADATA_MAP);
    }

    /** Shuts the client down through its lifecycle service. */
    public static void shutdown() {
        client.getLifecycleService().shutdown();
    }
}
/**
 * Base class for workers that start together on a shared start latch and report
 * completion on a shared end latch.
 *
 * <p>Subclasses implement {@link #doJob()}.
 */
public abstract class LatchWorker implements Runnable {
    protected final int id;
    private final CountDownLatch startGate;
    private final CountDownLatch endGate;

    /**
     * @param startGate latch the worker waits on before running its job
     * @param endGate   latch the worker counts down once its job has ended
     * @param id        identifier used in the progress messages
     */
    public LatchWorker(CountDownLatch startGate, CountDownLatch endGate, int id) {
        this.id = id;
        this.startGate = startGate;
        this.endGate = endGate;
    }

    /**
     * Waits for the start gate, runs {@link #doJob()} and counts the end gate down.
     *
     * <p>If the thread is interrupted while waiting for the start gate, the interrupt flag is
     * restored and the method returns without running the job or touching the end gate.
     * Once the job has started, the end gate is counted down even if {@code doJob()} throws,
     * so a coordinator waiting on the end gate cannot hang on a failed worker; the exception
     * still propagates to the caller.
     */
    @Override
    public void run() {
        System.out.println("Thread[" + this.id + "]: waiting for latch");
        try {
            startGate.await();
        } catch (InterruptedException ex) {
            // Preserve the interruption for whoever owns this thread instead of swallowing it.
            Thread.currentThread().interrupt();
            return;
        }
        System.out.println("Thread[" + this.id + "]: started");
        try {
            doJob();
            System.out.println("Thread[" + this.id + "] completed");
        } finally {
            endGate.countDown();
        }
    }

    /** The work performed after the start gate opens. */
    abstract void doJob();
}
public class Main {
private static final int threadCount = 8;
private static final int opCount = 16384 / threadCount;
private static void runJava() throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(threadCount);
final ExecutorService executor = Executors.newFixedThreadPool(threadCount + 1);
for (int i = 0; i < threadCount; ++i) {
executor.execute(new workers.java.Producer(opCount, startGate, endGate, i));
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
System.out.println("Java Total time: " + (end - start) / 1000);
executor.shutdownNow();
}
private static void runAkka() {
ActorSystem system = ActorSystem.create("NotificationSystem");
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
@Override
public UntypedActor create() {
return new Master(threadCount, opCount);
}
}), "producer");
master.tell("start");
}
public static void main(String[] args) throws InterruptedException {
HazelcastManager.getNotificationMap();
Thread.sleep(1000);
runJava();
runAkka();
}
/**
 * Coordinating actor for the Akka benchmark: fans a batch size out to the producer
 * router, counts the {@code "finished"} replies and prints the elapsed time once all
 * producers have reported.
 */
public class Master extends UntypedActor {
    private final ActorRef producerRouter;
    private final int nrOfProducers;
    private final int opCount;
    private int nrOfResults;
    private long start;

    /**
     * @param nrOfProducers number of routees behind the round-robin router, and the number
     *                      of {@code "finished"} replies to wait for
     * @param opCount       batch size sent to each producer
     */
    public Master(final int nrOfProducers, final int opCount) {
        this.nrOfProducers = nrOfProducers;
        this.opCount = opCount;
        this.producerRouter = this.getContext().actorOf(
                new Props(Producer.class).withRouter(new RoundRobinRouter(nrOfProducers)),
                "producerRouter");
    }

    /**
     * Handles {@code "start"} (dispatch one batch per producer) and {@code "finished"}
     * (count a reply; after the last one print the time and shut everything down).
     * Any other message is reported through {@code unhandled}, matching {@code Producer}.
     *
     * @param message the received message
     */
    @Override
    public void onReceive(Object message) throws Exception {
        if ("start".equals(message)) {
            start = System.nanoTime();
            nrOfResults = 0;
            for (int i = 0; i < this.nrOfProducers; i++) {
                producerRouter.tell(this.opCount, getSelf());
            }
        } else if ("finished".equals(message)) {
            nrOfResults++;
            if (nrOfResults == nrOfProducers) {
                long end = System.nanoTime();
                // Nanoseconds divided by 1000, i.e. the printed value is in microseconds.
                System.out.println("Akka Total time: " + (end - start) / 1000);
                getContext().system().shutdown();
                HazelcastManager.shutdown();
            }
        } else {
            // Previously dropped silently; surface unexpected messages instead.
            unhandled(message);
        }
    }
}
/** Immutable value describing a single notification addressed to one user. */
public final class Notification implements Serializable {
    /** Time at which the notification was created. */
    public final long timestamp;

    /** Name of the user the notification belongs to. */
    public final String user;

    /** Notification message. */
    public final String text;

    /**
     * @param timestamp creation time of the notification
     * @param user      name of the related user
     * @param text      notification message
     */
    public Notification(long timestamp, String user, String text) {
        this.text = text;
        this.user = user;
        this.timestamp = timestamp;
    }
}
/** Immutable summary of one user's notification queue. */
public final class QueueMetadata implements Serializable {
    /** Number of entries in the notification queue. */
    public final int entryCount;

    /** Creation time of the oldest entry in the queue. */
    public final long oldestEntryTimestamp;

    /**
     * @param entryCount           number of entries in the queue
     * @param oldestEntryTimestamp creation time of the oldest entry
     */
    public QueueMetadata(int entryCount, long oldestEntryTimestamp) {
        this.oldestEntryTimestamp = oldestEntryTimestamp;
        this.entryCount = entryCount;
    }

    /**
     * Returns a new instance with one more entry and the same oldest-entry timestamp;
     * this instance is left unchanged.
     *
     * @return the incremented copy
     */
    public QueueMetadata increment() {
        final int nextCount = entryCount + 1;
        return new QueueMetadata(nextCount, oldestEntryTimestamp);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment