|
package main |
|
|
|
/* |
|
Assumptions |
|
----------- |
|
|
|
- All peers listen for new peers on their designated port and address. |
|
|
|
- All peers maintain a list of addresses of all peers. Every peer is |
|
identified by a unique number among all peers. |
|
|
|
- Once a peer is registered in the network of peers, it never leaves again. |
|
|
|
Command line arguments |
|
---------------------- |
|
|
|
The program is called with two arguments: |
|
|
|
./p2p-chat <listen-addr> [<peer-addr>] |
|
|
|
<listen-addr> is the address to listen on. <peer-addr> is the address of a |
|
peer in an existing peer-to-peer chat network. If it is left out, this one is |
|
made the first peer in a new network. |
|
|
|
|
|
User interface |
|
-------------- |
|
|
|
We use a simple text-mode prompting. There are two commands: |
|
|
|
- "LIST" prints a list of available peers to chat with. |
|
|
|
- "<nr>: <text>" sends <text> to the peer identified by <nr>. |
|
|
|
Mutual exclusion |
|
---------------- |
|
|
|
We use the Ricart-Agrawala algorithm to ensure mutual exclusion. However, as |
|
we are testing on one machine, we simply use system time for the time. On a |
|
real distributed system, a Lamport clock or something like that would be |
|
necessary. |
|
|
|
- When a peer wants to enter the critical section, it sends |
|
|
|
DEMAND <nr> <time>\n |
|
|
|
to the others. <nr> is the peer's number. <time> is a Unix time in seconds. |
|
|
|
- The OK message for the algorithm looks like this: |
|
|
|
CSOK\n |
|
|
|
See the old article for an explanation. |
|
|
|
|
|
Accepting new peers |
|
------------------- |
|
|
|
- When a new peer arrives, it prepares for receiving any messages of the chat |
|
and mutual exclusion protocols.. Then it sends |
|
|
|
NEW <address>\n |
|
|
|
to a peer it knows of. <address> is the address on which this peer listens. |
|
It then waits for the number of the contacted peer, its own number and a |
|
list of known peers to be sent to it. After it has received the NOMORE |
|
message, it considers itself a registered peer. |
|
|
|
- When a peer receives a NEW message from a to-be peer, it demands access to |
|
the critical section. When it gets there, it assigns a number to |
|
the to-be peer, updates its list of peers and sends the current list of |
|
peers to the to-be peer in this form: |
|
|
|
ME <pnr>\n |
|
YOU <nnr>\n |
|
PEER <nr> <address>\n |
|
... |
|
PEER <nr> <address>\n |
|
NOMORE |
|
|
|
<pnr> is the number of this peer. <nnr> is the number for the new peer. |
|
After that, this peer sends |
|
|
|
REGISTERED <nr> <address>\n |
|
|
|
to all peers in its list. <nr> is the unique number of the new peer and |
|
<address> is its address. It waits to receive as many REGOK messages as |
|
it sent REGISTERED messages. After that, it leaves the critical section. |
|
|
|
- When a peer receives a REGISTERED message, it adds the newly registered peer |
|
to its list of peers and sends back: |
|
|
|
REGOK\n |
|
|
|
|
|
Chat protocol |
|
------------- |
|
|
|
- Peers send messages to other peers by sending: |
|
|
|
MSG <nr> <text>\n |
|
|
|
<nr> is the unique number of the sending peer. <text> must not contain |
|
newline characters. |
|
|
|
Bugs |
|
---- |
|
|
|
- I'm not familiar enough with networking and IP stuff. Therefore I don't know |
|
whether this program works running on different machines. However, with |
|
distinct processes listening on different ports on one machine it does what |
|
it's supposed to do. |
|
|
|
- I have not checked whether I close all connections. |
|
|
|
- Error handling consists of panicking. |
|
*/ |
|
|
|
import ( |
|
"bufio" |
|
"fmt" |
|
"net" |
|
"os" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"time" |
|
) |
|
|
|
const ( |
|
CSOK = "CSOK" |
|
DEMAND = "DEMAND" |
|
LIST = "LIST" |
|
ME = "TARZAN" // Shouldn't do this. |
|
MSG = "MSG" |
|
NEW = "NEW" |
|
NOMORE = "NOMORE" |
|
PEER = "PEER" |
|
REGISTERED = "REGISTERED" |
|
REGOK = "REGOK" |
|
YOU = "JANE" |
|
) |
|
|
|
|
|
func main() { |
|
// Catch error of providing no addresses |
|
if len(os.Args) == 1 || len(os.Args) > 3 { |
|
panic("Illegal number of command line arguments.") |
|
} |
|
|
|
// Create empty list of peers |
|
peers := newPeermap() |
|
|
|
// Listen at specified address |
|
mineAddr := os.Args[1] |
|
ln, err := net.Listen("tcp", mineAddr) |
|
if err != nil { panic("Cannot listen at specified address.") } |
|
|
|
// Start goroutines for handling incoming connections |
|
go handleIn(ln, peers) |
|
|
|
// If I am to open a new chat network, make me peer no. 1 |
|
if len(os.Args) == 2 { |
|
peers.setMyNr(1) |
|
|
|
// If there is a peer to connect to, connect to the network through it |
|
} else { |
|
// Connect to existing peer |
|
peerAddr := os.Args[2] |
|
conn, err := net.Dial("tcp", peerAddr) |
|
if err != nil { panic("Cannot connect to " + peerAddr) } |
|
|
|
// Request to be a peer |
|
_, err0 := fmt.Fprintf(conn, "%s %s\n", NEW, mineAddr) |
|
if err0 != nil { panic("Cannot send address.") } |
|
|
|
// Read the number of the contacted peer |
|
var peerNr int |
|
peerReader := bufio.NewReader(conn) |
|
meResp := panickingRead(peerReader) |
|
_, err5 := fmt.Sscanf(meResp, ME + " %d\n", &peerNr) |
|
if err5 != nil { panic("Didn't get proper ME reply." + err5.Error()) } |
|
peers.put(peerNr, peerAddr) |
|
|
|
// Read my own number |
|
var ownNr int |
|
youResp := panickingRead(peerReader) |
|
_, err6 := fmt.Sscanf(youResp, YOU + " %d\n", &ownNr) |
|
if err6 != nil { panic("Didn't get proper YOU reply.") } |
|
peers.setMyNr(ownNr) |
|
|
|
// Read the list of peers in the network |
|
for { |
|
// Read a line removing the newline character |
|
nextPeer, err := peerReader.ReadString('\n') |
|
if err != nil { panic("Problem reading peer list.") } |
|
nextPeer = strings.TrimRight(nextPeer, "\n") |
|
|
|
// Break up peer information |
|
info := strings.Split(nextPeer, " ") |
|
|
|
// Stop reading if there are no more peers |
|
if info[0] == NOMORE { // Might use EOF instead? |
|
break |
|
} |
|
|
|
// Store peer information in our list of peers |
|
nr, err := strconv.Atoi(info[1]) |
|
if err != nil { panic("Illegal number format.") } |
|
addr := info[2] |
|
peers.put(nr, addr) |
|
} |
|
|
|
// Close the connection |
|
if conn.Close() != nil { panic("Error closing connection.") } |
|
} |
|
|
|
// Read commands from the user |
|
commandReader := bufio.NewReader(os.Stdin) |
|
if err != nil { panic("Cannot open STDIN for reading.") } |
|
for { |
|
// Read a command |
|
command, err := commandReader.ReadString('\n') |
|
if err != nil { panic("Error reading from STDIN.") } |
|
|
|
// If it was LIST, print a list of peers (include ourselves) |
|
if command == LIST + "\n" { |
|
fmt.Println("These peers are available:", peers.keys()) |
|
|
|
// If not, it should have been a message thing |
|
} else { |
|
// Parse it |
|
parts := strings.Split(command, ": ") |
|
fmt.Println(parts) |
|
nr, err1 := strconv.Atoi(parts[0]) |
|
msg := parts[1] |
|
if err1 != nil { |
|
fmt.Fprintln(os.Stderr, "Invalid command:", err1.Error()) |
|
continue |
|
} |
|
|
|
// Connect to specified peer and send the message |
|
conn, err := net.Dial("tcp", peers.get(nr)) |
|
if err != nil { panic("Cannot connect to peer. " + err.Error()) } |
|
_, err3 := fmt.Fprintf( |
|
conn, |
|
"%s %d %s\n", |
|
MSG, |
|
peers.getMyNr(), |
|
msg) |
|
if err3 != nil { panic("Cannot send.") } |
|
if conn.Close() != nil { panic("Error closing connection.") } |
|
} |
|
} |
|
} |
|
|
|
// handleIn handles connections coming in through the specified listener |
|
func handleIn(ln net.Listener, peers *peermap) { |
|
// Start handler for actions on critical section |
|
lockCh := make(chan *lockRequest) |
|
demandCh := make(chan demand) |
|
go raServer(peers, lockCh, demandCh) |
|
|
|
// Go on accepting connections indefinitely |
|
for { |
|
// Accept a connection |
|
connection, err := ln.Accept() |
|
if err != nil { panic("Error accepting connection.") } |
|
|
|
// Handle all connections simultaneously |
|
go handleConnection(connection, peers, lockCh, demandCh) |
|
} |
|
} |
|
|
|
// Handles an incoming connection (just the first command) |
|
func handleConnection(conn net.Conn, peers *peermap, lockCh chan |
|
*lockRequest, demandCh chan demand) { |
|
// Read a line |
|
rd := bufio.NewReader(conn) |
|
cmd, err := rd.ReadString('\n') |
|
if err != nil { panic("Error reading. " + err.Error()) } |
|
|
|
// Chomp it and split it up |
|
cmd = strings.TrimRight(cmd, "\n") |
|
fields := strings.Split(cmd, " ") |
|
|
|
// Take action depending on first field |
|
switch fields[0] { |
|
// Print simple messages |
|
case MSG: |
|
fmt.Printf("Nr. %s says: %s\n", fields[1], fields[2:]) |
|
|
|
// Register new peers in our own list of peers |
|
case REGISTERED: |
|
fmt.Printf("This part is not called.") |
|
// Add it to the list |
|
nr, err := strconv.Atoi(fields[1]) |
|
if err != nil { panic("Illegal number format") } |
|
peers.put(nr, fields[2]) |
|
|
|
// Inform about success |
|
_, err4 := fmt.Fprint(conn, REGOK, "\n") |
|
if err4 != nil { panic("Error sending REGOK.") } |
|
|
|
// Accept and introduce a new peer to the other peers in our list of peers |
|
case NEW: |
|
// Demand access to critical section |
|
req := &lockRequest{ |
|
nr : peers.getMyNr(), |
|
utime : time.Now().Unix(), |
|
freeCh : make(chan bool), |
|
unlockCh : make(chan bool), |
|
} |
|
lockCh <- req |
|
<-req.freeCh |
|
|
|
// Find out the currently highest number of a peer |
|
peernrs := peers.keys() |
|
maxNr := peers.getMyNr() |
|
for _, nr := range peernrs { |
|
if nr > maxNr { |
|
maxNr = nr |
|
} |
|
} |
|
|
|
// Assign a higher number to the new peer |
|
newPeerAddr := fields[1] |
|
newPeerNr := maxNr + 1 |
|
|
|
// Send my own number |
|
panickingPrint(conn, fmt.Sprintf("%s %d\n", ME, peers.getMyNr())) |
|
|
|
// Send its new number |
|
panickingPrint(conn, fmt.Sprintf("%s %d\n", YOU, newPeerNr)) |
|
|
|
// Send information about the other peers |
|
for _, n := range peernrs { |
|
panickingPrint( |
|
conn, |
|
fmt.Sprintf("%s %d %s\n", PEER, n, peers.get(n)) ) |
|
} |
|
|
|
// Send the NOMORE |
|
panickingPrint(conn, fmt.Sprintf("%s\n", NOMORE, newPeerNr)) |
|
|
|
// Send our registration message, waiting for confirmation |
|
broadcastExpBlock( |
|
peers, |
|
fmt.Sprintf("%s %d %s\n", REGISTERED, newPeerNr, newPeerAddr), |
|
REGOK) |
|
|
|
// Also put the peer in our own list of peers |
|
peers.put(newPeerNr, newPeerAddr) |
|
|
|
// Leave the critical section |
|
req.unlockCh <- true |
|
|
|
// Someone wants to access the critical section |
|
case DEMAND: |
|
nr, err := strconv.Atoi(fields[1]) |
|
if err != nil { panic("Strange peer number.") } |
|
utime, err := strconv.ParseInt(fields[2], 10, 64) |
|
if err != nil { panic("Strange time.") } |
|
|
|
demandCh <- demand{ nr, utime, conn } |
|
|
|
// Don't accept any other stuff |
|
default: |
|
panic("Received illegal command.") |
|
} |
|
} |
|
|
|
// broadcastExpecting expecting sends the same message to all peers and waits |
|
// for them to respond with expResp. |
|
func broadcastExpecting(peers *peermap, message, expResp string, |
|
done chan bool) { |
|
// Go through all peers |
|
for _, nr := range peers.keys() { |
|
addr := peers.get(nr) // Very inefficient! Use an iterator! |
|
|
|
// Send message and wait for answer |
|
go func() { |
|
// Set up connection |
|
conn, reader := panickingSetup(addr) |
|
|
|
// Send our message |
|
panickingPrint(conn, message) |
|
|
|
// Wait for confirmation |
|
ans := panickingRead(reader) |
|
// works because of dedicated connection |
|
if strings.TrimRight(ans, "\n") != expResp { |
|
panic("Confirmation different from the expected one.") |
|
} |
|
|
|
// Announce that we have received the confirmation |
|
done <- true |
|
|
|
// Close connection |
|
if conn.Close() != nil { |
|
panic("Error closing connection.") |
|
} |
|
}() |
|
} |
|
} |
|
|
|
// broadcastExpBlock expecting sends the same message to all peers and waits |
|
// for them to respond with expResp. |
|
func broadcastExpBlock(peers *peermap, message, expResp string) { |
|
// Send a message to everyone |
|
dones := make(chan bool) |
|
broadcastExpecting(peers, message, expResp, dones) |
|
|
|
// Wait until everyone has accepted the new peer |
|
for i := 0; i < peers.len(); i++ { |
|
<-dones |
|
} |
|
} |
|
|
|
|
|
// demand contains the information necessary to handle incoming DEMANDs |
|
type demand struct { |
|
nr int // the number of the peer sending the DEMAND |
|
utime int64 // the Unix time of the demand in seconds |
|
conn net.Conn // connection through to answer through |
|
} |
|
|
|
// lockRequest contains the information necessary to do the Ricart-Agrawala |
|
// locking stuff on the critical section |
|
type lockRequest struct { |
|
nr int // number of the requesting process |
|
utime int64 // Unix time of the request in seconds |
|
freeCh chan bool // channel to wait for until the c. s. is free |
|
unlockCh chan bool // channel to expect unlocks from |
|
} |
|
|
|
// raServer controls the locking of the critical section through the |
|
// Ricart-Agrawala algorithm |
|
func raServer(peers *peermap, lockCh chan *lockRequest, |
|
demandCh chan demand) { |
|
var curLockRequest *lockRequest |
|
delayedDemands := make([]demand, 0, peers.len()) |
|
outstOks := 0 |
|
csokCh := make(chan bool) |
|
|
|
// Go on forever |
|
for { |
|
select { |
|
// Wait for CSOKs and give access to c. s. once we've received all |
|
case <-csokCh: |
|
outstOks-- |
|
if outstOks == 0 { |
|
curLockRequest.freeCh <- true |
|
} |
|
|
|
// If there's no other request going on, start entry protocol |
|
case curLockRequest = <-when(curLockRequest == nil, lockCh): |
|
// Proceed if we're the only one in the world |
|
outstOks = peers.len() |
|
if outstOks == 0 { |
|
curLockRequest.freeCh <- true |
|
|
|
// Send DEMANDs to all peers |
|
} else { |
|
broadcastExpecting( |
|
peers, |
|
fmt.Sprintf( |
|
"%s %d %d\n", |
|
DEMAND, |
|
curLockRequest.nr, |
|
curLockRequest.utime), |
|
CSOK, |
|
csokCh) |
|
} |
|
|
|
// When we have the lock, unlock it (see comment at lazyWhenBool) |
|
case <-lazyWhenBool( |
|
curLockRequest != nil, |
|
func() chan bool { return curLockRequest.unlockCh }): |
|
// Send CSOK to all peers that were waiting |
|
for _, d := range delayedDemands { |
|
sendCSOK(d.conn) |
|
} |
|
delayedDemands = nil |
|
|
|
// End the current request |
|
curLockRequest = nil |
|
|
|
// When someone else wants to lock the critical section |
|
case extDemand := <-demandCh: |
|
// Send OK if we don't want access or if he comes before us |
|
if curLockRequest == nil || curLockRequest.after(extDemand) { |
|
sendCSOK(extDemand.conn) |
|
|
|
// Append them to the list of those who will be OKed when we leavs |
|
} else { |
|
delayedDemands = append(delayedDemands, extDemand) |
|
} |
|
} |
|
} |
|
} |
|
|
|
// sendCSOK sends CSOK to the specified connection and closes it |
|
func sendCSOK(conn net.Conn) { |
|
panickingPrint(conn, CSOK + "\n") |
|
if conn.Close() != nil { |
|
panic("Error closing connection.") |
|
} |
|
} |
|
|
|
// after returns bool if the specified lockRequest should obtain the lock |
|
// after the specified demand. |
|
func (l *lockRequest) after(d demand) bool { |
|
if l.utime == d.utime { |
|
return l.nr > d.nr |
|
} else { |
|
return l.utime > d.utime |
|
} |
|
} |
|
|
|
/* |
|
peermap is a map mapping the number of a peer to its address with access |
|
protected by a reader-writer lock. Additionally, it contains the number and |
|
address of this peer in extra fields. |
|
|
|
Conrad had the idea of ensuring mutually exclusive access to a variable by |
|
putting it into a channel of capacity 1. Every process has to take it from the |
|
channel and then no one else can access it. This idea came to my mind after I |
|
had implemented this peermap and I might have used it. However, now it is to |
|
late and we don't want completely mutually exclusive access anyway. |
|
|
|
A variation of that idea could be used to implement the latch thing |
|
(WaitGroup). |
|
*/ |
|
type peermap struct { |
|
myNr int |
|
myLatch sync.WaitGroup |
|
addrFor map[int]string |
|
m sync.RWMutex |
|
} |
|
|
|
// newPeermap returns a pointer to a new, empty, peermap |
|
func newPeermap() *peermap { |
|
p := new(peermap) |
|
p.myLatch.Add(1) |
|
p.addrFor = make(map[int]string) |
|
// others initialised to zero value |
|
return p |
|
|
|
} |
|
|
|
// put puts a peer's number and address in the map |
|
func (s *peermap) put(nr int, addr string) { |
|
s.m.Lock() |
|
s.addrFor[nr] = addr |
|
s.m.Unlock() |
|
} |
|
|
|
// get retrieves a peer's address (according to its number) from the map |
|
func (s *peermap) get(nr int) (addr string) { |
|
s.m.RLock() |
|
addr = s.addrFor[nr] |
|
s.m.RUnlock() |
|
return |
|
} |
|
|
|
// len returns the number of elements in the map |
|
func (s *peermap) len() (n int) { |
|
s.m.RLock() |
|
n = len(s.addrFor) |
|
s.m.RUnlock() |
|
return |
|
} |
|
|
|
// keys returns a slice of all keys in the map |
|
func (s *peermap) keys() (ks []int) { |
|
s.m.RLock() |
|
|
|
ks = make([]int, 0, len(s.addrFor)) |
|
for k, _ := range s.addrFor { |
|
ks = append(ks, k) |
|
} |
|
|
|
s.m.RUnlock() |
|
return |
|
} |
|
|
|
// setNr sets the number of this peer |
|
func (s *peermap) setMyNr(nr int) { |
|
s.myNr = nr |
|
s.myLatch.Done() |
|
} |
|
|
|
// myNr returns the number of this peer, waiting if it is not yet set |
|
func (s *peermap) getMyNr() int { |
|
s.myLatch.Wait() |
|
return s.myNr |
|
} |
|
|
|
// panickingSetup opens a connection to the specified address returns the |
|
// Conn and a bufio.Reader for that connection. On errors it panics. |
|
func panickingSetup(addr string) (net.Conn, *bufio.Reader) { |
|
conn, err := net.Dial("tcp", addr) |
|
if err != nil { panic("Couldn't set up connection: " + err.Error()) } |
|
|
|
return conn, bufio.NewReader(conn) |
|
} |
|
|
|
// panickingPrint print the specified message to the specified connection. On |
|
// errors it panics. |
|
func panickingPrint(conn net.Conn, msg string) { |
|
_, err := fmt.Fprint(conn, msg) |
|
if err != nil { panic("Couldn't write to this connection.") } |
|
} |
|
|
|
// panickingRead reads from the specified bufio.Reader up to the next newline |
|
// character and returns what it has read. On errors it panics. |
|
func panickingRead(reader *bufio.Reader) string { |
|
res, err := reader.ReadString('\n') |
|
if err != nil { panic("Couldn't read from this reader.") } |
|
return res |
|
} |
|
|
|
func when(isTrue bool, ch chan *lockRequest) chan *lockRequest { |
|
if isTrue { |
|
return ch |
|
} else { |
|
return nil |
|
} |
|
} |
|
|
|
// lazyWhenBool is a when for returning bools with lazy evaluation. |
|
// You have to wrap the channel for the second argument in a constant function |
|
// returning that channel in order for it to be evaluated lazily. |
|
func lazyWhenBool(isTrue bool, chFunc func() chan bool) chan bool { |
|
if isTrue { |
|
return chFunc() |
|
} else { |
|
return nil |
|
} |
|
} |