Skip to content

Instantly share code, notes, and snippets.

@ysugimoto

ysugimoto/client.go

Created Aug 15, 2018
Embed
What would you like to do?
socket multiplexer example
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var mu sync.Mutex
func main() {
sock, err := net.Dial("tcp", ":9998")
if err != nil {
log.Fatal(err)
}
defer sock.Close()
ch := make(chan struct{}, 2)
wch := make(chan []byte)
go recv(sock, ch, wch)
go write(sock, ch, wch)
stop := make(chan struct{})
go func() {
times := 0
ticker := time.NewTicker(time.Microsecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
times++
// log.Printf("[Ticker] filred: %d\n", times)
wch <- []byte(fmt.Sprintf("[Ticker-Client] fired, send message [%d]\n", times))
if times == 10 {
return
}
case <-stop:
return
}
}
}()
times := 0
for {
times++
wch <- []byte(fmt.Sprintf("[Client] Writer message times [%d]", times))
if times == 10 {
stop <- struct{}{}
break
}
time.Sleep(100 * time.Millisecond)
}
<-ch
}
func recv(sock net.Conn, closer chan struct{}, wch chan []byte) {
r := bufio.NewReader(sock)
for {
b := make([]byte, 2048)
n, err := r.Read(b)
if err != nil {
log.Printf("failed to read packet : %s", err.Error())
closer <- struct{}{}
return
}
log.Printf("read packet successfully: %s", string(b[0:n]))
if strings.Index(string(b[0:n]), "Client") != -1 {
continue
}
if err := send(sock, b[0:n]); err != nil {
closer <- struct{}{}
return
}
}
}
func send(sock net.Conn, msg []byte) error {
mu.Lock()
defer mu.Unlock()
w := bufio.NewWriter(sock)
n, err := w.Write(msg)
if err != nil {
log.Printf("failed to write packet : %s", err.Error())
return err
}
if n != len(msg) {
log.Printf("could not write enough packet : %s", err.Error())
return err
}
if err := w.Flush(); err != nil {
log.Printf("could not flush writer : %s", err.Error())
return err
}
log.Printf("Write message successfully: %s\b", string(msg))
return nil
}
func write(sock net.Conn, closer chan struct{}, wch chan []byte) {
for {
select {
case msg := <-wch:
if err := send(sock, msg); err != nil {
closer <- struct{}{}
return
}
}
}
}
package main
import (
"bufio"
"fmt"
"log"
"net"
"strings"
"sync"
"time"
)
var mu sync.Mutex
func main() {
listener, err := net.Listen("tcp", ":9998")
if err != nil {
log.Fatal(err)
}
defer listener.Close()
for {
sock, err := listener.Accept()
if err != nil {
log.Fatal(err)
}
go func(s net.Conn) {
defer sock.Close()
ch := make(chan struct{}, 2)
wch := make(chan []byte)
go recv(sock, ch)
go write(sock, ch, wch)
stop := make(chan struct{})
go func() {
times := 0
ticker := time.NewTicker(time.Microsecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
times++
// log.Printf("[Ticker-Server] fired: %d\n", times)
wch <- []byte(fmt.Sprintf("[Ticker-Server] fired, send message [%d]\n", times))
if times == 10 {
return
}
case <-stop:
return
}
}
}()
times := 0
for {
times++
wch <- []byte(fmt.Sprintf("[Server] Writer message times [%d]", times))
if times == 10 {
stop <- struct{}{}
break
}
time.Sleep(100 * time.Millisecond)
}
}(sock)
}
}
func recv(sock net.Conn, closer chan struct{}) {
r := bufio.NewReader(sock)
for {
b := make([]byte, 512)
n, err := r.Read(b)
if err != nil {
log.Printf("failed to read packet : %s", err.Error())
closer <- struct{}{}
return
}
log.Printf("read packet successfully: %s", string(b[0:n]))
if strings.Index(string(b[0:n]), "Server") != -1 {
continue
}
if err := send(sock, b[0:n]); err != nil {
closer <- struct{}{}
return
}
}
}
func send(sock net.Conn, msg []byte) error {
mu.Lock()
defer mu.Unlock()
w := bufio.NewWriter(sock)
n, err := w.Write(msg)
if err != nil {
log.Printf("failed to write packet : %s", err.Error())
return err
}
if n != len(msg) {
log.Printf("could not write enough packet : %s", err.Error())
return err
}
if err := w.Flush(); err != nil {
log.Printf("could not flush writer : %s", err.Error())
return err
}
log.Printf("Write message successfully: %s\b", string(msg))
return nil
}
func write(sock net.Conn, closer chan struct{}, wch chan []byte) {
for {
select {
case msg := <-wch:
if err := send(sock, msg); err != nil {
closer <- struct{}{}
return
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment