Skip to content

Instantly share code, notes, and snippets.

@Isan-Rivkin
Last active August 8, 2018 20:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Isan-Rivkin/d44ef461ab1402bad5750a3578760447 to your computer and use it in GitHub Desktop.
Save Isan-Rivkin/d44ef461ab1402bad5750a3578760447 to your computer and use it in GitHub Desktop.
Concurrent HashMap With Time
package actor_map;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.typed.Behavior;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import static akka.pattern.Patterns.gracefulStop;
/**
 * Actor that owns a key/value map whose entries expire after a per-entry time-to-live.
 *
 * <p>All access to the map happens on the actor's own message-processing thread. A
 * single-threaded scheduler never touches the map directly: when an entry's time is up it
 * only sends an {@link InnerRemoveMsg} back to this actor, which performs the removal.
 *
 * <p>Reply protocol: {@link GetMsg} and {@link RemoveMsg} are answered with the stored value,
 * or with the empty string {@code ""} when there is nothing to return. {@link SizeMsg} is
 * answered with a {@code Long}. {@link PutMsg}, {@link InnerRemoveMsg} and
 * {@link TerminateMsg} are not answered.
 *
 * <p>{@code StampedItem} is declared elsewhere in this package; this class relies only on its
 * {@code (value, stamp)} constructor and its {@code item} and {@code stamp} fields.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class ActorMap<K,V> extends AbstractActor {

    /* message definitions */

    /** Stores {@code value} under {@code key} for {@code duration} {@code unit}s. */
    public static class PutMsg<K,V>{
        public final K key;
        public final V value;
        public final TimeUnit unit;
        public final long duration;

        /**
         * @throws NullPointerException if {@code unit} is null; failing here keeps the
         *         error at the caller instead of crashing the actor when it schedules the expiry
         */
        public PutMsg(K key, V value, long duration, TimeUnit unit){
            this.key = key;
            this.value = value;
            this.unit = Objects.requireNonNull(unit, "unit");
            this.duration = duration;
        }
    }

    /** Asks for the value currently stored under {@code key}. */
    public static class GetMsg<K>{
        public final K key;
        public GetMsg(K key){
            this.key = key;
        }
    }

    /** Asks to remove the entry stored under {@code key} and reply with its value. */
    public static class RemoveMsg<K>{
        public final K key;
        public RemoveMsg(K key){
            this.key = key;
        }
    }

    /**
     * Expiry notification the actor sends to itself. {@code stamp} identifies the exact
     * {@link PutMsg} that scheduled it, so a newer value under the same key is left alone.
     */
    public static class InnerRemoveMsg<K>{
        public final K key;
        public final long stamp;
        public InnerRemoveMsg(K key, long stamp){
            this.key = key;
            this.stamp = stamp;
        }
    }

    /** Asks for the current number of entries. */
    public static class SizeMsg{}

    /** Asks the actor to shut down its scheduler and stop. */
    public static class TerminateMsg {}

    /* end of messages */

    private final Map<K,StampedItem<V>> innerMap = new HashMap<>();
    private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor();
    // Only read and written while handling PutMsg, i.e. on the actor thread, so a plain long suffices.
    private long nextStamp = 1;

    /**
     * Wires each message class to its handler. The handlers take the raw message types
     * because the builder is keyed on the raw {@code Class} token of each message.
     */
    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(PutMsg.class, this::put)
            .match(RemoveMsg.class, this::remove)
            .match(GetMsg.class, this::get)
            .match(SizeMsg.class, this::size)
            .match(InnerRemoveMsg.class, this::innerRemove)
            .match(TerminateMsg.class, this::terminate)
            .build();
    }

    /**
     * Releases the scheduler thread whenever the actor stops, however the stop was triggered.
     * Without this the executor's non-daemon thread outlives the actor system.
     */
    @Override
    public void postStop() {
        cleaner.shutdownNow();
    }

    /** Cancels all pending expirations and stops this actor. */
    private void terminate(TerminateMsg msg){
        // Idempotent with postStop(); done here as well so no further expiry fires meanwhile.
        cleaner.shutdownNow();
        getContext().stop(getSelf());
    }

    /** Inserts or replaces the entry and schedules its expiry notification. */
    @SuppressWarnings("unchecked") // raw message type, see createReceive()
    private void put(PutMsg msg) {
        final long stamp = nextStamp++;
        innerMap.put((K) msg.key, new StampedItem<>((V) msg.value, stamp));
        // Resolve the reference on the actor thread; the task below runs on the cleaner thread.
        final ActorRef self = getSelf();
        cleaner.schedule(() -> {
            self.tell(new InnerRemoveMsg<>(msg.key, stamp), self);
        }, msg.duration, msg.unit);
    }

    /** Removes the entry for the key and replies with its value, or {@code ""} if absent. */
    private void remove(RemoveMsg msg){
        reply(innerMap.remove(msg.key));
    }

    /** Replies with the value stored for the key, or {@code ""} if absent. */
    private void get(GetMsg msg){
        reply(innerMap.get(msg.key));
    }

    /** Replies with the number of entries as a {@code Long}. */
    private void size(SizeMsg msg){
        getSender().tell(Long.valueOf(innerMap.size()), getSelf());
    }

    /** Drops the entry only if it is still the one written by the put carrying this stamp. */
    private void innerRemove(InnerRemoveMsg msg){
        StampedItem<V> entry = innerMap.get(msg.key);
        if (entry != null && entry.stamp == msg.stamp) {
            innerMap.remove(msg.key);
        }
    }

    /**
     * Sends the entry's value to the current sender, or {@code ""} when there is no value.
     * A null value is reported as absent too, because a null message cannot be sent.
     */
    private void reply(StampedItem<V> entry) {
        Object payload = (entry == null || entry.item == null) ? "" : entry.item;
        getSender().tell(payload, getSelf());
    }

    /** Demo: stores one entry with a 3 second lifetime, waits 5 seconds, then shuts down. */
    public static void main(String[] args) throws InterruptedException {
        ActorSystem system = ActorSystem.create("PutaSystem");
        Props p = Props.create(ActorMap.class, () -> new ActorMap<Integer,String>());
        ActorRef actorMap = system.actorOf(p, "amzing");
        // PutMsg is never answered, so no sender is needed.
        actorMap.tell(new PutMsg<Integer,String>(1, "isan", 3, TimeUnit.SECONDS), ActorRef.noSender());
        Thread.sleep(5000);
        system.terminate();
    }
}
package actor_map;
import akka.Done;
import akka.actor.*;
import akka.util.Timeout;
import first_map.MapInterface;
import scala.concurrent.Await;
import scala.concurrent.Future;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import static akka.pattern.Patterns.ask;
/**
 * Blocking facade over an {@link ActorMap} actor: each instance starts its own actor system
 * and forwards every map operation to the actor as a message.
 *
 * <p>{@link #put} is fire-and-forget. {@link #get}, {@link #remove} and {@link #size} block
 * the calling thread for up to one second waiting for the actor's reply.
 *
 * <p>The actor answers "no value" with the empty string, so an empty-string value cannot be
 * told apart from a missing key and is reported as absent.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class ActorTimedHaspMap<K,V> implements MapInterface.TimedSizableMap<K,V>{
    /** Upper bound on how long a blocking call waits for the actor. */
    private static final Timeout ASK_TIMEOUT = Timeout.create(Duration.ofSeconds(1));
    /** Pause before shutdown so that messages already sent can still be processed. */
    private static final long SHUTDOWN_GRACE_MILLIS = 2000;
    /** Reply the actor sends from get/remove when it has nothing to return. */
    private static final String ABSENT = "";

    private final ActorSystem system = ActorSystem.create("ActorSystem");
    private final Props p = Props.create(ActorMap.class,() -> new ActorMap<K,V>());
    private final ActorRef actorMap = system.actorOf(p, "theMap");

    /** Stores {@code value} under {@code key} for the given lifetime; does not wait for the actor. */
    @Override
    public void put(K key, V value, long duration, TimeUnit unit) {
        actorMap.tell(new ActorMap.PutMsg<>(key, value, duration, unit), ActorRef.noSender());
    }

    /**
     * @return the value stored under {@code key}, or empty if there is none, the actor did
     *         not answer in time, or the calling thread was interrupted
     */
    @Override
    public Optional<V> get(K key) {
        return askForValue("get", new ActorMap.GetMsg<>(key));
    }

    /**
     * Removes the entry stored under {@code key}.
     *
     * @return the removed value, or empty if there was none, the actor did not answer in
     *         time, or the calling thread was interrupted
     */
    @Override
    public Optional<V> remove(K key) {
        return askForValue("remove", new ActorMap.RemoveMsg<>(key));
    }

    /**
     * @return the number of entries reported by the actor, or 0 if it did not answer in
     *         time or the calling thread was interrupted
     */
    @Override
    public long size() {
        try {
            return (Long) askMap(new ActorMap.SizeMsg());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logFailure("size", e);
        }
        return 0;
    }

    /**
     * Waits a grace period, asks the actor to terminate, then shuts the actor system down,
     * waiting up to one second for the shutdown to complete.
     */
    public void terminate(){
        try {
            Thread.sleep(SHUTDOWN_GRACE_MILLIS);
        } catch (InterruptedException e) {
            // Continue with the shutdown, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
        actorMap.tell(new ActorMap.TerminateMsg(),ActorRef.noSender());
        system.stop(actorMap);
        Future<?> f = system.terminate();
        try {
            Await.result(f, ASK_TIMEOUT.duration());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logFailure("terminate", e);
        }
    }

    /** Sends {@code request} to the actor and blocks for its reply. */
    private Object askMap(Object request) throws Exception {
        Future<Object> future = ask(actorMap, request, ASK_TIMEOUT);
        return Await.result(future, ASK_TIMEOUT.duration());
    }

    /** Shared body of get/remove: maps the actor's "absent" reply and any failure to empty. */
    @SuppressWarnings("unchecked") // the actor replies with the V it was given, or with ABSENT
    private Optional<V> askForValue(String operation, Object request) {
        try {
            Object reply = askMap(request);
            if (ABSENT.equals(reply)) {
                return Optional.empty();
            }
            return Optional.ofNullable((V) reply);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            logFailure(operation, e);
        }
        return Optional.empty();
    }

    /** Reports a failed call through the actor system's logger. */
    private void logFailure(String operation, Exception cause) {
        system.log().error(cause, "ActorTimedHaspMap." + operation + " failed");
    }

    /**
     * Load demo: 100,000 short-lived puts from the main thread and 100,000 more from a second
     * thread, then prints the size, removes one key and shuts down. Entries live for 3 ms, so
     * the printed size depends on timing.
     */
    public static void main(String[]args) throws InterruptedException {
        ActorTimedHaspMap<Integer, String> test= new ActorTimedHaspMap<>();
        for (int i=0; i<100_000;++i){
            test.put(i,i+"nuni",3,TimeUnit.MILLISECONDS);
        }
        Thread t1 = new Thread(()->{
            for (int i=0; i<100_000;++i){
                test.put(i+100_001,i+"nuni",3,TimeUnit.MILLISECONDS);
            }
        });
        t1.start();
        t1.join();
        System.out.println(test.size());
        test.remove(1);
        test.terminate();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment