Skip to content

Instantly share code, notes, and snippets.

@hyoban
Created September 24, 2021 05:41
Show Gist options
  • Save hyoban/c443ca82d2fd105b81a7e2d04025eef6 to your computer and use it in GitHub Desktop.
Save hyoban/c443ca82d2fd105b81a7e2d04025eef6 to your computer and use it in GitHub Desktop.
监听文件夹自动上传到 hdfs
package main
import (
"context"
"database/sql"
"flag"
"log"
"os"
"os/exec"
"strconv"
"strings"
"time"
"github.com/djherbis/times"
"github.com/fsnotify/fsnotify"
_ "github.com/go-sql-driver/mysql"
)
// Package-level state shared by init, main and the upload helpers.
var (
// localFolderPath is the directory to watch; it is bound to the -lfp flag in init.
localFolderPath string
// db is the MySQL handle opened in init and used by video.save.
db *sql.DB
)
// video holds the three values that video.save inserts, in this order, as one
// row of the videos table.
type video struct {
// name is the file name (the part of the local path after the last "/").
name string
// path is the HDFS location string recorded for the file.
path string
// time is the file's birth time rendered with time.Time.String.
time string
}
// save inserts v as a new row of the videos table using the package-level db
// handle. The statement is positional, so the values are bound in the order
// name, path, time. It returns the driver result and any execution error.
func (v video) save() (res sql.Result, err error) {
	const insertVideo = "INSERT INTO videos VALUES (?, ?, ?)"
	return db.Exec(insertVideo, v.name, v.path, v.time)
}
// MySQL connection settings; init joins them into the DSN
// userName:password@tcp(ip:port)/dbName?charset=utf8.
// NOTE(review): the credentials are hard-coded in source — consider reading
// them from flags or the environment.
const (
userName = "hadoop-video"
password = "123456"
ip = "127.0.0.1"
port = "3306"
dbName = "videos"
)
// init registers the -lfp flag (the folder to watch) and opens the MySQL
// connection stored in db. It panics if the handle cannot be opened and exits
// via log.Fatalln if the server does not answer a ping.
func init() {
	flag.StringVar(&localFolderPath, "lfp", "", "监听的文件夹")

	// DSN layout: user:password@tcp(host:port)/dbname?charset=utf8
	dsn := userName + ":" + password + "@tcp(" + ip + ":" + port + ")/" + dbName + "?charset=utf8"

	var err error
	if db, err = sql.Open("mysql", dsn); err != nil {
		panic(err)
	}
	// sql.Open does not necessarily connect; the ping verifies the server is reachable.
	if err = db.PingContext(context.Background()); err != nil {
		log.Fatalln(err.Error())
	}
	log.Println("Database Connected")
}
// mkdirAndUploadToHDFS uploads the local file at localFilePath to HDFS and
// records it in the videos table.
//
// Steps, in order:
//  1. reject empty paths and paths containing a space;
//  2. read the file's birth time;
//  3. create the time-based HDFS directory (makeDirInHDFSByTime);
//  4. upload the file into that directory (uploadToHDFS);
//  5. insert the database row (video.save).
//
// The row is written only after the upload succeeded, so the table never
// points at a file that is missing from HDFS.
//
// main starts this function in its own goroutine for every create event, so
// failures are logged and the file is skipped instead of calling log.Fatal:
// exiting here would stop the watcher and abort uploads that are still running
// for other files.
//
// NOTE(review): the create event may arrive before the writer has finished
// the file, in which case a partial file could be uploaded — confirm how
// files are dropped into the watched folder.
func mkdirAndUploadToHDFS(localFilePath string) {
	if localFilePath == "" {
		return
	}
	if strings.Contains(localFilePath, " ") {
		log.Println("文件路径不可包含空格", localFilePath)
		return
	}

	t, err := times.Stat(localFilePath)
	if err != nil {
		log.Printf("stat %q: %v", localFilePath, err)
		return
	}
	if !t.HasBirthTime() {
		// The HDFS directory is derived from the birth time; make the skip
		// visible instead of silently doing nothing.
		log.Printf("skip %q: birth time not available", localFilePath)
		return
	}
	birth := t.BirthTime()

	dir, err := makeDirInHDFSByTime(birth)
	if err != nil {
		log.Printf("create HDFS directory for %q: %v", localFilePath, err)
		return
	}

	// Everything after the last "/" is the file name; with no "/" present
	// LastIndex returns -1 and the whole path is used.
	fileName := localFilePath[strings.LastIndex(localFilePath, "/")+1:]
	hdfsPath := dir + fileName

	if err := uploadToHDFS(localFilePath, hdfsPath); err != nil {
		log.Printf("upload %q to %q: %v", localFilePath, hdfsPath, err)
		return
	}

	// NOTE(review): dir starts with "/", so the stored value has the form
	// "hdfs://upload/..."; a URI parser would read "upload" as the authority.
	// Kept unchanged because readers of the table may rely on it — confirm.
	rec := video{
		name: fileName,
		path: "hdfs:/" + hdfsPath,
		time: birth.String(),
	}
	res, err := rec.save()
	if err != nil {
		log.Printf("save record for %q: %v", localFilePath, err)
		return
	}
	log.Println(res)
}
// uploadToHDFS copies localFilePath to hdfsFilePath by running
// `hadoop fs -put <local> <hdfs>`. The command's output is forwarded to this
// process's stdout and stderr, and the error from running it is returned.
func uploadToHDFS(localFilePath, hdfsFilePath string) (err error) {
	put := exec.Command("hadoop", "fs", "-put", localFilePath, hdfsFilePath)
	put.Stdout, put.Stderr = os.Stdout, os.Stderr
	return put.Run()
}
// makeDirInHDFSByTime derives an HDFS directory from the given time and
// creates it. The layout is
//
//	/upload/<year>/<month>/<day>/<HHMMSS>/
//
// where year, month and day are plain decimal numbers and HHMMSS is the
// zero-padded 24-hour time. The padding is required for the last component to
// be unambiguous: without it 01:23:04 and 12:03:04 would both become "1234"
// and share one directory.
//
// The computed path is returned even when creating the directory fails; err
// then carries the failure of the hadoop command.
func makeDirInHDFSByTime(time time.Time) (path string, err error) {
	path = "/upload/" +
		strconv.Itoa(time.Year()) + "/" +
		strconv.Itoa(int(time.Month())) + "/" +
		strconv.Itoa(time.Day()) + "/" +
		time.Format("150405") + "/"
	err = makeDirInHDFS(path)
	return path, err
}

// makeDirInHDFS creates path (and any missing parents) by running
// `hadoop fs -mkdir -p <path>`. The command's output is forwarded to this
// process's stdout and stderr, and the error from running it is returned.
func makeDirInHDFS(path string) (err error) {
	cmd := exec.Command("hadoop", "fs", "-mkdir", "-p", path)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd.Run()
}
func main() {
flag.Parse()
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
}
defer watcher.Close()
done := make(chan bool)
go func() {
for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}
if event.Op == fsnotify.Create {
log.Println("event:", event)
go mkdirAndUploadToHDFS(event.Name)
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
log.Println("error:", err)
}
}
}()
err = watcher.Add(localFolderPath)
if err != nil {
log.Fatal(err)
}
<-done
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment