Skip to content

Instantly share code, notes, and snippets.

@andrewgilmartin
Last active August 12, 2018 22:50
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save andrewgilmartin/be3b01770245028633f4 to your computer and use it in GitHub Desktop.
The Observer Pattern
/**
* The Observer pattern is used to create a relationship between two objects.
* The relationship is usually unidirectional with the observer waiting for
* notices from the observed. The relationship is a loose one as the observed
* only needs to know of the interest of the observer and the observer only
* needs to know the set and ordering of notices the observed will send. A
* downside of this looseness is that all notices pass through a single observer
* method for further dispatching.
*/
package com.andrewgilmartin.common.observation;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
/**
* The Observation class is only used to group all the related interfaces and
* classes into one file. In a typical development environment each interface
* and class would be in its own file.
*/
public class Observation {
/**
* The Observable is the object that is being watched. As changes are
* prepared and then made, the observable sends consent and information
* notices to its observers. The notices are normally specialized classes
* that hold some context about the change. For example, if the notice
* concerns the addition of new inventory to the warehouse then the notice's
* class could have a method for enumerating the new inventory.
*/
public interface Observable {
/**
* Asks the observers to approve a change before it is made. Each
* observer is notified, and an observer that opposes the change must
* return false from its notice method. The calling thread is used to
* notify the observers, and so the observed must wait until the
* observers have answered.
*
* @param notice the object describing the proposed change
* @return true if no observer opposed the change, false otherwise
*/
boolean consentNotification(Object notice);
/**
* Sends an information notice, normally after a change has been made.
* Since the change has already occurred the notices are typically sent
* asynchronously by a background thread. The order of the notices is
* preserved, however.
*
* @param notice the object describing the change
*/
void informationNotification(Object notice);
}
/**
* The observer is the object that is notified by the observable. There is
* no typed relationship beyond the Observable and Observer interfaces. The
* notices are usually of specialized classes where each notice instance
* holds data relevant to the change.
*/
public interface Observer {
/**
* Receives a single notice from an observable. Both consent and
* information notices arrive through this one method, so implementations
* typically inspect the notice's class to decide how to handle it.
*
* @param observable the object that sent the notice
* @param notice the object describing the change
* @return false to oppose the change described by a consent notice,
* true otherwise. ObservableBase stops asking further observers as soon
* as one returns false for a consent notice, and ignores the return
* value for information notices.
*/
boolean notice(Observable observable, Object notice);
}
/**
* A registry is the means of establishing the relationship between the
* observable and the observer. This interface is distinct from Observable
* as it is sometimes useful to register an observer indirectly via, for
* example, a registrar.
*/
public interface Registry {
/**
* Returns the currently registered observers. The order of the
* observers is undefined.
*
* @return the set of registered observers
*/
Set<Observer> getObservers();
/**
* Adds the observer to the set of observers.
*
* @param observer the observer to register
* @return true if the observer was added
*/
boolean addObserver(Observer observer);
/**
* Removes the observer from the set of observers.
*
* @param observer the observer to unregister
* @return true if the observer was among the observers and was removed
*/
boolean deleteObserver(Observer observer);
}
/**
 * There is often very little difference between implementations of
 * Observable and Registry and so this base implementation can be widely
 * employed by any class that wants to be observed.
 *
 * Consent notices are delivered synchronously on the caller's thread.
 * Information notices are queued and delivered, in the order they were
 * queued, by a single daemon thread that the constructor starts.
 *
 * When extending this class make sure to document the set of notices, their
 * consent or information role, and what is their ordering.
 */
public static class ObservableBase implements Observable, Registry {

    /**
     * The management of the set of observers needs to be thread-safe. The
     * set is expected to be mostly stable over the life of the observed and
     * so copy-on-write semantics is appropriate here.
     */
    private final Set<Observer> observers = new CopyOnWriteArraySet<Observer>();

    /**
     * Information notices are sent by a background thread. This blocking
     * queue hands the notices from the observed to that thread.
     */
    private final BlockingQueue<Object> informationNotices = new LinkedBlockingQueue<Object>();

    /**
     * Creates the observable and starts its information-notice dispatcher.
     * The dispatcher is a daemon thread, so it never keeps the JVM alive.
     */
    public ObservableBase() {
        Thread informationEventsDispatcher = new Thread(new Runnable() {
            @Override
            public void run() {
                dispatchInformationNotices();
            }
        });
        informationEventsDispatcher.setDaemon(true);
        informationEventsDispatcher.start();
    }

    /**
     * Body of the dispatcher thread: waits for the next queued notice and
     * sends it to each of the current observers. Runs until the thread is
     * interrupted.
     */
    private void dispatchInformationNotices() {
        try {
            for (;;) {
                Object notice = informationNotices.take();
                for (Observer observer : observers) {
                    try {
                        // The return value only matters for consent notices.
                        observer.notice(this, notice);
                    }
                    catch (RuntimeException e) {
                        // One misbehaving observer must not terminate the
                        // only dispatcher thread; otherwise every later
                        // notice would sit in the queue undelivered.
                        reportObserverFailure(e);
                    }
                }
            }
        }
        catch (InterruptedException e) {
            // Interruption is the signal to stop dispatching. Restore the
            // interrupt status so the thread's owner can still observe it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports an observer failure through the current thread's uncaught
     * exception handler, without letting the thread die.
     */
    private static void reportObserverFailure(RuntimeException failure) {
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, failure);
    }

    /**
     * Asks each observer, on the calling thread, to consent to the change.
     * Stops at the first observer that opposes it. An exception thrown by
     * an observer propagates to the caller.
     *
     * @param notice the object describing the proposed change
     * @return false as soon as any observer opposes the change, true if
     * none did
     */
    @Override
    public boolean consentNotification(Object notice) {
        for (Observer observer : observers) {
            if (!observer.notice(this, notice)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Queues the notice for delivery by the background dispatcher thread
     * and returns without waiting for the observers.
     *
     * @param notice the object describing the change; must not be null
     * because the queue rejects null elements
     */
    @Override
    public void informationNotification(Object notice) {
        informationNotices.add(notice);
    }

    /**
     * Returns the set of observers held by this object. This is the same
     * thread-safe set used for dispatching, not a copy.
     */
    @Override
    public Set<Observer> getObservers() {
        return observers;
    }

    /**
     * Adds the observer to the set of observers.
     *
     * @param observer the observer to register; must not be null
     * @return true if the observer was added, false if it was already
     * registered
     * @throws NullPointerException if observer is null
     */
    @Override
    public boolean addObserver(Observer observer) {
        if (observer == null) {
            // The backing set accepts null, which would only fail later,
            // on every dispatch. Reject it here instead.
            throw new NullPointerException("observer");
        }
        return observers.add(observer);
    }

    /**
     * Removes the observer from the set of observers.
     *
     * @param observer the observer to unregister
     * @return true if the observer was registered and has been removed
     */
    @Override
    public boolean deleteObserver(Observer observer) {
        return observers.remove(observer);
    }
}
/**
 * A small demonstration of the observation interfaces and classes: five
 * senders each emit an endless stream of notices and three receivers print
 * every notice they are given. The sender threads are non-daemon and never
 * finish, so the program runs until it is killed.
 */
public static void main(String... args) throws Exception {

    /**
     * The notice payload: which sender produced it and its position in
     * that sender's stream.
     */
    class Notice {
        private final int senderId;
        private final int sequenceNumber;

        public Notice(int senderId, int sequenceNumber) {
            this.senderId = senderId;
            this.sequenceNumber = sequenceNumber;
        }

        public int getSenderId() {
            return senderId;
        }

        public int getSequenceNumber() {
            return sequenceNumber;
        }
    }

    /**
     * The Sender is an observed. It does nothing but publish information
     * notices carrying its id and an ever increasing sequence number.
     */
    class Sender extends ObservableBase implements Runnable {
        private final int senderId;

        public Sender(int id) {
            this.senderId = id;
        }

        @Override
        public void run() {
            int next = 0;
            while (true) {
                informationNotification(new Notice(senderId, next));
                sleep(); // add some randomness to the processing.
                next++;
            }
        }
    }

    /**
     * The Receiver is an observer. It prints the facts of each Notice it
     * is given. Every notice in this example is an information notice, so
     * the return value is not used; it is always true, which is the safe
     * default for an observer that does not intend to oppose a change.
     */
    class Receiver implements Observer {
        private final int receiverId;

        public Receiver(int id) {
            this.receiverId = id;
        }

        @Override
        public boolean notice(Observable observable, Object notice) {
            if (!(notice instanceof Notice)) {
                return true;
            }
            Notice received = (Notice) notice;
            int sender = received.getSenderId();
            int sequence = received.getSequenceNumber();
            System.out.printf("notice %d %d %d\n", receiverId, sender, sequence);
            sleep(); // add some randomness to the processing.
            return true;
        }
    }

    // Create a few senders.
    Sender[] senders = new Sender[5];
    for (int id = 0; id < senders.length; id++) {
        senders[id] = new Sender(id);
    }

    // Create a few receivers.
    Receiver[] receivers = new Receiver[3];
    for (int id = 0; id < receivers.length; id++) {
        receivers[id] = new Receiver(id);
    }

    // Have every receiver observe every sender.
    for (Sender sender : senders) {
        for (Receiver receiver : receivers) {
            sender.addObserver(receiver);
        }
    }

    // Start up the senders, each on its own non-daemon thread.
    for (int i = 0; i < senders.length; i++) {
        Thread worker = new Thread(senders[i]);
        worker.setDaemon(false);
        worker.start();
    }
}
/**
 * Pauses the current thread for a random duration of between 0 and 25
 * milliseconds. If the thread is interrupted the pause ends early and the
 * thread's interrupt status is restored, so that the caller can still
 * detect the interruption.
 */
static void sleep() {
    try {
        Thread.sleep(Math.round(Math.random() * 25));
    }
    catch (InterruptedException e) {
        // Thread.sleep clears the interrupt status when it throws. Set it
        // again rather than silently losing the interruption.
        Thread.currentThread().interrupt();
    }
}
}
/**
* To build and run this from the command line, first compile using
*
* {@code javac -d /tmp ./src/com/andrewgilmartin/common/observation/Observation.java
* }
*
* And then run using
*
* {@code java -classpath /tmp com.andrewgilmartin.common.observation.Observation
* }
*
* You can show to yourself that the events are ordered by sorting the output on
* the receiver id and you will see that the events are in numeric order.
*
* {@code java -classpath /tmp com.andrewgilmartin.common.observation.Observation | head -20 | sort -k 2 -n -s
* }
*/
// END
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment