Skip to content

Instantly share code, notes, and snippets.

@taylorhughes
Last active July 26, 2022 14:00
Show Gist options
  • Save taylorhughes/9861cce37d9a5f1974a1 to your computer and use it in GitHub Desktop.
Save taylorhughes/9861cce37d9a5f1974a1 to your computer and use it in GitHub Desktop.
A lightly modified excerpt from Cluster's streaming media upload server, which encodes video on the fly as it is uploaded.
// EncodeStreamingVideo feeds streamingFile to an ffmpeg child process over
// its stdin while the data is still arriving, and returns the encoded mp4
// opened for reading.
//
// Feeding stops when streamingFile reports io.EOF, when a read or write
// fails, or when request.Closed() returns true. In the Closed case stdin is
// closed early and whatever ffmpeg produces from the partial input is
// returned (NOTE(review): confirm callers really want the partial result on
// cancellation).
//
// On any failure the output file is removed and a nil file is returned
// together with the error.
func EncodeStreamingVideo(streamingFile io.Reader, request ShouldCanceler) (*os.File, error) {
	outputFilename := generateFilename("mp4")

	cmd := exec.Command("ffmpeg",
		// Read input from stdin.
		"-i", "-",
		// ... environment-specific ffmpeg options ...
		"-y", outputFilename)

	// cmd.Stderr is intentionally left nil: os/exec then connects the child's
	// stderr to the null device. A stderr pipe that nobody drains would stall
	// ffmpeg as soon as the pipe buffer fills up.
	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}
	if err = cmd.Start(); err != nil {
		return nil, err
	}

	buf := make([]byte, 32*1024)
	for !request.Closed() {
		bytesRead, readErr := streamingFile.Read(buf)
		// A Reader may return data together with an error (including io.EOF),
		// so forward the bytes before looking at readErr.
		if bytesRead > 0 {
			if _, err = stdin.Write(buf[:bytesRead]); err != nil {
				break
			}
		}
		if readErr != nil {
			// io.EOF is the normal end of the upload, not a failure.
			if readErr != io.EOF {
				err = readErr
			}
			break
		}
	}

	// Closing stdin is what tells ffmpeg the input is complete.
	if closeErr := stdin.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		// The output is being discarded, so stop ffmpeg right away and reap
		// it. Both calls are best-effort: the process may already be gone.
		_ = cmd.Process.Kill()
		_ = cmd.Wait()
		_ = os.Remove(outputFilename)
		log.Print("Error reading streaming file: ", err)
		return nil, err
	}

	if err = cmd.Wait(); err != nil {
		// Best-effort cleanup; ffmpeg may not have created the file at all.
		_ = os.Remove(outputFilename)
		log.Print("Error processing video: ", err)
		return nil, err
	}
	return os.Open(outputFilename)
}
// This is a type that wraps a temporary file, allowing one goroutine
// to write to it and another to read from it as data is written.
// Read() blocks until more data is available.
import (
	"io/ioutil"
	"log"
	"os"
	"sync"
)
// UploadingFile wraps a temporary file so that one goroutine can append to
// it with Write while another consumes it with Read as the data arrives.
// Read blocks until enough data has been written or DoneWriting is called.
//
// Create instances with NewUploadingFile; the zero value is not usable.
type UploadingFile struct {
	file *os.File

	// mu guards every field below. dataAvailable is broadcast on whenever
	// writePosition or isDoneWriting changes, so waiters re-check their
	// condition under the lock and cannot miss a notification.
	mu            sync.Mutex
	dataAvailable *sync.Cond
	readPosition  int64
	writePosition int64
	isDoneWriting bool
}

// NewUploadingFile creates the backing temporary file and returns an
// UploadingFile positioned at offset zero for both reading and writing.
// The caller is responsible for calling Close and removing the file
// reported by Name.
func NewUploadingFile() (*UploadingFile, error) {
	file, err := ioutil.TempFile("", "upload")
	if err != nil {
		log.Print("Could not create a temporary file!", err)
		return nil, err
	}
	uploadingFile := &UploadingFile{file: file}
	uploadingFile.dataAvailable = sync.NewCond(&uploadingFile.mu)
	return uploadingFile, nil
}

// Write appends buf at the current write position and wakes any goroutine
// blocked in Read or Wait. It returns the number of bytes written and any
// error reported by the underlying file.
func (r *UploadingFile) Write(buf []byte) (int, error) {
	r.mu.Lock()
	bytesWritten, err := r.file.WriteAt(buf, r.writePosition)
	r.writePosition += int64(bytesWritten)
	r.mu.Unlock()

	r.notifyReadAvailable()
	return bytesWritten, err
}

// DoneWriting marks the file as complete. Blocked and future Read calls
// stop waiting for more data, and Wait returns.
func (r *UploadingFile) DoneWriting() {
	r.mu.Lock()
	r.isDoneWriting = true
	r.mu.Unlock()

	r.notifyReadAvailable()
}

// notifyReadAvailable wakes every goroutine waiting in Read or Wait. It
// never blocks. Waiters re-evaluate their condition under mu, so a wake-up
// that arrives before a waiter goes to sleep is not lost.
func (r *UploadingFile) notifyReadAvailable() {
	r.dataAvailable.Broadcast()
}

// Read fills buf from the current read position. It blocks until either
// len(buf) bytes are available past the read position or DoneWriting has
// been called. It returns the byte count and error from the underlying
// ReadAt, so a short final read is reported together with io.EOF.
func (r *UploadingFile) Read(buf []byte) (int, error) {
	r.mu.Lock()
	defer r.mu.Unlock()

	newPosition := r.readPosition + int64(len(buf))
	for !r.isDoneWriting && newPosition > r.writePosition {
		// Wait releases mu while sleeping, so the writer can make progress.
		r.dataAvailable.Wait()
	}

	bytesRead, err := r.file.ReadAt(buf, r.readPosition)
	r.readPosition += int64(bytesRead)
	return bytesRead, err
}

// Wait blocks until DoneWriting has been called.
func (r *UploadingFile) Wait() {
	r.mu.Lock()
	defer r.mu.Unlock()

	for !r.isDoneWriting {
		r.dataAvailable.Wait()
	}
}

// Close closes the backing temporary file. It does not remove the file.
func (r *UploadingFile) Close() error {
	return r.file.Close()
}

// Name returns the path of the backing temporary file.
func (r *UploadingFile) Name() string {
	return r.file.Name()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment