Skip to content

Instantly share code, notes, and snippets.

@chanjarster
Created August 3, 2020 05:58
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save chanjarster/a3b6957ad1f4117038722372cafbecb9 to your computer and use it in GitHub Desktop.
Save chanjarster/a3b6957ad1f4117038722372cafbecb9 to your computer and use it in GitHub Desktop.
极客时间大爷胡同口优化
// https://time.geekbang.org/column/article/126504
// 张大爷在胡同口等着 ...
// 碰见一个李大爷:127.0.0.1:59668
// 耗时: 40.354851ms
package main
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"net"
"sync"
"time"
)
// Progress counters and the size of the run.
var (
	// zRecvCount is the number of messages Zhang has heard so far.
	zRecvCount uint32
	// lRecvCount is the number of messages Li has heard so far.
	lRecvCount uint32
	// total is the number of encounters (conversation rounds) to play.
	total uint32 = 100000
)

// The script. The prefix says who speaks the line (z = Zhang, l = Li); the
// listeners in this file answer l2 with z3 and l4 with z5, and z0 with l1.
var (
	z0 = "吃了没,您吶?"
	z3 = "嗨!吃饱了溜溜弯儿。"
	z5 = "回头去给老太太请安!"
	l1 = "刚吃。"
	l2 = "您这,嘛去?"
	l4 = "有空家里坐坐啊。"
)
// RequestResponse is a single message travelling in either direction.
//
// The field order is part of the contract: values are built elsewhere in
// this file with positional composite literals.
type RequestResponse struct {
	// Serial is the sequence number of the message.
	Serial uint32
	// Payload is the text of the message.
	Payload string
}
// Outbound queues, one per speaker. Each buffers up to 10000 messages that
// are waiting to be written to the connection.
var (
	// liChan holds the messages Li wants to send.
	liChan = make(chan *RequestResponse, 10000)
	// zhangChan holds the messages Zhang wants to send.
	zhangChan = make(chan *RequestResponse, 10000)
)
// queue hands r over to ch without ever blocking the caller.
//
// When ch has free buffer space the message is enqueued immediately.
// Otherwise a goroutine is started that blocks until the send can complete;
// in that case the message may be delivered after messages queued later.
//
// ch may be closed while senders are still active. A send on a closed
// channel panics, so both the direct send and the background send recover
// from that panic and silently drop the message.
func queue(r *RequestResponse, ch chan *RequestResponse) {
	// recover only works when called directly by the deferred function, so
	// this closure itself is what gets deferred.
	dropIfClosed := func() { _ = recover() }

	// Guards the non-blocking send below.
	defer dropIfClosed()

	select {
	case ch <- r:
	default:
		// Buffer is full: finish the send in the background.
		go func() {
			defer dropIfClosed()
			ch <- r
		}()
	}
}
// consume drains ch and writes the queued messages to conn in batches.
//
// Each batch starts with one blocking receive and then picks up at most 49
// further messages that are already waiting, so a single conn.Write carries
// up to 50 serialized messages. One buffer is reused for every batch.
//
// consume returns when ch is closed and drained, or when a write to conn
// fails (the error is printed).
func consume(conn *net.TCPConn, ch chan *RequestResponse) {
	var buf bytes.Buffer
	for {
		// The first message of a batch is always a blocking receive.
		first, ok := <-ch
		if !ok {
			// Channel closed and empty: nothing more to send.
			return
		}
		serialize(first, &buf)

		// Collect whatever else is ready right now. A bare break inside a
		// select only leaves the select, so the loop needs a label to stop
		// as soon as the channel is empty or closed.
	collect:
		for i := 0; i < 49; i++ {
			select {
			case next, ok := <-ch:
				if !ok {
					break collect
				}
				serialize(next, &buf)
			default:
				break collect
			}
		}

		if buf.Len() > 0 {
			if _, err := conn.Write(buf.Bytes()); err != nil {
				fmt.Println(err)
				return
			}
			buf.Reset()
		}
	}
}
// serialize encodes r and appends the frame to writer. It does not send
// anything by itself; the caller decides when the bytes go out.
//
// Frame layout (integers are big-endian):
//
//	length  4 bytes  number of bytes that follow (4 + len(Payload))
//	Serial  4 bytes
//	Payload variable
//
// Write errors are ignored because the function has no error return; the
// callers in this file pass a *bytes.Buffer.
func serialize(r *RequestResponse, writer io.Writer) {
	// Length and serial share one fixed-size header so the frame needs a
	// single small buffer instead of one slice per field.
	var header [8]byte
	binary.BigEndian.PutUint32(header[:4], uint32(len(r.Payload)+4))
	binary.BigEndian.PutUint32(header[4:], r.Serial)

	_, _ = writer.Write(header[:])
	// io.WriteString lets writers that implement io.StringWriter take the
	// payload without an intermediate []byte copy.
	_, _ = io.WriteString(writer, r.Payload)
}
// readFrom reads exactly one frame from conn and decodes it.
//
// The frame is a 4-byte big-endian length, a 4-byte big-endian Serial and
// then length-4 bytes of payload (the length counts Serial plus payload).
//
// It returns an error when any of the three reads fails, or when the
// announced length is smaller than the 4 bytes the Serial itself needs.
// The underlying I/O error is wrapped, so errors.Is(err, io.EOF) works.
//
// NOTE(review): there is no upper bound on the announced length; a peer can
// make this allocate up to 4 GiB. Add a cap if the peer is not trusted.
func readFrom(conn *net.TCPConn) (*RequestResponse, error) {
	// The same 4-byte scratch buffer is used for both fixed-size fields.
	buf := make([]byte, 4)

	if _, err := io.ReadFull(conn, buf); err != nil {
		return nil, fmt.Errorf("读长度故障:%w", err)
	}
	length := binary.BigEndian.Uint32(buf)
	if length < 4 {
		// length-4 would wrap around to a huge uint32 below.
		return nil, fmt.Errorf("读长度故障:invalid frame length %d", length)
	}

	if _, err := io.ReadFull(conn, buf); err != nil {
		return nil, fmt.Errorf("读Serial故障:%w", err)
	}
	serial := binary.BigEndian.Uint32(buf)

	payloadBytes := make([]byte, length-4)
	if _, err := io.ReadFull(conn, payloadBytes); err != nil {
		return nil, fmt.Errorf("读Payload故障:%w", err)
	}

	return &RequestResponse{Serial: serial, Payload: string(payloadBytes)}, nil
}
// zhangDaYeListen is Zhang's ear: it reads messages from conn until
// zRecvCount reaches total*3, a read fails, or an unknown line arrives.
//
// Replies are queued on zhangChan with the Serial of the message they
// answer: l2 is answered with z3, l4 with z5, and l1 needs no answer.
//
// On exit it closes zhangChan and then calls wg.Done.
func zhangDaYeListen(conn *net.TCPConn, wg *sync.WaitGroup) {
	defer func() {
		close(zhangChan)
		wg.Done()
	}()

	for zRecvCount < total*3 {
		msg, err := readFrom(conn)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		switch msg.Payload {
		case l2:
			queue(&RequestResponse{msg.Serial, z3}, zhangChan)
		case l4:
			queue(&RequestResponse{msg.Serial, z5}, zhangChan)
		case l1:
			// Nothing to say back.
		default:
			fmt.Println("张大爷听不懂:" + msg.Payload)
			return
		}

		zRecvCount++
	}
}
// zhangDaYeSay is Zhang's mouth: it queues the opening line z0 on zhangChan
// total times, numbering the messages 0, 1, 2, ...
//
// conn is not used; the actual write happens wherever zhangChan is drained.
func zhangDaYeSay(conn *net.TCPConn) {
	for serial := uint32(0); serial < total; serial++ {
		queue(&RequestResponse{Serial: serial, Payload: z0}, zhangChan)
	}
}
// liDaYeListen is Li's ear: it reads messages from conn until lRecvCount
// reaches total*3, a read fails, or an unknown line arrives.
//
// z0 is answered with l1 (same Serial) via liChan; z3 and z5 are only
// counted.
//
// On exit it closes liChan and then calls wg.Done.
func liDaYeListen(conn *net.TCPConn, wg *sync.WaitGroup) {
	defer func() {
		close(liChan)
		wg.Done()
	}()

	for lRecvCount < total*3 {
		msg, err := readFrom(conn)
		if err != nil {
			fmt.Println(err.Error())
			return
		}

		switch msg.Payload {
		case z0:
			queue(&RequestResponse{msg.Serial, l1}, liChan)
		case z3, z5:
			// Heard it; no answer needed.
		default:
			fmt.Println("李大爷听不懂:" + msg.Payload)
			return
		}

		lRecvCount++
	}
}
// liDaYeSay is Li's mouth: in each of the total rounds it queues l2 and then
// l4 on liChan. Serials count up across both lines, so round i uses 2*i for
// l2 and 2*i+1 for l4.
//
// conn is not used; the actual write happens wherever liChan is drained.
func liDaYeSay(conn *net.TCPConn) {
	for round := uint32(0); round < total; round++ {
		base := 2 * round
		queue(&RequestResponse{Serial: base, Payload: l2}, liChan)
		queue(&RequestResponse{Serial: base + 1, Payload: l4}, liChan)
	}
}
// startServer plays Zhang: it listens on 127.0.0.1:9999 and, for every
// accepted connection, starts Zhang's listener and speaker goroutines and
// then drains zhangChan onto the connection until that channel is closed.
//
// If the address cannot be resolved or the port cannot be bound, the error
// is printed and the function returns without accepting anything.
//
// NOTE(review): zhangChan is a package-level channel that the listener
// closes when it finishes, so this loop looks usable for one connection
// only — confirm before relying on a second Accept.
func startServer(wg *sync.WaitGroup) {
	tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:9999")
	if err != nil {
		fmt.Println(err)
		return
	}
	tcpListener, err := net.ListenTCP("tcp", tcpAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer tcpListener.Close()

	fmt.Println("张大爷在胡同口等着 ...")
	for {
		conn, err := tcpListener.AcceptTCP()
		if err != nil {
			fmt.Println(err)
			break
		}
		fmt.Println("碰见一个李大爷:" + conn.RemoteAddr().String())
		go zhangDaYeListen(conn, wg)
		go zhangDaYeSay(conn)
		// Blocks until zhangChan is closed.
		consume(conn, zhangChan)
	}
}
// startClient plays Li: it dials 127.0.0.1:9999, starts Li's listener and
// speaker goroutines, and drains liChan onto the connection. It blocks until
// liChan is closed and then returns the still-open connection; the caller is
// responsible for closing it.
//
// A resolve or dial failure panics with the underlying error: without a
// connection nothing in this program can proceed, and continuing would hand
// a nil *net.TCPConn to the goroutines below.
func startClient(wg *sync.WaitGroup) *net.TCPConn {
	tcpAddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:9999")
	if err != nil {
		panic(fmt.Errorf("startClient: %w", err))
	}
	conn, err := net.DialTCP("tcp", nil, tcpAddr)
	if err != nil {
		panic(fmt.Errorf("startClient: %w", err))
	}

	go liDaYeListen(conn, wg)
	go liDaYeSay(conn)
	// Blocks until liChan is closed.
	consume(conn, liChan)
	return conn
}
// main runs the server and the client in one process and prints how long
// the whole exchange took.
func main() {
	// One Done is expected from each side's listener goroutine.
	var wg sync.WaitGroup
	wg.Add(2)

	go startServer(&wg)
	// Give the server a moment to start listening before dialing.
	time.Sleep(time.Second)

	// The clock must start before startClient: that call blocks until the
	// client side has finished sending, so timing from after it returns
	// would leave most of the conversation out of the measurement.
	t1 := time.Now()
	conn := startClient(&wg)
	wg.Wait()
	elapsed := time.Since(t1)

	if conn != nil {
		conn.Close()
	}
	fmt.Println("耗时: ", elapsed)
}
@5idu
Copy link

5idu commented Mar 8, 2022

666

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment