diff --git a/inputreader/filewatcherreader.go b/inputreader/filewatcherreader.go index 268b06e..5693c1d 100644 --- a/inputreader/filewatcherreader.go +++ b/inputreader/filewatcherreader.go @@ -22,7 +22,8 @@ type FileWatcherReader struct { // TearDown channel exit chan string // Chan used to restart the watching channel on a new folder - dailyswitch chan string + dailySwitch chan bool + dailySwitchExit chan bool // Current buffer json bool // Current file @@ -31,17 +32,22 @@ type FileWatcherReader struct { watching bool // Insert Separator insertsep bool + // logging + log * log.Logger } // NewFileWatcherReader creates a new FileWatcherReader // json specifies whether we now we handle json files -func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, error) { +func NewFileWatcherReader(f string, j bool, daily bool, logger *log.Logger) (*FileWatcherReader, error) { r := &FileWatcherReader{ - folderstr: f, - eic: make(chan notify.EventInfo, 4096), - json: j, - watching: true, - insertsep: false, + folderstr: f, + eic: make(chan notify.EventInfo, 4096), + dailySwitch: make(chan bool), + dailySwitchExit: make(chan bool), + json: j, + watching: true, + insertsep: false, + log: logger, } // go routine holding the watcher go setUpWatcher(r, daily) @@ -50,8 +56,12 @@ func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, err if daily { c := cron.New() c.AddFunc("@midnight", func() { - //c.AddFunc("@every 1m", func() { - r.dailyswitch <- "switch" + //c.AddFunc("@every 10s", func() { + // Sending exit signal to setUpWatcher + r.dailySwitch <- true + // Waiting for exit signal from setUpWatcher + <-r.dailySwitchExit + go setUpWatcher(r, daily) }) c.Start() } @@ -61,21 +71,34 @@ func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, err // setUpWatcher holds the watcher func setUpWatcher(r *FileWatcherReader, daily bool) { if daily { - dt := time.Now() - //Format YYYYMMDD // TODO make it customizable - currentFolder := dt.Format("20060102") - log.Println(fmt.Sprintf("Watching : %s/%s/...", r.folderstr, currentFolder)) - if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil { - log.Fatal(err) - } + t, _ := time.ParseDuration("1s") + retryWatch(r, t) } else { if err := notify.Watch(fmt.Sprintf("%s/...", r.folderstr), r.eic, notify.InCloseWrite); err != nil { log.Fatal(err) } } defer notify.Stop(r.eic) - <-r.dailyswitch + <-r.dailySwitch + r.dailySwitchExit <- true +} + +// retryWatch tries to set up the watcher until it works every t +func retryWatch(r *FileWatcherReader, t time.Duration) { + dt := time.Now() + //Format YYYYMMDD + // TODO make it customizable + currentFolder := dt.Format("20060102") + r.log.Println(fmt.Sprintf("Watching: %s/%s/...", r.folderstr, currentFolder)) + for { + if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil { + r.log.Println(fmt.Sprintf("Waiting for: %s/%s/... to exist", r.folderstr, currentFolder)) + time.Sleep(t) + }else{ + return + } + } } // Read waits for InCloseWrite file event uses a bytes reader to copy @@ -88,12 +111,12 @@ func (fw *FileWatcherReader) Read(p []byte) (n int, err error) { for { select { case ei := <-fw.eic: - //log.Println("Got event:", ei) + //r.log.Println("Got event:", ei) // New File, let's read its content var err error fw.curfile, err = os.Open(ei.Path()) if err != nil { - log.Fatal(err) + fw.log.Fatal(err) } fw.watching = false break watchloop