Last active
August 17, 2022 05:40
-
-
Save dlzou/402d80035deaaeb1a8bbc3b5c3a23333 to your computer and use it in GitHub Desktop.
Prototype client for Slack API Socket Mode
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"encoding/json" | |
"errors" | |
"fmt" | |
"io/ioutil" | |
"net/http" | |
"os" | |
"os/signal" | |
"sync" | |
"time" | |
"github.com/gorilla/websocket" | |
"golang.org/x/net/context" | |
) | |
// M is a decoded JSON object: string keys mapped to arbitrary JSON values.
// It is used both for envelopes read from the WebSocket and for the payloads
// handed to consumers.
type M map[string]interface{}
// fetchSocketUrl asks the Slack Web API for a WebSocket connection URL.
//
// It sends an empty POST request to appUrl, authenticated with the token read
// from the SLACK_APP_TOKEN environment variable, and decodes the JSON reply.
// The whole exchange is bounded by a 2-second timeout.
//
// It returns the reply's "url" field. An error is returned when the request
// cannot be built or sent, the body cannot be read, the HTTP status is not
// 200, the body is not valid JSON, "ok" is not true, or "url" is absent.
func fetchSocketUrl(appUrl string) (string, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, appUrl, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Authorization", "Bearer "+os.Getenv("SLACK_APP_TOKEN"))
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	urlData, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", fmt.Errorf("fetch socket URL: reading response: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("fetch socket URL: unexpected status %s", resp.Status)
	}

	// Decode only the fields this function needs. Typed fields turn a
	// missing or non-string "url" into an ordinary error instead of a
	// failed type assertion.
	var reply struct {
		OK    bool   `json:"ok"`
		URL   string `json:"url"`
		Error string `json:"error"`
	}
	if err = json.Unmarshal(urlData, &reply); err != nil {
		return "", fmt.Errorf("fetch socket URL: decoding response: %w", err)
	}
	if !reply.OK {
		if reply.Error != "" {
			return "", fmt.Errorf("fetch socket URL: ok is false: %s", reply.Error)
		}
		return "", errors.New("fetch socket URL: ok is false")
	}
	if reply.URL == "" {
		return "", errors.New("fetch socket URL: response has no url")
	}
	return reply.URL, nil
}
// Open a WebSocket connection and listen to incoming payloads. | |
// Gracefully handle connection restarts and failed reads. | |
func socketWorker( | |
intrCtx context.Context, | |
appUrl string, | |
queue chan<- M, | |
restartChan chan<- struct{}, | |
) { | |
fmt.Println("Socket worker: spawned") | |
wssUrl, err := fetchSocketUrl(appUrl) | |
if err != nil { | |
fmt.Println(err.Error()) | |
return | |
} | |
conn, _, err := websocket.DefaultDialer.Dial(wssUrl, nil) | |
if err != nil { | |
fmt.Println("Socket worker: connection failed") | |
return | |
} | |
defer conn.Close() | |
fmt.Println("Socket worker: connection established") | |
defer fmt.Println("Socket worker: terminated") | |
done := make(chan struct{}) | |
go func() { | |
defer close(done) | |
for { | |
var payload M | |
err := conn.ReadJSON(&payload) | |
if err != nil { | |
fmt.Printf("Socket worker: read failed with %s", err.Error()) | |
restartChan <- struct{}{} | |
return | |
} | |
if payload["type"] == "hello" { | |
fmt.Println("Socket worker: hello") | |
continue | |
} | |
if payload["type"] == "disconnect" { | |
if payload["reason"] == "refresh_requested" { | |
fmt.Println("Socket worker: refresh_requested") | |
restartChan <- struct{}{} | |
} | |
return | |
} | |
// Ack payload and queue it | |
if envelopeId, ok := payload["envelope_id"]; ok { | |
if err = conn.WriteJSON(M{"envelope_id": envelopeId}); err != nil { | |
fmt.Printf("Socket worker: ack failed with %s", err.Error()) | |
continue | |
} | |
} | |
if p, ok := payload["payload"]; ok { | |
queue <- p.(M) | |
} | |
} | |
}() | |
select { | |
case <-done: | |
case <-intrCtx.Done(): | |
// Attempt to gracefully disconnect | |
if err = conn.WriteMessage( | |
websocket.CloseMessage, | |
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), | |
); err != nil { | |
fmt.Printf("Socket worker: interrupt failed with %s", err.Error()) | |
return | |
} | |
select { | |
case <-done: | |
case <-time.After(2 * time.Second): | |
} | |
} | |
} | |
func main() { | |
intrCtx, cancel := context.WithCancel(context.Background()) | |
intrChan := make(chan os.Signal, 1) | |
signal.Notify(intrChan, os.Interrupt) | |
defer func() { | |
signal.Stop(intrChan) | |
cancel() | |
}() | |
go func() { | |
select { | |
case <-intrChan: | |
cancel() | |
case <-intrCtx.Done(): | |
} | |
}() | |
queue := make(chan M, 5) | |
defer close(queue) | |
restartChan := make(chan struct{}, 1) | |
defer close(restartChan) | |
var wg sync.WaitGroup | |
appUrl := "https://slack.com/api/apps/connections.open" | |
// Handle restarting socket worker | |
go func() { | |
for { | |
select { | |
case <-restartChan: | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
socketWorker(intrCtx, appUrl, queue, restartChan) | |
}() | |
case <-intrCtx.Done(): | |
return | |
} | |
} | |
}() | |
// Start socket worker | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
socketWorker(intrCtx, appUrl, queue, restartChan) | |
}() | |
// Consume payloads | |
numConsumers := 2 | |
for i := 1; i <= numConsumers; i++ { | |
wg.Add(1) | |
go func() { | |
defer wg.Done() | |
for { | |
select { | |
case payload := <-queue: | |
// More logic for handling payloads | |
fmt.Println(payload) | |
case <-intrCtx.Done(): | |
return | |
} | |
} | |
}() | |
} | |
wg.Wait() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment