Skip to content

Instantly share code, notes, and snippets.

@fionera
Created August 4, 2023 00:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fionera/418229c28891eba7dd0f8b019d180a38 to your computer and use it in GitHub Desktop.
Save fionera/418229c28891eba7dd0f8b019d180a38 to your computer and use it in GitHub Desktop.
package fdcache
import (
"fmt"
"io"
"log"
"os"
"sync"
"sync/atomic"
"time"
)
// File is the subset of *os.File behaviour handed out by a Cache.
// Unlike *os.File, Close does not necessarily close the descriptor —
// the cache's implementation returns it to a pool for reuse.
type File interface {
	io.Reader
	io.Writer
	io.Seeker
	io.Closer
	Name() string
}
// Open opens the named file read-only through the DefaultCache,
// mirroring os.Open.
func Open(name string) (File, error) {
	return DefaultCache.OpenFile(name, os.O_RDONLY, 0)
}
// OpenFile opens the named file through the DefaultCache, mirroring
// os.OpenFile. Flags containing os.O_CREATE are rejected by the cache.
func OpenFile(name string, flag int, perm os.FileMode) (File, error) {
	return DefaultCache.OpenFile(name, flag, perm)
}
// Cache opens files and recycles their descriptors. The unexported
// closeFile method means only types inside this package can satisfy
// the interface.
type Cache interface {
	OpenFile(name string, flag int, perm os.FileMode) (File, error)
	// closeFile returns a handed-out fd to the cache instead of
	// closing it for real.
	closeFile(*cachedFile) error
}
// key identifies one pool of descriptors: fds are shared only between
// opens of the same path with the same flags.
type key struct {
	name string
	flag int
}
// filePool recycles open fds for a single key. A background goroutine
// (see cleanup) closes everything once the pool has seen no activity
// for timeout. cleanupMtx lets that goroutine exclude Get/Put (write
// lock) while draining; openFDs counts descriptors currently handed
// out so cleanup can tell whether the pool is quiescent.
type filePool struct {
	key
	pool       sync.Pool
	cleanupMtx sync.RWMutex
	ticker     *time.Ticker
	timeout    time.Duration
	openFDs    int64
}
// new is installed as the sync.Pool constructor: it opens a fresh fd
// for this pool's path and flags. sync.Pool's New cannot report an
// error, so a failed open is returned AS the error value; Get's
// callers type-switch on the result to tell the two apart.
func (f *filePool) new() any {
	log.Printf("new(): %s", f.name)
	file, err := os.OpenFile(f.name, f.flag, 0)
	if err != nil {
		return err
	}
	return file
}
// Get hands out a value from the pool: either a *os.File ready for
// use, or an error when opening a new descriptor failed (see new).
// Callers must type-switch on the result. Every *os.File obtained
// from Get must eventually be returned with Put.
func (f *filePool) Get() any {
	// Any use of the pool counts as activity: push the idle
	// cleanup deadline out by a full timeout.
	f.ticker.Reset(f.timeout)
	f.cleanupMtx.RLock()
	defer f.cleanupMtx.RUnlock()
	v := f.pool.Get()
	// Only count real descriptors. When pool.New fails it returns an
	// error value and no Put will ever follow, so incrementing
	// unconditionally (as before) leaked the counter and left openFDs
	// permanently non-zero — blocking cleanup forever.
	if _, ok := v.(*os.File); ok {
		atomic.AddInt64(&f.openFDs, 1)
	}
	return v
}
// Put returns a descriptor previously obtained from Get. The fd is
// rewound to the start so the next borrower sees a fresh file, and
// the open-fd counter is decremented. Put panics when handed anything
// that is not a file from this pool — that is a caller bug.
func (f *filePool) Put(x any) {
	f.cleanupMtx.RLock()
	defer f.cleanupMtx.RUnlock()

	file, isFile := x.(*os.File)
	switch {
	case !isFile:
		panic("cant put a non file into a filePool")
	case file.Name() != f.name:
		panic("cant put files with different names into same pool")
	}
	if _, err := file.Seek(0, io.SeekStart); err != nil {
		panic("cant seek to start of file: " + err.Error())
	}

	atomic.AddInt64(&f.openFDs, -1)
	f.pool.Put(x)
}
// cleanup runs in its own goroutine for the lifetime of the pool. It
// waits until a full timeout period passes with no fds handed out,
// then closes every pooled descriptor and asks the parent cache (via
// parentCleanup) to drop this pool from its map. The goroutine then
// exits; the next OpenFile for the same key builds a fresh pool.
//
// NOTE(review): a caller that fetched this pool from the cache map
// just before parentCleanup removes it can still call Get on it
// afterwards; presumably that window is considered acceptable here —
// worth confirming.
func (f *filePool) cleanup(parentCleanup func(k key)) {
	defer f.ticker.Stop()
	for {
		// Block until the idle ticker fires; Get resets it on use.
		<-f.ticker.C
		f.cleanupMtx.Lock()
		if atomic.LoadInt64(&f.openFDs) != 0 {
			// Descriptors are still checked out: postpone and retry.
			log.Println("checking for cleanup, but found open fds")
			f.ticker.Reset(f.timeout)
			f.cleanupMtx.Unlock()
			continue
		}
		break
	}
	// The write lock is still held here, so Get/Put are excluded for
	// the whole drain below.
	log.Println("starting cleanup for: ", f.key)
	defer log.Println("done cleanup for: ", f.key)
	// Disable the constructor so pool.Get returns nil once empty
	// instead of opening new fds while we are trying to drain.
	f.pool.New = nil
	for {
		get := f.pool.Get()
		if get == nil {
			break
		}
		err := get.(*os.File).Close()
		if err != nil {
			log.Println("failed closing: ", err)
		}
	}
	f.pool.New = f.new
	parentCleanup(f.key)
	f.cleanupMtx.Unlock()
}
// newFilePool builds a pool for k and starts its idle-cleanup
// goroutine, which fires after timeout and reports back through
// parentCleanup.
func newFilePool(k key, timeout time.Duration, parentCleanup func(k key)) *filePool {
	fp := &filePool{
		key:     k,
		timeout: timeout,
		ticker:  time.NewTicker(timeout),
	}
	fp.pool.New = fp.new
	go fp.cleanup(parentCleanup)
	return fp
}
// timerCache implements Cache with one idle-timeout filePool per key.
// mtx guards only the fds map; each pool synchronises itself.
type timerCache struct {
	mtx     sync.Mutex
	fds     map[key]*filePool
	timeout time.Duration
}
// closeFile returns the descriptor held by file to its pool. The map
// lock is dropped before Put so a slow rewind inside Put cannot block
// the whole cache. A missing pool for a handed-out fd is a bug in this
// package, hence the panic.
func (t *timerCache) closeFile(file *cachedFile) error {
	t.mtx.Lock()
	pool, ok := t.fds[file.k]
	// Unlock before the ok-check: the original panicked while still
	// holding t.mtx, which left the cache permanently locked if the
	// panic was recovered upstream. ("cant file" typo fixed too.)
	t.mtx.Unlock()
	if !ok {
		panic("cant find filePool for existing fd")
	}
	pool.Put(file.f)
	return nil
}
// cleanup is handed to each filePool as its parentCleanup callback:
// it drops the pool for k from the map so a later OpenFile for the
// same key builds a fresh one.
func (t *timerCache) cleanup(k key) {
	t.mtx.Lock()
	defer t.mtx.Unlock()
	delete(t.fds, k)
}
// OpenFile hands out a File backed by a pooled descriptor for
// (name, flag), creating the pool on first use. Creating files is not
// supported because a pooled fd cannot represent O_CREATE semantics.
// perm is accepted for signature compatibility but unused (files are
// never created).
func (t *timerCache) OpenFile(name string, flag int, perm os.FileMode) (File, error) {
	if flag&os.O_CREATE != 0 {
		return nil, fmt.Errorf("creating files is not supported")
	}
	k := key{name, flag}

	// Find or lazily create the pool for this key.
	t.mtx.Lock()
	pool, ok := t.fds[k]
	if !ok {
		pool = newFilePool(k, t.timeout, t.cleanup)
		t.fds[k] = pool
	}
	t.mtx.Unlock()

	// pool.Get yields either a usable fd or the error from opening one.
	v := pool.Get()
	if err, isErr := v.(error); isErr {
		return nil, err
	}
	return &cachedFile{f: v.(*os.File), cache: t, k: k}, nil
}
// cachedFile is the File handed to callers: a pooled *os.File plus
// the key and cache needed to return it on Close. closed guards
// against use-after-Close; mtx protects that flag.
type cachedFile struct {
	cache  Cache
	k      key
	f      *os.File
	closed bool
	mtx    sync.RWMutex
}
// Read implements io.Reader by delegating to the pooled fd. After
// Close it fails with os.ErrClosed.
//
// Fixes two bugs in the original: the closed-check was inverted
// (`!f.closed` rejected every read of an OPEN file) and the early
// return left f.mtx locked forever.
func (f *cachedFile) Read(p []byte) (n int, err error) {
	// Snapshot the flag under the read lock so we never hold the lock
	// across the (possibly blocking) underlying read.
	f.mtx.RLock()
	closed := f.closed
	f.mtx.RUnlock()
	if closed {
		return 0, os.ErrClosed
	}
	return f.f.Read(p)
}
// Write implements io.Writer by delegating to the pooled fd. After
// Close it fails with os.ErrClosed.
//
// Fixes two bugs in the original: the closed-check was inverted
// (`!f.closed` rejected every write to an OPEN file) and the early
// return left f.mtx locked forever.
func (f *cachedFile) Write(p []byte) (n int, err error) {
	// Snapshot the flag under the read lock so we never hold the lock
	// across the (possibly blocking) underlying write.
	f.mtx.RLock()
	closed := f.closed
	f.mtx.RUnlock()
	if closed {
		return 0, os.ErrClosed
	}
	return f.f.Write(p)
}
// Seek implements io.Seeker by delegating to the pooled fd. After
// Close it fails with os.ErrClosed.
//
// Fixes two bugs in the original: the closed-check was inverted
// (`!f.closed` rejected every seek on an OPEN file) and the early
// return left f.mtx locked forever.
func (f *cachedFile) Seek(offset int64, whence int) (int64, error) {
	// Snapshot the flag under the read lock; do not hold the lock
	// across the underlying call.
	f.mtx.RLock()
	closed := f.closed
	f.mtx.RUnlock()
	if closed {
		return 0, os.ErrClosed
	}
	return f.f.Seek(offset, whence)
}
// Name returns the path this file was opened with, mirroring
// (*os.File).Name.
func (f *cachedFile) Name() string {
	return f.k.name
}
// Close marks the file closed and returns its descriptor to the
// cache's pool (the real fd stays open for reuse). A second Close
// reports os.ErrClosed.
//
// Fixes two bugs in the original: the double-Close path returned
// while still holding f.mtx (deadlocking every later method), and
// f.closed was never actually set, so Close never took effect.
func (f *cachedFile) Close() error {
	f.mtx.Lock()
	defer f.mtx.Unlock()
	if f.closed {
		return os.ErrClosed
	}
	f.closed = true // subsequent Read/Write/Seek/Close now fail with os.ErrClosed
	return f.cache.closeFile(f)
}
// NewTimerCache creates a Cache that invalidates fds which were not
// used for the given timeout duration. Each distinct (path, flags)
// pair gets its own pool with its own idle timer.
func NewTimerCache(timeout time.Duration) Cache {
	c := &timerCache{
		fds:     make(map[key]*filePool),
		timeout: timeout,
	}
	return c
}
// DefaultCache backs the package-level Open and OpenFile helpers:
// descriptors idle for five seconds are closed and reopened on demand.
var DefaultCache = NewTimerCache(5 * time.Second)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment