From 1e9e4a28d047833aefde68eac9174a661b7a83dd Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Tue, 2 Mar 2021 11:53:02 +0100 Subject: [PATCH] chg: [filewatcher] daily rotation of watched folder --- go.mod | 4 +++ go.sum | 19 ++++++++++++ inputreader/filewatcherreader.go | 50 +++++++++++++++++++++++++------- 3 files changed, 62 insertions(+), 11 deletions(-) diff --git a/go.mod b/go.mod index b9163eb..80e291d 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,9 @@ go 1.12 require ( github.com/gofrs/uuid v3.2.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible + github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/redislabs/redisgraph-go v2.0.2+incompatible github.com/rjeczalik/notify v0.9.2 + github.com/robfig/cron/v3 v3.0.1 + github.com/stretchr/testify v1.7.0 // indirect ) diff --git a/go.sum b/go.sum index 8ec60ac..5def256 100644 --- a/go.sum +++ b/go.sum @@ -1,8 +1,27 @@ +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0= +github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= +github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec= +github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/redislabs/redisgraph-go v2.0.2+incompatible h1:HW9BqUhvgHeKXnUCrv9HOLPL/QFYxxzR2hsJClTzWk8= +github.com/redislabs/redisgraph-go v2.0.2+incompatible/go.mod h1:GYn4oUFkbkHx49xm2H4G8jZziqKDKdRtDUuTBZTmqBE= github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8= github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7 h1:bit1t3mgdR35yN0cX0G8orgLtOuyL9Wqxa1mccLB0ig= golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/inputreader/filewatcherreader.go b/inputreader/filewatcherreader.go index 49dc952..268b06e 100644 --- a/inputreader/filewatcherreader.go +++ b/inputreader/filewatcherreader.go @@ -5,9 +5,11 @@ import ( "encoding/base64" "fmt" "github.com/rjeczalik/notify" + "github.com/robfig/cron/v3" "io" "log" "os" + "time" ) // FileWatcherReader is an abstraction of a folder watcher @@ -19,6 +21,8 @@ type FileWatcherReader struct { eic chan notify.EventInfo // TearDown channel exit chan string + // Chan used to restart the watching channel on a new folder + dailyswitch chan string // Current buffer json bool // Current file @@ -31,23 +35,47 @@ type FileWatcherReader struct { // NewFileWatcherReader creates a new FileWatcherReader // json specifies whether we now we handle json files -func NewFileWatcherReader(f string, j bool) (*FileWatcherReader, error) { +func NewFileWatcherReader(f string, j bool, daily bool) (*FileWatcherReader, error) { r := &FileWatcherReader{ folderstr: f, - eic: make(chan notify.EventInfo, 4096), - json: j, - watching: true, + eic: make(chan notify.EventInfo, 4096), + json: j, + watching: true, insertsep: false, } // go routine holding the watcher - go func() { - if err := notify.Watch(fmt.Sprintf("%s/...", r.folderstr ), r.eic, notify.InCloseWrite); err != nil { + go setUpWatcher(r, daily) + + // cron task to add daily folder to watch + if daily { + c := cron.New() + c.AddFunc("@midnight", func() { + //c.AddFunc("@every 1m", func() { + r.dailyswitch <- "switch" + }) + c.Start() + } + return r, nil +} + +// setUpWatcher holds the watcher +func setUpWatcher(r *FileWatcherReader, daily bool) { + if daily { + dt := time.Now() + //Format YYYYMMDD + // TODO make it customizable + currentFolder := dt.Format("20060102") + log.Println(fmt.Sprintf("Watching : %s/%s/...", r.folderstr, currentFolder)) + if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil { log.Fatal(err) } - defer notify.Stop(r.eic) - <-r.exit - }() - return r, nil + } else { + if err := notify.Watch(fmt.Sprintf("%s/...", r.folderstr), r.eic, notify.InCloseWrite); err != nil { + log.Fatal(err) + } + } + defer notify.Stop(r.eic) + <-r.dailyswitch } // Read waits for InCloseWrite file event uses a bytes reader to copy @@ -77,7 +105,7 @@ func (fw *FileWatcherReader) Read(p []byte) (n int, err error) { } // Inserting separator - if fw.insertsep{ + if fw.insertsep { var buf []byte buf = append(buf, "\n"...) rreader := bytes.NewReader(buf)