parent
1e9e4a28d0
commit
9bea72ce04
|
@ -22,7 +22,8 @@ type FileWatcherReader struct {
|
|||
// TearDown channel
|
||||
exit chan string
|
||||
// Chan used to restart the watching channel on a new folder
|
||||
dailyswitch chan string
|
||||
dailySwitch chan bool
|
||||
dailySwitchExit chan bool
|
||||
// Current buffer
|
||||
json bool
|
||||
// Current file
|
||||
|
@ -31,17 +32,22 @@ type FileWatcherReader struct {
|
|||
watching bool
|
||||
// Insert Separator
|
||||
insertsep bool
|
||||
// logging
|
||||
log * log.Logger
|
||||
}
|
||||
|
||||
// NewFileWatcherReader creates a new FileWatcherReader
|
||||
// json specifies whether we now we handle json files
|
||||
func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, error) {
|
||||
func NewFileWatcherReader(f string, j bool, daily bool, logger *log.Logger) (*FileWatcherReader, error) {
|
||||
r := &FileWatcherReader{
|
||||
folderstr: f,
|
||||
eic: make(chan notify.EventInfo, 4096),
|
||||
dailySwitch: make(chan bool),
|
||||
dailySwitchExit: make(chan bool),
|
||||
json: j,
|
||||
watching: true,
|
||||
insertsep: false,
|
||||
log: logger,
|
||||
}
|
||||
// go routine holding the watcher
|
||||
go setUpWatcher(r, daily)
|
||||
|
@ -50,8 +56,12 @@ func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, err
|
|||
if daily {
|
||||
c := cron.New()
|
||||
c.AddFunc("@midnight", func() {
|
||||
//c.AddFunc("@every 1m", func() {
|
||||
r.dailyswitch <- "switch"
|
||||
//c.AddFunc("@every 10s", func() {
|
||||
// Sending exit signal to setUpWatcher
|
||||
r.dailySwitch <- true
|
||||
// Waiting for exit signal from setUpWatcher
|
||||
<-r.dailySwitchExit
|
||||
go setUpWatcher(r, daily)
|
||||
})
|
||||
c.Start()
|
||||
}
|
||||
|
@ -61,21 +71,34 @@ func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, err
|
|||
// setUpWatcher holds the watcher
|
||||
func setUpWatcher(r *FileWatcherReader, daily bool) {
|
||||
if daily {
|
||||
dt := time.Now()
|
||||
//Format YYYYMMDD
|
||||
// TODO make it customizable
|
||||
currentFolder := dt.Format("20060102")
|
||||
log.Println(fmt.Sprintf("Watching : %s/%s/...", r.folderstr, currentFolder))
|
||||
if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
t, _ := time.ParseDuration("1s")
|
||||
retryWatch(r, t)
|
||||
} else {
|
||||
if err := notify.Watch(fmt.Sprintf("%s/...", r.folderstr), r.eic, notify.InCloseWrite); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
defer notify.Stop(r.eic)
|
||||
<-r.dailyswitch
|
||||
<-r.dailySwitch
|
||||
r.dailySwitchExit <- true
|
||||
}
|
||||
|
||||
// retryWatch tries to set up the watcher until it works every t
|
||||
func retryWatch(r *FileWatcherReader, t time.Duration) {
|
||||
dt := time.Now()
|
||||
//Format YYYYMMDD
|
||||
// TODO make it customizable
|
||||
currentFolder := dt.Format("20060102")
|
||||
r.log.Println(fmt.Sprintf("Watching: %s/%s/...", r.folderstr, currentFolder))
|
||||
for {
|
||||
if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil {
|
||||
r.log.Println(fmt.Sprintf("Waiting for: %s/%s/... to exist", r.folderstr, currentFolder))
|
||||
time.Sleep(t)
|
||||
}else{
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Read waits for InCloseWrite file event uses a bytes reader to copy
|
||||
|
@ -88,12 +111,12 @@ func (fw *FileWatcherReader) Read(p []byte) (n int, err error) {
|
|||
for {
|
||||
select {
|
||||
case ei := <-fw.eic:
|
||||
//log.Println("Got event:", ei)
|
||||
//r.log.Println("Got event:", ei)
|
||||
// New File, let's read its content
|
||||
var err error
|
||||
fw.curfile, err = os.Open(ei.Path())
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
fw.log.Fatal(err)
|
||||
}
|
||||
fw.watching = false
|
||||
break watchloop
|
||||
|
|
Loading…
Reference in New Issue