Created
October 4, 2016 23:07
-
-
Save Felk/457c87cb7e6704d3b2fc2f253f2224a5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package de.hsrm.swtpro2015.network; | |
import java.util.List; | |
import java.util.logging.Logger; | |
import javax.jms.Connection; | |
import javax.jms.Destination; | |
import javax.jms.JMSException; | |
import javax.jms.MessageConsumer; | |
import javax.jms.MessageProducer; | |
import javax.jms.Session; | |
import javax.jms.TextMessage; | |
import org.apache.activemq.ActiveMQConnectionFactory; | |
import org.apache.activemq.broker.BrokerService; | |
import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; | |
import org.apache.activemq.broker.region.policy.PolicyEntry; | |
import org.apache.activemq.broker.region.policy.PolicyMap; | |
import org.apache.activemq.command.ActiveMQMessage; | |
import org.apache.activemq.command.DataStructure; | |
import org.apache.activemq.command.RemoveInfo; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.databind.PropertyNamingStrategy.PascalCaseStrategy; | |
import com.fasterxml.jackson.databind.SerializationFeature; | |
import com.google.inject.Inject; | |
import de.hsrm.swtpro2015.events.Event; | |
import de.hsrm.swtpro2015.events.EventList; | |
import de.hsrm.swtpro2015.events.EventList.EventType; | |
import de.hsrm.swtpro2015.events.ExitGameEvent; | |
import de.hsrm.swtpro2015.events.GameCreatedEvent; | |
import de.hsrm.swtpro2015.events.PlayerAddedEvent; | |
import de.hsrm.swtpro2015.events.util.EventManager; | |
import de.hsrm.swtpro2015.events.util.PublicClientEvent; | |
import de.hsrm.swtpro2015.exceptions.BrokerConnectionFailedException; | |
import de.hsrm.swtpro2015.exceptions.MissingEventManagerException; | |
import de.hsrm.swtpro2015.game.Game; | |
/** | |
* | |
* Zentrales Netzwerksystem zum Übersetzen und Multiplexen von | |
* Nertzwerkereignissen in und aus spielnative(n) Events. Grundlage zur | |
* Netzwerkkommunikation ist der MessageBroker ActiveMQ von Apache. Er ist | |
* eingebettet. | |
* | |
* @author bring001 fkoen001 mvala001 | |
* | |
*/ | |
public class NetworkCommunication { | |
private static final Logger LOGGER = Logger.getGlobal(); | |
public static final int DEFAULT_PORT = 61616; | |
public static final String LISTEN_DEVICE_IP = "0.0.0.0"; | |
// Namensgebung für Topics und Queues | |
public static final String EVENT_PREFIX = "EVENT_"; | |
public static final String GAME_PREFIX = "GAME_"; | |
public static final String PLAYER_PREFIX = "PLAYER_"; | |
// Dependencies: | |
// TODO von außen übergeben (Dependency Injection) | |
private EventManager eventManager; | |
private ObjectMapper objectMapper; | |
// Verbindungssession zum Broker | |
private Connection connection; | |
private Session session; | |
private int port; | |
/** | |
* Erstellt eine NetworkCommunication-Instanz auf dem Standard-Port | |
* {@value #DEFAULT_PORT}. | |
*/ | |
@Inject public NetworkCommunication(EventManager eventManager) { | |
this(eventManager, DEFAULT_PORT); | |
} | |
/** | |
* Eine neue NetworkCommunication-Instanz startet einen internet Message | |
* Broker und initialisert alle globalen Topics etc. | |
* | |
* @param port | |
* Port für Message Broker | |
*/ | |
public NetworkCommunication(EventManager eventManager, int port) { | |
this.eventManager = eventManager; | |
this.port = port; | |
} | |
/** | |
* Startet einen internen Message Broker und initialisert alle globalen | |
* Topics etc. | |
* | |
* @throws BrokerConnectionFailedException | |
* Falls der Broker nicht richtig gestartet werden kann. | |
*/ | |
public void start() throws BrokerConnectionFailedException { | |
try { | |
// Starte eingebetteten Broker Service (oder stirb langsam (via Exception)). | |
// lauscht auf allen Interfaces. | |
BrokerService broker = new BrokerService(); | |
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port); | |
setLastMessagesPersistent(broker); | |
broker.setAdvisorySupport(true); | |
broker.start(); | |
// Erstellen der Verbindung zum internen Broker | |
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); | |
connection = factory.createConnection(); | |
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); | |
// Alles netzwerktechnische fertig, starten der Broker-Verbindung | |
connection.start(); | |
} catch (Exception e) { | |
throw new BrokerConnectionFailedException(); | |
} | |
// Erstelle einen Jackson-ObjectMapper zum (de)serialisieren von und in JSON. | |
objectMapper = new ObjectMapper(); | |
// Attributnamen beginnen in Groß | |
objectMapper.setPropertyNamingStrategy(new PascalCaseStrategy()); | |
// Leere Beans sollen keine Fehler auslösen | |
objectMapper.disable(SerializationFeature.FAIL_ON_EMPTY_BEANS); | |
// Anmelden auf default-Topics im Message Broker | |
initConsumersGlobal(); | |
// Anmelden auf Events, welche rausgesendet werden sollen | |
initProducersGlobal(); | |
// Anmelden auf spielspezifisches Topic mit entsprechendem Spiel-EventManager, | |
// wenn ein Spiel hinzukommt | |
eventManager.addEventListener( | |
event -> { | |
initConsumersGame(event.getGameID(), event.getGame()); | |
initProducersGame(event.getGame(), event.getGameID()); | |
// Anmelden auf spielerspezifische Queues, wenn ein Spieler hinzukommt | |
event.getGame().getEventManager().addEventListener(event2 -> { | |
initConsumersPlayer(event.getGameID(), event.getGame(), event2.getPlayerID()); | |
initProducersPlayer(event.getGame(), event.getGameID(), event2.getPlayerID()); | |
}, PlayerAddedEvent.class); | |
}, GameCreatedEvent.class); | |
// Anmelden auf Gamespezifische Topics, wenn ein Game hinzukommt | |
// -- Gibt es nicht -- | |
} | |
/** | |
* @return Session-Objekt der Session mit dem internen Message Broker | |
*/ | |
public Session getSession() { | |
return session; | |
} | |
/** | |
* Erstellt Consumer für alle globalen Topics und meldet entsprechende | |
* Listener an. | |
*/ | |
private void initConsumersGlobal() { | |
// Nur globale Events, keine spieler- oder spielspezifischen Events | |
List<EventList> events = EventList.forType(EventType.GLOBAL); | |
for (EventList event : events) { | |
try { | |
Destination dest = session.createTopic(event.getNetworkName()); | |
MessageConsumer consumer = session.createConsumer(dest); | |
consumer.setMessageListener(new NetworkEventAdapter(objectMapper, event, eventManager)); | |
} catch (JMSException e) { | |
LOGGER.warning("Could not subscribe to global topic: " + event.getNetworkName()); | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** | |
* Erstellt Consumer für alle Spielerspezifischen Queues und meldet | |
* entsprechende Listener an. | |
* | |
* @param gameID | |
* Spiele-ID, in dem der Spieler existiert | |
* @param playerID | |
* Spieler-ID | |
*/ | |
private void initConsumersPlayer(int gameID, Game game, int playerID) { | |
// Nur spielerspezifische Events, keine globalen oder spielspezifischen Events | |
List<EventList> events = EventList.forType(EventType.PLAYER); | |
for (EventList event : events) { | |
String queuename = GAME_PREFIX + gameID + "_" + PLAYER_PREFIX + playerID + "_" + event.getNetworkName(); | |
try { | |
Destination dest = session.createQueue(queuename); | |
MessageConsumer consumer = session.createConsumer(dest); | |
new Thread(() -> { | |
try { | |
consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, game.getEventManager(), playerID)); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}).start(); | |
game.addGameEndListener(g -> { | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
game.getEventManager().addEventListener(ege -> { | |
if (ege.getPlayerID() != playerID) return; | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}, ExitGameEvent.class); | |
} catch (JMSException e) { | |
LOGGER.warning("Could not subscribe to player queue:" + queuename); | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** | |
* Erstellt Consumer für alle Spielspezifischen Topics und meldet | |
* entsprechende Listener an. | |
* | |
* @param gameID | |
* Spiele-ID | |
* @param gameEventManager | |
* EventManager des Spiels | |
*/ | |
private void initConsumersGame(int gameID, Game game) { | |
// Alle Spielspezifischen Events, die einkommen können | |
List<EventList> events = EventList.forType(EventType.GAME); | |
for (EventList event : events) { | |
String topicname = GAME_PREFIX + gameID + "_" + event.getNetworkName(); | |
try { | |
Destination dest = session.createTopic(topicname); | |
MessageConsumer consumer = session.createConsumer(dest); | |
consumer.setMessageListener(new NetworkEventGameAdapter(objectMapper, event, game.getEventManager())); | |
game.addGameEndListener(g -> { | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
} catch (JMSException e) { | |
LOGGER.warning("Could not subscribe to game topic: " + topicname); | |
e.printStackTrace(); | |
} | |
} | |
} | |
/** | |
* Meldet sich für ausgehende Events auf dem Spiel-Eventlistener an und | |
* erstellt entsprechende Listener zum Senden. | |
* | |
* @param gameID | |
* ID des Spiels | |
* @param gameEventManager | |
* EventManager des Spiels | |
*/ | |
private void initProducersPlayer(Game game, int gameID, int playerID) { | |
// für alle ausgehenden Events anmelden | |
// Es gibt keine ausgehenden Spieler-Events. | |
// Auf Verbindungsverlust hören! | |
class ConnectionEvent implements Event {} | |
try { | |
String topicName = "ActiveMQ.Advisory.Consumer.Queue." + topicNameFromPlayerEvent(gameID, playerID, new ConnectionEvent()); | |
Destination dest = session.createTopic(topicName); | |
MessageConsumer consumer = session.createConsumer(dest); | |
consumer.setMessageListener(msg -> { | |
if (msg instanceof ActiveMQMessage) { | |
ActiveMQMessage aMsg = (ActiveMQMessage) msg; | |
if (aMsg.getDataStructure() instanceof RemoveInfo) { | |
LOGGER.info("Client hat verbindung verloren: " + playerID); | |
// Client hat Verbindung verloren => Spiel verlassen | |
ExitGameEvent event = new ExitGameEvent(); | |
event.setPlayerID(playerID); | |
game.getEventManager().onEvent(event); | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
}); | |
game.addGameEndListener(g -> { | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}); | |
game.getEventManager().addEventListener(ege -> { | |
if (ege.getPlayerID() != playerID) return; | |
try { | |
consumer.close(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
}, ExitGameEvent.class); | |
} catch (JMSException e) { | |
e.printStackTrace(); | |
} | |
} | |
/** | |
* Meldet sich für ausgehende Events auf dem Spiel-Eventlistener an und | |
* erstellt entsprechende Listener zum Senden. | |
* | |
* @param gameID | |
* ID des Spiels | |
* @param gameEventManager | |
* EventManager des Spiels | |
*/ | |
private void initProducersGame(Game game, int gameID) { | |
// für alle ausgehenden Events (=PublicClientEvent) anmelden | |
game.getEventManager().addEventListener(event -> { | |
try { | |
TextMessage message = session.createTextMessage(); | |
MessageProducer producer = null; | |
Destination dest = session.createTopic(topicNameFromGameEvent(gameID, event)); | |
message.setText(objectMapper.writeValueAsString(event)); | |
producer = session.createProducer(dest); | |
producer.send(message); | |
LOGGER.info("Sent game event: " + event.getClass().getSimpleName() + message.getText()); | |
} catch (Exception e1) { | |
e1.printStackTrace(); | |
} | |
} , PublicClientEvent.class); | |
} | |
/** | |
* Meldet sich beim EventListener an rauszuschickenden Events an, erstellt | |
* entsprechende Producer und verknüpft diese. | |
*/ | |
private void initProducersGlobal() { | |
// Das Interface ClientEvent besagt, dass dieses Event rausgeschickt werden soll | |
if (eventManager == null) { | |
throw new MissingEventManagerException(); | |
} | |
// Reguläres Topic ermitteln und verschicken | |
eventManager.addEventListener(event -> { | |
try { | |
TextMessage message = session.createTextMessage(); | |
MessageProducer producer = null; | |
Destination dest = session.createTopic(topicNameFromClientEvent(event)); | |
message.setText(objectMapper.writeValueAsString(event)); | |
producer = session.createProducer(dest); | |
producer.send(message); | |
LOGGER.info("Sent global event: " + event.getClass().getSimpleName() + message.getText()); | |
} catch (Exception e1) { | |
e1.printStackTrace(); | |
} | |
} , PublicClientEvent.class); | |
} | |
/** | |
* Generiert einen Topic Name für ein ClientEvent | |
* | |
* @param event | |
* Das ClientEvent, für das ein Topicname erstellt werden soll | |
* @return Ein systematisch aufgebauter Topic Name | |
*/ | |
private String topicNameFromClientEvent(PublicClientEvent event) { | |
return String.format("%s%s", | |
EVENT_PREFIX, event.getClass().getSimpleName()); | |
} | |
/** | |
* Generiert einen Topic Name für ein GameEvent | |
* | |
* @param gameID | |
* die gameID, aus dem das Event stammt | |
* @param gameEvent | |
* das Event aus dem EventManasger des Spiels | |
* @return Ein systematisch aufgebauter Topic Name | |
*/ | |
private String topicNameFromGameEvent(int gameID, Event gameEvent) { | |
return String.format("%s%d_%s%s", | |
GAME_PREFIX, gameID, | |
EVENT_PREFIX, gameEvent.getClass().getSimpleName()); | |
} | |
/** | |
* Generiert einen Topic Name für ein spielerspezifisches Event | |
* | |
* @param gameID | |
* die gameID, aus dem das Event stammt | |
* @param playerID | |
* die SpielerID, dem das Event zugeordnet ist | |
* @param gameEvent | |
* das Event aus dem EventManasger des Spiels | |
* @return Ein systematisch aufgebauter Topic Name | |
*/ | |
private String topicNameFromPlayerEvent(int gameID, int playerID, Event playerEvent) { | |
return String.format("%s%d_%s%d_%s%s", | |
GAME_PREFIX, gameID, | |
PLAYER_PREFIX, playerID, | |
EVENT_PREFIX, playerEvent.getClass().getSimpleName()); | |
} | |
/** | |
* Aktiviert die LastImageSubscriptionRecoveryPolicy, sodass die Letzte | |
* bereits gesendete Nachrichten auch bei nachträglichem Anmelden an einem | |
* Topic gesendet werden kann. | |
* | |
* @param broker | |
* Message Broker, bei dem diese Aktion durchgeführt wird. | |
*/ | |
private void setLastMessagesPersistent(BrokerService broker) { | |
LastImageSubscriptionRecoveryPolicy policy = new LastImageSubscriptionRecoveryPolicy(); | |
PolicyEntry pe = new PolicyEntry(); | |
pe.setSubscriptionRecoveryPolicy(policy); | |
PolicyMap pm = new PolicyMap(); | |
pm.setDefaultEntry(pe); | |
broker.setDestinationPolicy(pm); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment