Last active
August 8, 2018 20:36
-
-
Save Isan-Rivkin/d44ef461ab1402bad5750a3578760447 to your computer and use it in GitHub Desktop.
Concurrent HashMap With Time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actor_map; | |
import akka.actor.AbstractActor; | |
import akka.actor.ActorRef; | |
import akka.actor.ActorSystem; | |
import akka.actor.Props; | |
import akka.typed.Behavior; | |
import java.time.Duration; | |
import java.util.*; | |
import java.util.concurrent.*; | |
import java.util.concurrent.atomic.AtomicLong; | |
import static akka.pattern.Patterns.gracefulStop; | |
public class ActorMap<K,V> extends AbstractActor { | |
/* message defitions */ | |
public static class PutMsg<K,V>{ | |
public final K key; | |
public final V value; | |
public final TimeUnit unit; | |
public final long duration; | |
public PutMsg(K key, V value, long duration, TimeUnit unit){ | |
this.key = key; | |
this.value=value; | |
this.unit = unit; | |
this.duration = duration; | |
} | |
} | |
public static class GetMsg<K>{ | |
public final K key; | |
public GetMsg(K key){ | |
this.key = key; | |
} | |
} | |
public static class RemoveMsg<K>{ | |
public final K key; | |
public RemoveMsg(K key){ | |
this.key = key; | |
} | |
} | |
public static class InnerRemoveMsg<K>{ | |
public final K key; | |
public final long stamp; | |
public InnerRemoveMsg(K key, long stamp){ | |
this.key = key; | |
this.stamp = stamp; | |
} | |
} | |
public static class SizeMsg{} | |
public static class TerminateMsg {} | |
/* end of messages */ | |
private final Map<K,StampedItem<V>> innerMap = new HashMap<>(); | |
private final ScheduledExecutorService cleaner = Executors.newSingleThreadScheduledExecutor(); | |
private final AtomicLong stateMarker = new AtomicLong(1); | |
private ActorRef selfRef; | |
@Override | |
public Receive createReceive() { | |
selfRef = getSelf(); | |
return receiveBuilder() | |
.match(PutMsg.class, this::put) | |
.match(RemoveMsg.class, this::remove) | |
.match(GetMsg.class, this::get) | |
.match(SizeMsg.class, this::size) | |
.match(InnerRemoveMsg.class, this::innerRemove) | |
.match(TerminateMsg.class,this::terminate) | |
.build(); | |
} | |
private void terminate(TerminateMsg msg){ | |
cleaner.shutdownNow(); | |
// getContext().stop(selfRef); | |
//getContext().stop(getSelf()); | |
// try { | |
// cleaner.awaitTermination(1,TimeUnit.SECONDS); | |
// getContext().stop(getSelf()); | |
// } catch (InterruptedException e) { | |
// e.printStackTrace(); | |
// } | |
} | |
private void put(PutMsg msg) { | |
long stamp = stateMarker.getAndIncrement(); | |
innerMap.put((K)msg.key, new StampedItem<>((V)msg.value,stamp)); | |
cleaner.schedule(()->{ | |
getSelf().tell(new InnerRemoveMsg<>(msg.key, stamp), | |
getSelf()); | |
//remove_st(msg,stamp); | |
},msg.duration,msg.unit); | |
} | |
private void remove_st(PutMsg msg,long stamp){ | |
StampedItem item = innerMap.get(msg.key); | |
if (item != null && item.stamp == stamp) | |
innerMap.remove(msg.key); | |
} | |
private void remove(RemoveMsg msg){ | |
StampedItem i = innerMap.remove(msg.key); | |
if (i == null) | |
sender().tell("", self()); | |
else | |
sender().tell(i.item, self()); | |
} | |
private void get(GetMsg msg){ | |
StampedItem i = innerMap.get(msg.key); | |
if (i==null) | |
sender().tell("", self()); | |
else | |
sender().tell(i.item, self()); | |
} | |
private void size(SizeMsg msg){ | |
sender().tell((long)innerMap.size(), self()); | |
} | |
private void innerRemove(InnerRemoveMsg msg){ | |
StampedItem item = innerMap.get(msg.key); | |
if (item != null && item.stamp == msg.stamp) | |
innerMap.remove(msg.key); | |
} | |
public static void main(String[] args) throws InterruptedException { | |
ActorSystem system = ActorSystem.create("PutaSystem"); | |
Props p = Props.create(ActorMap.class,() -> new ActorMap<Integer,String>()); | |
ActorRef actorMap = system.actorOf(p, "amzing"); | |
actorMap.tell(new PutMsg<Integer,String>( | |
1, | |
"isan", | |
3, | |
TimeUnit.SECONDS), | |
actorMap); | |
Thread.sleep(5000); | |
//yarden.tell(new NuniActor.Greeting("from me isan rivka"), isan); | |
system.terminate(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actor_map; | |
import akka.Done; | |
import akka.actor.*; | |
import akka.util.Timeout; | |
import first_map.MapInterface; | |
import scala.concurrent.Await; | |
import scala.concurrent.Future; | |
import java.time.Duration; | |
import java.util.Optional; | |
import java.util.concurrent.TimeUnit; | |
import static akka.pattern.Patterns.ask; | |
public class ActorTimedHaspMap<K,V> implements MapInterface.TimedSizableMap<K,V>{ | |
private final ActorSystem system = ActorSystem.create("ActorSystem"); | |
private final Props p = Props.create(ActorMap.class,() -> new ActorMap<K,V>()); | |
private final ActorRef actorMap = system.actorOf(p, "theMap"); | |
@Override | |
public void put(K key, V value, long duration, TimeUnit unit) { | |
actorMap.tell(new ActorMap.PutMsg<>(key, value, duration, unit), ActorRef.noSender()); | |
} | |
@Override | |
public Optional<V> get(K key) { | |
Timeout timeout = Timeout.create(Duration.ofSeconds(1)); | |
Future<Object> future = ask(actorMap,new ActorMap.GetMsg<>(key),timeout); | |
try { | |
V v = (V)Await.result(future,timeout.duration()); | |
if (v != ""){ | |
return Optional.ofNullable(v); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
return Optional.ofNullable(null); | |
} | |
@Override | |
public Optional<V> remove(K key) { | |
Timeout timeout = Timeout.create(Duration.ofSeconds(1)); | |
Future<Object> future = ask(actorMap, new ActorMap.RemoveMsg<>(key),timeout); | |
try{ | |
return Optional.ofNullable((V)Await.result(future,timeout.duration())); | |
}catch(Exception e){ | |
e.printStackTrace(); | |
return Optional.empty(); | |
} | |
} | |
@Override | |
public long size() { | |
Timeout timeout = Timeout.create(Duration.ofSeconds(1)); | |
Future<Object> future = ask(actorMap,new ActorMap.SizeMsg(),timeout); | |
try{ | |
return (long)Await.result(future,timeout.duration()); | |
}catch (Exception e){ | |
e.printStackTrace(); | |
return 0; | |
} | |
} | |
public void terminate(){ | |
try { | |
Thread.sleep(2000); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
// FiniteDuration dur = new FiniteDuration(1,TimeUnit.MILLISECONDS); | |
// gracefulStop(actorMap,dur,new ActorMap.TerminateMsg()); | |
// system.mailboxes().deadLetterMailbox().cleanUp(); | |
actorMap.tell(new ActorMap.TerminateMsg(),ActorRef.noSender()); | |
system.stop(actorMap); | |
Future<?> f = system.terminate(); | |
Timeout timeout = Timeout.create(Duration.ofSeconds(1)); | |
try { | |
Await.result(f,timeout.duration()); | |
} catch (Exception e) { | |
} | |
} | |
public static void main(String[]args) throws InterruptedException { | |
ActorTimedHaspMap<Integer, String> test= new ActorTimedHaspMap<>(); | |
// test.put(1,"yarden",3,TimeUnit.SECONDS); | |
// test.put(1,"isan",3,TimeUnit.SECONDS); | |
// test.put(2,"nuni",3,TimeUnit.SECONDS); | |
// System.out.println(test.size());//2 | |
// Thread.sleep(4000); | |
// System.out.println(test.size());//0 | |
// test.put(1,"yarden",3,TimeUnit.SECONDS); | |
// test.put(1,"isan",3,TimeUnit.SECONDS); | |
// test.put(2,"nuni",3,TimeUnit.SECONDS); | |
// test.remove(1); | |
// System.out.println(test.size());//1 | |
for (int i=0; i<100_000;++i){ | |
test.put(i,i+"nuni",3,TimeUnit.MILLISECONDS); | |
} | |
Thread t1 = new Thread(()->{ | |
for (int i=0; i<100_000;++i){ | |
test.put(i+100_001,i+"nuni",3,TimeUnit.MILLISECONDS); | |
} | |
}); | |
t1.start(); | |
t1.join(); | |
System.out.println(test.size());//1 | |
test.remove(1); | |
test.terminate(); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment