Skip to content

Instantly share code, notes, and snippets.

@miknonny
Forked from pattanunNP/chat.go
Created July 26, 2023 16:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miknonny/66286fe57316d9418d8ab995f297f11e to your computer and use it in GitHub Desktop.
Save miknonny/66286fe57316d9418d8ab995f297f11e to your computer and use it in GitHub Desktop.
package chat
import (
"bytes"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/gofiber/contrib/websocket"
"github.com/nats-io/nats.go"
)
const (
// Time allowed to write a message to the peer.
writeWait = 25 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 120 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
)
func (ch *ChatHandler) FindRoomORCreate(roomName string) *Room {
// Retrieve the room or create a new one if it doesn't exist
//
log.Println("FindRoomORCreate: ", roomName)
if room, ok := ch.rooms[roomName]; ok {
return room
}
room := &Room{
name: roomName,
clients: make(map[*websocket.Conn]bool),
addCh: make(chan *websocket.Conn),
delCh: make(chan *websocket.Conn),
sendCh: make(chan []byte),
doneCh: make(chan struct{}),
}
ch.rooms[roomName] = room
go room.run(ch)
return room
}
func (room *Room) run(ch *ChatHandler) {
// Create a ticker to ping clients every 50 seconds.
for {
select {
case client := <-room.addCh: // Add new client
room.clients[client] = true
case client := <-room.delCh: // Remove client from the room
delete(room.clients, client)
if len(room.clients) == 0 {
// If the room is empty, remove it from ChatHandler
delete(ch.rooms, room.name) // Broadcast the close signal
close(room.doneCh) // Broadcast the close signal
return
}
case message := <-room.sendCh:
if len(room.clients) > 0 {
for client := range room.clients {
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
// Send the message to all clients in the room
if err := client.WriteMessage(websocket.TextMessage, message); err != nil {
log.Println("Error broadcasting message:", err)
}
}
}
// Broadcast message to all clients
}
}
}
func (ch *ChatHandler) FindRoom(roomName string) *Room {
log.Println("FindRoom: ", roomName)
// Retrieve the room or create a new one if it doesn't exist
//
room, ok := ch.rooms[roomName]
if !ok {
log.Println("Room not found live chat is disabled")
return nil
}
log.Println("Room found: ", roomName)
return room
}
func (ch *ChatHandler) StreamInLiveChat(c *websocket.Conn) {
// Check is roomID exist
// If existed, emit to all clients
// If not exist, create new roomID and emit to all clients
userId := c.Params("user_id")
botID := c.Params("bot_id")
// Continuously read messages from the connection
roomName := fmt.Sprintf("room_%s_botid_%s", userId, botID)
log.Println("RoomName: ", roomName)
room := ch.FindRoomORCreate(roomName)
ch.addConnection(room, c) // Add connection to the room
go func(room *Room, c *websocket.Conn) {
ch.Ping(room, c)
}(room, c)
ch.ReadPump(roomName, c)
}
func (ch *ChatHandler) StreamOutLiveChat(c *websocket.Conn) {
// Check is roomID exist
// If existed, emit to all clients
// If not exist, create new roomID and emit to all clients
userId := c.Params("user_id")
botID := c.Params("bot_id")
// Continuously read messages from the connection
outRoom := fmt.Sprintf("out_room_%s_botid_%s", userId, botID)
log.Println("RoomName: ", outRoom)
room := ch.FindRoomORCreate(outRoom)
ch.addConnection(room, c) // Add connection to the room
go func(room *Room, c *websocket.Conn) {
ch.Ping(room, c)
}(room, c)
ch.ReadPump(outRoom, c)
}
func (ch *ChatHandler) NotificationChat(c *websocket.Conn) {
// Check is roomID exist
// If existed, emit to all clients
// If not exist, create new roomID and emit to all clients
roomName := "notification"
room := ch.FindRoomORCreate(roomName)
ch.addConnection(room, c)
go func(room *Room, c *websocket.Conn) {
ch.Ping(room, c)
}(room, c)
ch.ReadPump(roomName, c)
}
func (ch *ChatHandler) Ping(room *Room, c *websocket.Conn) {
// Start a timer to send ping every 5 seconds
// Start a goroutine to ping the client every 5 seconds
pingTicker := time.NewTicker(5 * time.Second)
defer pingTicker.Stop()
for range pingTicker.C { // Loop until the timer channel receives a value
if len(room.clients) > 0 {
if err := c.WriteMessage(websocket.PingMessage, []byte("")); err != nil {
log.Println(err)
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
break
}
ch.removeConnection(room, c)
break
}
}
}
pingTicker.Stop()
// // Read messages from the client
}
// ReadPump listens for incoming messages from the WebSocket connection
func (ch *ChatHandler) ReadPump(subject string, c *websocket.Conn) {
log.Println("New connection added")
// Keep the main goroutine running
// Start a timer to send ping every 5 seconds
c.SetReadLimit(maxMessageSize)
err := c.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
return
}
c.SetPongHandler(func(string) error {
err := c.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
return err
}
return nil
})
// Subscribe to a subject/topic
room := ch.FindRoom(subject)
sub, _ := ch.repository.NATSRepository.Subscribe(subject, func(msg *nats.Msg) {
message := bytes.TrimSpace(bytes.Replace(msg.Data, newline, space, -1))
if err := c.WriteMessage(websocket.TextMessage, message); err != nil {
log.Println(err)
ch.removeConnection(room, c)
return
}
})
// Read messages from NATS and write to the socket
defer func(sub *nats.Subscription) {
err := sub.Unsubscribe()
if err != nil {
return
}
}(sub)
// // Set up a signal handler to gracefully exit the program
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) // Register os.Interrupt and syscall.SIGTERM to sigCh
go func(room *Room, c *websocket.Conn) {
<-sigCh
err := sub.Unsubscribe() // Unsubscribe from the subject
if err != nil {
return
}
ch.repository.NATSRepository.Close() // Close the connection to NATS
ch.removeConnection(room, c)
os.Exit(0)
// Exit the program
}(room, c)
// Publish message to the subject
//
select {}
}
func (ch *ChatHandler) addConnection(room *Room, c *websocket.Conn) {
log.Println("Add connection to room: ", room.name)
log.Println("Add connection to room: ", c.RemoteAddr().String())
// Add the new client to the room by sending it over the addCh channel
room.addCh <- c // Send the client to the addCh channel
}
func (ch *ChatHandler) removeConnection(room *Room, c *websocket.Conn) {
// Remove the client from the room by sending it over the delCh channel
room.delCh <- c // Send the client to the delCh channel
}
func (ch *ChatHandler) broadcast(room *Room, message []byte) {
// Broadcast message to all clients
log.Println("Broadcast message: ")
// log.Println(utils.PrettyFormatStruct(string(message)))
// Send the message to all the clients in the room by sending it over the sendCh channel
room.sendCh <- message // Send the message to the sendCh channel
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment