Skip to content

Instantly share code, notes, and snippets.

@Felk
Created October 4, 2016 23:07
Show Gist options
  • Save Felk/457c87cb7e6704d3b2fc2f253f2224a5 to your computer and use it in GitHub Desktop.
Save Felk/457c87cb7e6704d3b2fc2f253f2224a5 to your computer and use it in GitHub Desktop.
package de.hsrm.swtpro2015.network;
import java.util.List;
import java.util.logging.Logger;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.RemoveInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategy.PascalCaseStrategy;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.inject.Inject;
import de.hsrm.swtpro2015.events.Event;
import de.hsrm.swtpro2015.events.EventList;
import de.hsrm.swtpro2015.events.EventList.EventType;
import de.hsrm.swtpro2015.events.ExitGameEvent;
import de.hsrm.swtpro2015.events.GameCreatedEvent;
import de.hsrm.swtpro2015.events.PlayerAddedEvent;
import de.hsrm.swtpro2015.events.util.EventManager;
import de.hsrm.swtpro2015.events.util.PublicClientEvent;
import de.hsrm.swtpro2015.exceptions.BrokerConnectionFailedException;
import de.hsrm.swtpro2015.exceptions.MissingEventManagerException;
import de.hsrm.swtpro2015.game.Game;
/**
*
* Zentrales Netzwerksystem zum Übersetzen und Multiplexen von
* Nertzwerkereignissen in und aus spielnative(n) Events. Grundlage zur
* Netzwerkkommunikation ist der MessageBroker ActiveMQ von Apache. Er ist
* eingebettet.
*
* @author bring001 fkoen001 mvala001
*
*/
public class NetworkCommunication {
private static final Logger LOGGER = Logger.getGlobal();
public static final int DEFAULT_PORT = 61616;
public static final String LISTEN_DEVICE_IP = "0.0.0.0";
// Namensgebung für Topics und Queues
public static final String EVENT_PREFIX = "EVENT_";
public static final String GAME_PREFIX = "GAME_";
public static final String PLAYER_PREFIX = "PLAYER_";
// Dependencies:
// TODO von außen übergeben (Dependency Injection)
private EventManager eventManager;
private ObjectMapper objectMapper;
// Verbindungssession zum Broker
private Connection connection;
private Session session;
private int port;
/**
* Erstellt eine NetworkCommunication-Instanz auf dem Standard-Port
* {@value #DEFAULT_PORT}.
*/
@Inject public NetworkCommunication(EventManager eventManager) {
this(eventManager, DEFAULT_PORT);
}
/**
* Eine neue NetworkCommunication-Instanz startet einen internet Message
* Broker und initialisert alle globalen Topics etc.
*
* @param port
* Port für Message Broker
*/
public NetworkCommunication(EventManager eventManager, int port) {
this.eventManager = eventManager;
this.port = port;
}
/**
* Startet einen internen Message Broker und initialisert alle globalen
* Topics etc.
*
* @throws BrokerConnectionFailedException
* Falls der Broker nicht richtig gestartet werden kann.
*/
public void start() throws BrokerConnectionFailedException {
try {
// Starte eingebetteten Broker Service (oder stirb langsam (via Exception)).
// lauscht auf allen Interfaces.
BrokerService broker = new BrokerService();
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port);
setLastMessagesPersistent(broker);
broker.setAdvisorySupport(true);
broker.start();
// Erstellen der Verbindung zum internen Broker
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Alles netzwerktechnische fertig, starten der Broker-Verbindung
connection.start();
} catch (Exception e) {
throw new BrokerConnectionFailedException();
}
// Erstelle einen Jackson-ObjectMapper zum (de)serialisieren von und in JSON.
objectMapper = new ObjectMapper();
// Attributnamen beginnen in Groß
objectMapper.setPropertyNamingStrategy(new PascalCaseStrategy());
// Leere Beans sollen keine Fehler auslösen
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS);
// Anmelden auf default-Topics im Message Broker
initConsumersGlobal();
// Anmelden auf Events, welche rausgesendet werden sollen
initProducersGlobal();
// Anmelden auf spielspezifisches Topic mit entsprechendem Spiel-EventManager,
// wenn ein Spiel hinzukommt
eventManager.addEventListener(
event -> {
initConsumersGame(event.getGameID(), event.getGame());
initProducersGame(event.getGame(), event.getGameID());
// Anmelden auf spielerspezifische Queues, wenn ein Spieler hinzukommt
event.getGame().getEventManager().addEventListener(event2 -> {
initConsumersPlayer(event.getGameID(), event.getGame(), event2.getPlayerID());
initProducersPlayer(event.getGame(), event.getGameID(), event2.getPlayerID());
}, PlayerAddedEvent.class);
}, GameCreatedEvent.class);
// Anmelden auf Gamespezifische Topics, wenn ein Game hinzukommt
// -- Gibt es nicht --
}
/**
* @return Session-Objekt der Session mit dem internen Message Broker
*/
public Session getSession() {
return session;
}
/**
* Erstellt Consumer für alle globalen Topics und meldet entsprechende
* Listener an.
*/
private void initConsumersGlobal() {
// Nur globale Events, keine spieler- oder spielspezifischen Events
List<EventList> events = EventList.forType(EventType.GLOBAL);
for (EventList event : events) {
try {
Destination dest = session.createTopic(event.getNetworkName());
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new NetworkEventAdapter(objectMapper, event, eventManager));
} catch (JMSException e) {
LOGGER.warning("Could not subscribe to global topic: " + event.getNetworkName());
e.printStackTrace();
}
}
}
/**
* Erstellt Consumer für alle Spielerspezifischen Queues und meldet
* entsprechende Listener an.
*
* @param gameID
* Spiele-ID, in dem der Spieler existiert
* @param playerID
* Spieler-ID
*/
private void initConsumersPlayer(int gameID, Game game, int playerID) {
// Nur spielerspezifische Events, keine globalen oder spielspezifischen Events
List<EventList> events = EventList.forType(EventType.PLAYER);
for (EventList event : events) {
String queuename = GAME_PREFIX + gameID + "_" + PLAYER_PREFIX + playerID + "_" + event.getNetworkName();
try {
Destination dest = session.createQueue(queuename);
MessageConsumer consumer = session.createConsumer(dest);
new Thread(() -> {
try {
consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, game.getEventManager(), playerID));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
game.addGameEndListener(g -> {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
});
game.getEventManager().addEventListener(ege -> {
if (ege.getPlayerID() != playerID) return;
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}, ExitGameEvent.class);
} catch (JMSException e) {
LOGGER.warning("Could not subscribe to player queue:" + queuename);
e.printStackTrace();
}
}
}
/**
* Erstellt Consumer für alle Spielspezifischen Topics und meldet
* entsprechende Listener an.
*
* @param gameID
* Spiele-ID
* @param gameEventManager
* EventManager des Spiels
*/
private void initConsumersGame(int gameID, Game game) {
// Alle Spielspezifischen Events, die einkommen können
List<EventList> events = EventList.forType(EventType.GAME);
for (EventList event : events) {
String topicname = GAME_PREFIX + gameID + "_" + event.getNetworkName();
try {
Destination dest = session.createTopic(topicname);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(new NetworkEventGameAdapter(objectMapper, event, game.getEventManager()));
game.addGameEndListener(g -> {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
});
} catch (JMSException e) {
LOGGER.warning("Could not subscribe to game topic: " + topicname);
e.printStackTrace();
}
}
}
/**
* Meldet sich für ausgehende Events auf dem Spiel-Eventlistener an und
* erstellt entsprechende Listener zum Senden.
*
* @param gameID
* ID des Spiels
* @param gameEventManager
* EventManager des Spiels
*/
private void initProducersPlayer(Game game, int gameID, int playerID) {
// für alle ausgehenden Events anmelden
// Es gibt keine ausgehenden Spieler-Events.
// Auf Verbindungsverlust hören!
class ConnectionEvent implements Event {}
try {
String topicName = "ActiveMQ.Advisory.Consumer.Queue." + topicNameFromPlayerEvent(gameID, playerID, new ConnectionEvent());
Destination dest = session.createTopic(topicName);
MessageConsumer consumer = session.createConsumer(dest);
consumer.setMessageListener(msg -> {
if (msg instanceof ActiveMQMessage) {
ActiveMQMessage aMsg = (ActiveMQMessage) msg;
if (aMsg.getDataStructure() instanceof RemoveInfo) {
LOGGER.info("Client hat verbindung verloren: " + playerID);
// Client hat Verbindung verloren => Spiel verlassen
ExitGameEvent event = new ExitGameEvent();
event.setPlayerID(playerID);
game.getEventManager().onEvent(event);
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
});
game.addGameEndListener(g -> {
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
});
game.getEventManager().addEventListener(ege -> {
if (ege.getPlayerID() != playerID) return;
try {
consumer.close();
} catch (Exception e) {
e.printStackTrace();
}
}, ExitGameEvent.class);
} catch (JMSException e) {
e.printStackTrace();
}
}
/**
* Meldet sich für ausgehende Events auf dem Spiel-Eventlistener an und
* erstellt entsprechende Listener zum Senden.
*
* @param gameID
* ID des Spiels
* @param gameEventManager
* EventManager des Spiels
*/
private void initProducersGame(Game game, int gameID) {
// für alle ausgehenden Events (=PublicClientEvent) anmelden
game.getEventManager().addEventListener(event -> {
try {
TextMessage message = session.createTextMessage();
MessageProducer producer = null;
Destination dest = session.createTopic(topicNameFromGameEvent(gameID, event));
message.setText(objectMapper.writeValueAsString(event));
producer = session.createProducer(dest);
producer.send(message);
LOGGER.info("Sent game event: " + event.getClass().getSimpleName() + message.getText());
} catch (Exception e1) {
e1.printStackTrace();
}
} , PublicClientEvent.class);
}
/**
* Meldet sich beim EventListener an rauszuschickenden Events an, erstellt
* entsprechende Producer und verknüpft diese.
*/
private void initProducersGlobal() {
// Das Interface ClientEvent besagt, dass dieses Event rausgeschickt werden soll
if (eventManager == null) {
throw new MissingEventManagerException();
}
// Reguläres Topic ermitteln und verschicken
eventManager.addEventListener(event -> {
try {
TextMessage message = session.createTextMessage();
MessageProducer producer = null;
Destination dest = session.createTopic(topicNameFromClientEvent(event));
message.setText(objectMapper.writeValueAsString(event));
producer = session.createProducer(dest);
producer.send(message);
LOGGER.info("Sent global event: " + event.getClass().getSimpleName() + message.getText());
} catch (Exception e1) {
e1.printStackTrace();
}
} , PublicClientEvent.class);
}
/**
* Generiert einen Topic Name für ein ClientEvent
*
* @param event
* Das ClientEvent, für das ein Topicname erstellt werden soll
* @return Ein systematisch aufgebauter Topic Name
*/
private String topicNameFromClientEvent(PublicClientEvent event) {
return String.format("%s%s",
EVENT_PREFIX, event.getClass().getSimpleName());
}
/**
* Generiert einen Topic Name für ein GameEvent
*
* @param gameID
* die gameID, aus dem das Event stammt
* @param gameEvent
* das Event aus dem EventManasger des Spiels
* @return Ein systematisch aufgebauter Topic Name
*/
private String topicNameFromGameEvent(int gameID, Event gameEvent) {
return String.format("%s%d_%s%s",
GAME_PREFIX, gameID,
EVENT_PREFIX, gameEvent.getClass().getSimpleName());
}
/**
* Generiert einen Topic Name für ein spielerspezifisches Event
*
* @param gameID
* die gameID, aus dem das Event stammt
* @param playerID
* die SpielerID, dem das Event zugeordnet ist
* @param gameEvent
* das Event aus dem EventManasger des Spiels
* @return Ein systematisch aufgebauter Topic Name
*/
private String topicNameFromPlayerEvent(int gameID, int playerID, Event playerEvent) {
return String.format("%s%d_%s%d_%s%s",
GAME_PREFIX, gameID,
PLAYER_PREFIX, playerID,
EVENT_PREFIX, playerEvent.getClass().getSimpleName());
}
/**
* Aktiviert die LastImageSubscriptionRecoveryPolicy, sodass die Letzte
* bereits gesendete Nachrichten auch bei nachträglichem Anmelden an einem
* Topic gesendet werden kann.
*
* @param broker
* Message Broker, bei dem diese Aktion durchgeführt wird.
*/
private void setLastMessagesPersistent(BrokerService broker) {
LastImageSubscriptionRecoveryPolicy policy = new LastImageSubscriptionRecoveryPolicy();
PolicyEntry pe = new PolicyEntry();
pe.setSubscriptionRecoveryPolicy(policy);
PolicyMap pm = new PolicyMap();
pm.setDefaultEntry(pe);
broker.setDestinationPolicy(pm);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment