Skip to content

Instantly share code, notes, and snippets.

@yomusu
Last active August 29, 2015 14:08
Show Gist options
  • Save yomusu/bd1d2a5344532ec7d69e to your computer and use it in GitHub Desktop.
Save yomusu/bd1d2a5344532ec7d69e to your computer and use it in GitHub Desktop.
Push配信サーバー with Go
package main
import (
"fmt"
"log"
"net/http"
"sync"
"time"
)
func handler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.FormValue("key")
outln := func(w http.ResponseWriter, mes string) {
w.Write([]byte(mes))
w.Write([]byte("\n"))
w.(http.Flusher).Flush()
log.Println(fmt.Sprintf("[%s] %s", key, mes))
}
// Transfer-Encoding: chunked
w.WriteHeader(http.StatusOK)
outln(w, "start web handler.")
rec := receivers.Add(key)
defer receivers.Remove(rec)
// 通信切断されたら抜ける
closeNotify := w.(http.CloseNotifier).CloseNotify()
// 何かしら結果が出るまでループする
Loop:
for {
select {
case mes, ok := <-rec.Ch:
if ok {
w.Write([]byte(mes))
w.Write([]byte("\n"))
w.(http.Flusher).Flush()
} else {
outln(w, "message is over")
break Loop
}
case <-closeNotify:
log.Println("session closed")
return
}
}
outln(w, "bye")
}
//============================================
var receivers ReceiverList
type Receiver struct {
Ch chan string
Key string
}
func CreateReceiver(key string) *Receiver {
r := new(Receiver)
// バッファ付きのchannelを作成
r.Ch = make(chan string, 10)
r.Key = key
return r
}
type ReceiverList struct {
Receivers []*Receiver
lock sync.Mutex
}
// 新規Receiverを作成&追加する
func (s *ReceiverList) Add(key string) (ar *Receiver) {
// Receiverを作成
ar = CreateReceiver(key)
// channelリストをLock
s.lock.Lock()
defer s.lock.Unlock()
// 配列にnilがあればそこにセットする
for i, r := range s.Receivers {
if r == nil {
s.Receivers[i] = ar
log.Printf("Add:put to index:%d\n", i)
return
}
}
// nilがないので最後に追加する
s.Receivers = append(s.Receivers, ar)
log.Printf("Add:appended\n")
return
}
// Receiverの削除
func (s *ReceiverList) Remove(dr *Receiver) {
s.lock.Lock()
defer s.lock.Unlock()
// ここでchannelをcloseすることに意味があるのか??
close(dr.Ch)
// 同じchannelを探してnilする
for i, r := range s.Receivers {
if r == dr {
s.Receivers[i] = nil
log.Printf("Remove : removed index:%d\n", i)
return
}
}
log.Printf("Remove : not found...\n")
}
// イベントを全てのReceiverに送信する
func (s *ReceiverList) FireToAll(mes string) {
s.lock.Lock()
defer s.lock.Unlock()
for i, r := range s.Receivers {
if r != nil {
select {
case r.Ch <- mes:
case <-time.After(time.Second * 10):
s.Receivers[i] = nil
log.Printf("time out index:%d\n", i)
// そのレシーバーは削除する
}
}
}
}
// イベントをkeyを持ったReceiver(複数あり)に送信する。でも遅い
func (s *ReceiverList) FireTo(key string, mes string) {
s.lock.Lock()
defer s.lock.Unlock()
for i, r := range s.Receivers {
if r != nil && r.Key == key {
select {
case r.Ch <- mes:
case <-time.After(time.Second * 10):
s.Receivers[i] = nil
log.Printf("time out index:%d\n", i)
// そのレシーバーは削除する
}
}
}
}
//============================================
// サンプルデータを定期的にブロードキャストする
func messageGenerater() {
i := 0
for {
time.Sleep(time.Second * 1)
mes := fmt.Sprintf("hoge:%d", i)
receivers.FireToAll(mes)
i++
log.Printf("message gen : %d\n", i)
}
}
func main() {
go messageGenerater()
// 配信セッションの接続先
http.HandleFunc("/con", handler)
// 特定のkeyを持つセッションにメッセージを通知
http.HandleFunc("/api/push", handlerPushToOne)
http.ListenAndServe(":8085", nil)
}
func handlerPushToOne(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "OPTIONS":
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "content-type")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
w.Header().Set("Connection", "close")
return
case "GET":
case "POST":
r.ParseForm()
key := r.FormValue("key")
mes := r.FormValue("mes")
log.Printf("push : key=%s, mes=%s", key, mes)
//
receivers.FireTo(key, mes)
// 返り値
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprint(w, "OK")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment