Skip to content

Instantly share code, notes, and snippets.

@aklyachkin
Created October 19, 2022 14:59
Show Gist options
  • Save aklyachkin/fad889d0b3f2d9fbe64eb59c8516418e to your computer and use it in GitHub Desktop.
Save aklyachkin/fad889d0b3f2d9fbe64eb59c8516418e to your computer and use it in GitHub Desktop.
golang fsnotify support for IBM AIX
// ahafs.go
// Copyright 2022 Power-Devops.com. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build aix
// +build aix
package fsnotify
import (
"fmt"
"errors"
"os"
"path/filepath"
"strings"
"sync"
"github.com/power-devops/ahafs"
"golang.org/x/sys/unix"
)
// Watcher monitors a set of paths through AHAFS and reports what happens to
// them on its channels.
type Watcher struct {
	// Events carries the notifications produced for the watched paths.
	Events chan Event
	// Errors is the channel reserved for error reports.
	Errors chan error

	mu      sync.Mutex        // held by Add and Remove while they touch watches
	watches map[string]*watch // active ahafs watches, keyed by cleaned path
}
// watch groups the ahafs monitors that serve one watched path. Every monitor
// is paired with the channel its events are read from; a monitor that is not
// in use is nil.
type watch struct {
	path string // the path this watch was created for

	fileevt *ahafs.Monitor   // created with ahafs.NewFileMonitor
	filec   chan ahafs.Event // events from fileevt

	attrevt *ahafs.Monitor   // created with ahafs.NewFileAttrMonitor
	attrc   chan ahafs.Event // events from attrevt

	pathevt *ahafs.Monitor   // created with ahafs.NewDirMonitor
	pathc   chan ahafs.Event // events from pathevt
}
// NewWatcher returns a Watcher with freshly allocated Events and Errors
// channels and an empty set of watches, and launches its readEvents
// goroutine. It fails when isAhaMounted reports that AHAFS is unavailable.
func NewWatcher() (*Watcher, error) {
	if mounted := isAhaMounted(); !mounted {
		return nil, errors.New("AHAFS is not mounted.")
	}

	watcher := new(Watcher)
	watcher.Events = make(chan Event)
	watcher.Errors = make(chan error)
	watcher.watches = make(map[string]*watch)

	go watcher.readEvents()
	return watcher, nil
}
// Close stops every active watch by closing its ahafs monitors and forgets
// the watches. It always returns nil.
//
// The watch map is accessed under w.mu, the same lock Add and Remove hold,
// so Close can be called while other goroutines add or remove watches.
// Entries are dropped from the map as they are closed so that a second Close
// or a later Remove does not close the same monitors again.
//
// Close does not close the Events or Errors channels.
func (w *Watcher) Close() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	// Deleting the current key while ranging over a map is safe in Go.
	for name, wa := range w.watches {
		wa.Close()
		delete(w.watches, name)
	}
	return nil
}
// Add starts watching the named file or directory (non-recursively).
//
// name is normalised with filepath.Clean before it is used as the key of the
// watch. Add returns an error if that path is already being watched or if
// the underlying ahafs monitors cannot be created.
func (w *Watcher) Add(name string) error {
	name = filepath.Clean(name)

	w.mu.Lock()
	defer w.mu.Unlock()

	if _, exists := w.watches[name]; exists {
		return fmt.Errorf("Object %s is already monitored", name)
	}

	wa, err := newWatch(name)
	if err != nil {
		return err
	}
	w.watches[name] = wa

	// The goroutine launched by NewWatcher only starts GetEvents for watches
	// that already exist at that moment, i.e. none. Without a pump of its own
	// a watch added here would never deliver anything on w.Events.
	//
	// GetEvents only performs non-blocking sends, so it gets a one-element
	// buffer to hand events over while the forwarder below is blocked on a
	// slow consumer of w.Events.
	events := make(chan Event, 1)
	go wa.GetEvents(events)
	go func() {
		for ev := range events {
			w.Events <- ev
		}
	}()
	// NOTE(review): GetEvents has no exit path, so both goroutines outlive
	// Remove/Close of this watch; giving them a stop signal needs a change in
	// GetEvents itself.

	return nil
}
// Remove stops watching the named file or directory (non-recursively).
//
// name is normalised with filepath.Clean, the same way Add stores it. The
// monitors of the matching watch are closed and the watch is forgotten.
// Remove returns an error if the path is not currently being watched.
func (w *Watcher) Remove(name string) error {
	name = filepath.Clean(name)

	w.mu.Lock()
	defer w.mu.Unlock()

	wa, found := w.watches[name]
	if !found || wa == nil {
		return fmt.Errorf("Object %s is not monitored", name)
	}

	wa.Close()
	delete(w.watches, name)
	return nil
}
// readEvents is the forwarding loop of the Watcher. It starts one GetEvents
// goroutine for each watch that is present in w.watches at the moment it
// runs, all of them feeding a shared one-element buffered channel, and then
// relays whatever arrives on that channel to w.Events for as long as the
// process lives.
func (w *Watcher) readEvents() {
	pipe := make(chan Event, 1)

	for _, active := range w.watches {
		go active.GetEvents(pipe)
	}

	for {
		evt, open := <-pipe
		if !open {
			// Nothing to relay; go back to waiting.
			continue
		}
		w.Events <- evt
	}
}
// newWatch creates the ahafs monitors needed to observe name:
//
//   - name exists and is not a directory: a file monitor and an attribute
//     monitor on name, plus a directory monitor on its parent directory;
//   - name exists and is a directory: a directory monitor on name itself;
//   - os.Stat fails: name is assumed to be a file that does not exist yet,
//     and only its parent directory is monitored.
//
// If any monitor cannot be created, the monitors opened so far are closed
// again and the error is returned together with a nil watch.
func newWatch(name string) (*watch, error) {
	w := &watch{path: name}

	// The parent directory is monitored in every case except when name is an
	// existing directory, which is monitored directly.
	dir := filepath.Dir(name)

	info, statErr := os.Stat(name)
	switch {
	case statErr != nil:
		// Nothing to open for the file itself yet.
	case info.IsDir():
		dir = name
	default:
		fm, err := ahafs.NewFileMonitor(name)
		if err != nil {
			return nil, err
		}
		w.fileevt = fm

		am, err := ahafs.NewFileAttrMonitor(name)
		if err != nil {
			w.Close() // release the file monitor opened above
			return nil, err
		}
		w.attrevt = am
	}

	pm, err := ahafs.NewDirMonitor(dir)
	if err != nil {
		w.Close() // release any file/attribute monitors opened above
		return nil, err
	}
	w.pathevt = pm

	return w, nil
}
// GetEvents translates the raw ahafs events of this watch into Event values
// and offers them on c.
//
// It allocates fresh filec/pathc/attrc channels, starts a Watch goroutine for
// every monitor that is currently set, and then loops forever multiplexing
// the three channels. Every send on c is non-blocking (select with default),
// so an event is dropped when c cannot accept it immediately.
//
// NOTE(review): the loop below contains no return or break, so on any normal
// path GetEvents never returns and the deferred channel cleanup never runs -
// confirm whether that is intended.
func (w *watch) GetEvents(c chan <- Event) {
// The *Alive flags record whether events read from the corresponding channel
// are still acted upon; once a flag is false, events from that channel are
// read and discarded.
filecAlive := false
pathcAlive := false
attrcAlive := false
w.filec = make(chan ahafs.Event)
w.pathc = make(chan ahafs.Event)
w.attrc = make(chan ahafs.Event)
// On return, close every channel that isClosed does not report as closed.
defer func() {
if !isClosed(w.filec) { close(w.filec) }
if !isClosed(w.pathc) { close(w.pathc) }
if !isClosed(w.attrc) { close(w.attrc) }
}()
// Start a Watch goroutine for each monitor that newWatch managed to set up.
if w.fileevt != nil {
filecAlive = true
go w.fileevt.Watch(w.filec)
}
if w.pathevt != nil {
pathcAlive = true
go w.pathevt.Watch(w.pathc)
}
if w.attrevt != nil {
attrcAlive = true
go w.attrevt.Watch(w.attrc)
}
for {
var e ahafs.Event
var ok bool
select {
// File monitor: every event other than quit is reported as Write.
case e, ok = <- w.filec:
// fmt.Println("Event from file channel:", e)
// NOTE(review): if this channel were ever closed, the receive would succeed
// immediately with ok == false on every iteration and the loop would spin -
// confirm that nothing closes it while GetEvents is running.
if !ok { continue }
if !filecAlive { continue }
if e.IsQuit() {
filecAlive = false
w.fileevt.Close()
continue
}
select {
case c <- Event{Name: w.path, Op: Write}:
default:
}
// Directory monitor: creations and removals are told apart by e.RC.
case e, ok = <- w.pathc:
// fmt.Println("Event from path channel:", e)
if !ok { continue }
if !pathcAlive { continue }
if e.IsQuit() {
pathcAlive = false
w.pathevt.Close()
continue
}
switch e.RC {
case ahafs.ModDirCreate:
// a file is created
// fmt.Println("File created")
// If the watched path exists now and has no file/attribute monitor yet,
// create the monitor and start watching it. A failure is printed to stdout
// and the `continue` skips the Create event below.
// NOTE(review): e.Info is not inspected here, so Create is reported for
// w.path whatever the created entry was - confirm.
// NOTE(review): after a removal w.fileevt/w.attrevt are not reset to nil, so
// these branches are skipped when the file is created again - confirm.
if w.fileevt == nil {
if _, err := os.Stat(w.path); err == nil {
if fm, err := ahafs.NewFileMonitor(w.path); err != nil {
fmt.Println("Can't create file monitor:", err)
continue
} else {
w.fileevt = fm
filecAlive = true
go w.fileevt.Watch(w.filec)
}
}
}
if w.attrevt == nil {
if _, err := os.Stat(w.path); err == nil {
if am, err := ahafs.NewFileAttrMonitor(w.path); err != nil {
fmt.Println("Can't create attribute monitor:", err)
continue
} else {
w.attrevt = am
attrcAlive = true
go w.attrevt.Watch(w.attrc)
}
}
}
select {
case c <- Event{Name: w.path, Op: Create}:
default:
}
case ahafs.ModDirRemove:
// a file is removed
// fmt.Println("File removed")
// Only when the first line of e.Info equals the watched path are the file
// and attribute channels marked dead; the Remove event below is sent either
// way.
if strings.Split(e.Info, "\n")[0] == w.path {
// the file is removed, no events more from filec and attrc
filecAlive = false
attrcAlive = false
}
select {
case c <- Event{Name: w.path, Op: Remove}:
default:
}
}
// Attribute monitor: removal and rename are reported as such; any other
// return code is reported as Chmod.
case e, ok = <- w.attrc:
// fmt.Println("Event from attribute channel:", e)
if !ok { continue }
if !attrcAlive { continue }
if e.IsQuit() {
attrcAlive = false
w.attrevt.Close()
continue
}
switch e.RC {
case ahafs.ModFileAttrRemove:
select {
case c <- Event{Name: w.path, Op: Remove}:
default:
}
// file is removed, no events anymore from filec & attrc
filecAlive = false
attrcAlive = false
w.attrevt.Close()
// NOTE(review): w.fileevt is used without a nil check here - confirm it is
// always set whenever w.attrevt is.
w.fileevt.Close()
continue
case ahafs.ModFileAttrRename:
select {
case c <- Event{Name: w.path, Op: Rename}:
default:
}
continue
}
select {
case c <- Event{Name: w.path, Op: Chmod}:
default:
}
}
}
}
// Close shuts down every monitor this watch holds, in the order file,
// directory, attribute. Monitors that were never created (nil) are skipped.
func (w *watch) Close() {
	monitors := []*ahafs.Monitor{w.fileevt, w.pathevt, w.attrevt}
	for _, mon := range monitors {
		if mon == nil {
			continue
		}
		mon.Close()
	}
}
// isAhaMounted reports whether /aha can be stat'ed and its Flag field
// equals 1.
// NOTE(review): presumably Flag == 1 marks a mount point on AIX - confirm
// against the AIX stat documentation.
func isAhaMounted() bool {
	var info unix.Stat_t
	if unix.Stat("/aha", &info) != nil {
		return false
	}
	return info.Flag == 1
}
// isClosed reports whether ch has been closed, without blocking.
//
// It performs a single non-blocking receive. Only a receive that completes
// with ok == false means the channel is closed; a receive that yields a real
// value means the channel is still open, and that case is no longer
// misreported as closed. If no value is ready the channel is open as well.
//
// Note that probing an open channel which has a value ready consumes that
// value.
func isClosed(ch <-chan ahafs.Event) bool {
	select {
	case _, ok := <-ch:
		return !ok
	default:
		return false
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment