Skip to content

Instantly share code, notes, and snippets.

@ugjka
Last active February 13, 2024 12:17
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save ugjka/2e5898c545ab1c9cc952f184cf06e4b5 to your computer and use it in GitHub Desktop.
IRC text-to-speech http mp3 server
// IRC text-to-speech server
// needs ffmpeg and piper-tts (https://github.com/rhasspy/piper)
//
// Example stream:
// https://ugjka.net/irc-radio/
// mpv https://ugjka.net/ircradio
// mpg123 https://ugjka.net/ircradio
//
// configuration? change the constants and build
package main
import (
"bytes"
"fmt"
"io"
"net/http"
"net/mail"
"net/url"
"os"
"os/exec"
"strings"
"sync"
"time"
"github.com/bogem/id3v2/v2"
"github.com/tcolgate/mp3"
kitty "github.com/ugjka/kittybot"
"gopkg.in/inconshreveable/log15.v2"
"mvdan.cc/xurls/v2"
)
const IRC_SERVER = "irc.libera.chat:6697"
const IRC_NICK = "guest556677"
const IRC_PASSWORD = "hunter1"
var IRC_CHANNELS = []string{
"##english",
"#android",
"#archlinux",
"#archlinux-offtopic",
"#go-nuts",
//"#kde",
"#libera",
"#linux",
"#linux-offtopic",
"#networking",
"#newpipe",
"#nextcloud",
"#security",
"#systemd",
"#ubuntu",
"#ubuntu-server",
"#ubuntu-offtopic",
"#yt-dlp",
"#ugjka",
}
const PIPER_TTS_VOICE = "/home/ugjka/amy/medium/en_US-amy-medium.onnx"
// Silence PCM
const SPR = 22050
const PCM_SECOND = SPR * 2
const PCM_BUFFER = PCM_SECOND / 10
const PCM_BUFFER_DUR = time.Second / 10
// Mp3
const BITRATE = 40
// Buffers
const INIT_BUFF_SEC = 15 //seconds
const INIT_BUFF = BITRATE * 1000 / 8 * INIT_BUFF_SEC
const NETW_BUFF = INIT_BUFF / 4
const WORK_BUFF = INIT_BUFF / 32
const MAX_BACKLOG = INIT_BUFF * 2
const STREAM_PORT = 8089
// TMP DIR
const TMP_FOLDER = "ircwebradiotmp"
var tmpdb = newFiles()
var log = log15.New()
var start = make(chan struct{})
func main() {
exes := []string{"piper-tts", "ffmpeg"}
for _, exe := range exes {
_, err := exec.LookPath(exe)
errExit(err)
}
os.RemoveAll(TMP_FOLDER)
err := os.Mkdir(TMP_FOLDER, 0755)
errExit(err)
err = os.Chdir(TMP_FOLDER)
errExit(err)
if len(os.Args) > 1 {
IRC_CHANNELS = []string{"#" + os.Args[1]}
}
bot := kitty.NewBot(IRC_SERVER, IRC_NICK,
func(bot *kitty.Bot) {
bot.Password = IRC_PASSWORD
bot.Channels = IRC_CHANNELS
bot.SSL = true
bot.StripColors = true
},
)
h := log15.StreamHandler(
os.Stderr,
log15.FormatFunc(
func(r *log15.Record) []byte {
buf := bytes.NewBuffer(nil)
fmt.Fprintf(buf, "[%v] ", r.Lvl)
fmt.Fprintf(buf, "[%v] ", r.Msg)
for i, v := range r.Ctx {
if i%2 == 0 {
fmt.Fprintf(buf, "%v=", v)
} else {
fmt.Fprintf(buf, "%v ", v)
}
}
fmt.Fprintln(buf)
return buf.Bytes()
},
),
)
log.SetHandler(log15.LvlFilterHandler(log15.LvlInfo, h))
bot.Logger = log
line := make(chan string, 5000)
block := make(chan struct{}, 1)
var prevchan string
var prevuser string
// privmsg
bot.AddTrigger(
kitty.Trigger{
Condition: func(bot *kitty.Bot, m *kitty.Message) bool {
return m.Command == "PRIVMSG" && strings.HasPrefix(m.To, "#") && !strings.HasPrefix(m.Content, "\u0001ACTION")
},
Action: func(bot *kitty.Bot, m *kitty.Message) {
block <- struct{}{}
defer func() { <-block }()
var msg string
if m.To != prevchan {
prevchan = m.To
prevuser = m.Name
msg = fmt.Sprintf("%s, %s says: %s", m.To, m.Name, m.Content)
line <- msg
return
}
if m.Name != prevuser {
prevuser = m.Name
msg = fmt.Sprintf("%s says: %s", m.Name, m.Content)
line <- msg
return
}
line <- m.Content
},
},
)
// /me action messages
bot.AddTrigger(
kitty.Trigger{
Condition: func(bot *kitty.Bot, m *kitty.Message) bool {
return m.Command == "PRIVMSG" && strings.HasPrefix(m.To, "#") && strings.HasPrefix(m.Content, "\u0001ACTION")
},
Action: func(bot *kitty.Bot, m *kitty.Message) {
block <- struct{}{}
defer func() { <-block }()
var msg string
if m.To != prevchan {
prevchan = m.To
prevuser = m.Name
msg = fmt.Sprintf("%s, %s %s", m.To, m.Name, m.Content[8:len(m.Content)-1])
line <- msg
return
}
if m.Name != prevuser {
prevuser = m.Name
msg = fmt.Sprintf("%s %s", m.Name, m.Content[8:len(m.Content)-1])
line <- msg
return
}
line <- fmt.Sprintf("%s %s", m.Name, m.Content[8:len(m.Content)-1])
},
},
)
httpserver := &http.Server{
Addr: fmt.Sprintf(":%d", STREAM_PORT),
}
stream := new(streamer)
stream.init()
http.Handle("/stream", stream)
log.Info("stream", "url", fmt.Sprintf("http://0.0.0.0:%d/stream", STREAM_PORT))
go func() {
<-start
httpserver.ListenAndServe()
}()
// TTS PCM generation
go generateTTS(line)
// push the mp3 feed
r, w := io.Pipe()
go generateMP3(w)
go stream.streamMP3(r)
for {
bot.Run()
time.Sleep(time.Second * 30)
}
}
func generateTTS(line <-chan string) {
for {
msg := <-line
msg = stripURLpath(msg)
log.Debug(msg)
piper := exec.Command(
"piper-tts", "--output-raw", "-m",
PIPER_TTS_VOICE,
)
piper.Stdin = strings.NewReader(msg)
filename := fmt.Sprintf("%d.pcm", time.Now().UnixNano())
f, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0644)
errExit(err)
piper.Stdout = f
err = piper.Run()
errExit(err)
err = f.Close()
errExit(err)
tmpdb.add(filename)
}
}
func generateMP3(stream io.Writer) {
ffmpeg := exec.Command("ffmpeg",
"-f", "s16le",
"-ar", fmt.Sprint(SPR),
"-ac", "1",
"-i", "-",
"-b:a", fmt.Sprintf("%dk", BITRATE),
"-f", "mp3",
"-")
ffmpeg.Stdout = stream
r, w := io.Pipe()
ffmpeg.Stdin = r
err := ffmpeg.Start()
errExit(err)
silence := make([]byte, PCM_BUFFER)
buf := make([]byte, PCM_BUFFER)
var total time.Duration
var start time.Time
tick := func() {
total += PCM_BUFFER_DUR
time.Sleep(total - time.Since(start))
}
start = time.Now()
for {
file := tmpdb.get()
if file == "" {
tick()
_, err := w.Write(silence)
errExit(err)
continue
}
f, err := os.Open(file)
errExit(err)
loop:
for {
n, err := f.Read(buf)
switch err {
case nil:
tick()
_, err := w.Write(append(buf[:n], silence[n:]...))
errExit(err)
case io.EOF:
err = f.Close()
errExit(err)
err = os.Remove(file)
errExit(err)
break loop
default:
errExit(err)
}
}
}
}
func (s *streamer) streamMP3(r *io.PipeReader) {
dec := mp3.NewDecoder(r)
frame := &mp3.Frame{}
skipped := new(int)
var tmp []byte
var buflen int
s.Lock()
for {
err := dec.Decode(frame, skipped)
if err == nil {
tmp, err = io.ReadAll(frame.Reader())
if err == nil {
s.buffer = append(s.buffer, tmp)
buflen += len(tmp)
if buflen >= INIT_BUFF {
buflen = 0
break
}
}
}
}
log.Info("initial buffer built", "length", fmt.Sprintf("%ds", INIT_BUFF_SEC))
s.Unlock()
close(start)
var buf [][]byte
buflen = 0
for {
err := dec.Decode(frame, skipped)
if err == nil {
tmp, err = io.ReadAll(frame.Reader())
if err == nil {
buf = append(buf, tmp)
buflen += len(tmp)
if buflen >= WORK_BUFF {
s.send(buf)
buf = [][]byte{}
buflen = 0
}
}
}
}
}
func (s *streamer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Set some headers
w.Header().Add("Cache-Control", "no-cache, no-store, must-revalidate")
w.Header().Add("Pragma", "no-cache")
w.Header().Add("Expires", "0")
w.Header().Add("Content-Type", "audio/mpeg")
flusher, ok := w.(http.Flusher)
chunked := ok && r.Proto == "HTTP/1.1"
if r.Method == http.MethodHead {
w.WriteHeader(http.StatusOK)
return
}
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
id, recieve := s.addClient()
defer s.delClient(id)
tag := id3v2.NewEmptyTag()
tag.SetArtist(IRC_SERVER)
tag.SetAlbum("IRC tts service")
tag.SetGenre("voice")
tag.SetYear(fmt.Sprint(time.Now().Year()))
tag.SetTitle(strings.Join(IRC_CHANNELS, " "))
_, err := tag.WriteTo(w)
if err != nil {
return
}
if chunked {
flusher.Flush()
}
for {
chunk := <-recieve
if _, err := w.Write(chunk); err != nil {
return
}
if chunked {
flusher.Flush()
}
}
}
type streamer struct {
sync.RWMutex
clients map[uint64]chan []byte
id uint64
buffer [][]byte
}
func (s *streamer) init() {
s.Lock()
defer s.Unlock()
s.clients = make(map[uint64]chan []byte)
s.buffer = make([][]byte, 0)
}
func (s *streamer) addClient() (id uint64, in chan []byte) {
s.Lock()
defer s.Unlock()
s.id++
s.clients[s.id] = make(chan []byte, MAX_BACKLOG/WORK_BUFF)
s.clients[s.id] <- bytes.Join(s.buffer, nil)
log.Info("listeners", "count", len(s.clients))
return s.id, s.clients[s.id]
}
func (s *streamer) delClient(id uint64) {
s.Lock()
defer s.Unlock()
close(s.clients[id])
delete(s.clients, id)
log.Info("listeners", "count", len(s.clients))
}
func (s *streamer) send(frames [][]byte) {
s.Lock()
buf := bytes.Join(frames, nil)
for _, v := range s.clients {
select {
case v <- buf:
default:
}
}
s.buffer = append(s.buffer[len(frames):], frames...)
s.Unlock()
}
type tmpfiles struct {
sync.Mutex
files []string
}
func (tmp *tmpfiles) add(name string) {
tmp.Lock()
tmp.files = append(tmp.files, name)
tmp.Unlock()
}
func (tmp *tmpfiles) get() (out string) {
tmp.Lock()
defer tmp.Unlock()
if len(tmp.files) > 0 {
out = tmp.files[0]
tmp.files = tmp.files[1:]
}
return
}
func newFiles() tmpfiles {
return tmpfiles{
files: make([]string, 0),
}
}
var urlreg = xurls.Relaxed()
// strip URLs down to hostname
func stripURLpath(in string) string {
matches := urlreg.FindAllString(in, -1)
var tmp string
for i := range matches {
_, err := mail.ParseAddress(matches[i])
if err == nil {
continue
}
tmp = matches[i]
if !strings.HasPrefix(tmp, "http://") && !strings.HasPrefix(tmp, "https://") {
tmp = "https://" + tmp
}
link, err := url.Parse(tmp)
if err == nil {
in = strings.Replace(in, matches[i], link.Hostname(), -1)
}
}
return in
}
func errExit(err error) {
if err != nil {
fmt.Fprintln(os.Stderr, "Exiting, fatal:")
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment