From cdc6b5da52ef40f0ddea27ffc741c01211a5dcd7 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 17 Feb 2021 15:05:17 +0100 Subject: [PATCH] add: [filerwatcher] wip --- inputreader/filewatcherreader.go | 92 ++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 29 deletions(-) diff --git a/inputreader/filewatcherreader.go b/inputreader/filewatcherreader.go index 2d98b26..4fd0aa4 100644 --- a/inputreader/filewatcherreader.go +++ b/inputreader/filewatcherreader.go @@ -5,7 +5,6 @@ import ( "encoding/base64" "github.com/rjeczalik/notify" "io" - "io/ioutil" "log" "os" ) @@ -20,9 +19,13 @@ type FileWatcherReader struct { // TearDown channel exit chan string // Current buffer - buf []byte - // json json bool + // Current file + curfile *os.File + // Current state Watching / Reading + watching bool + // Insert Separator + insertsep bool } // NewFileWatcherReader creates a new FileWatcherReader @@ -30,8 +33,10 @@ type FileWatcherReader struct { func NewFileWatcherReader(f *os.File, j bool) (*FileWatcherReader, error) { r := &FileWatcherReader{ folderfd: f, - eic: make(chan notify.EventInfo, 1), - json: j, + eic: make(chan notify.EventInfo, 4096), + json: j, + watching: true, + insertsep: false, } // go routine holding the watcher go func() { @@ -47,43 +52,72 @@ func NewFileWatcherReader(f *os.File, j bool) (*FileWatcherReader, error) { // Read waits for InCloseWrite file event uses a bytes reader to copy // the resulting file encoded in b64 in p func (fw *FileWatcherReader) Read(p []byte) (n int, err error) { - for{ - select{ + + // Watching for new files to read + if fw.watching { + watchloop: + for { + select { case ei := <-fw.eic: - log.Println("Got event:", ei) + //log.Println("Got event:", ei) // New File, let's read its content var err error - fw.buf, err = ioutil.ReadFile(ei.Path()) + fw.curfile, err = os.Open(ei.Path()) if err != nil { log.Fatal(err) } - // if not json it could be anything so we encode it in b64 - if !fw.json{ - // base64 stream encoder - b64buf := new(bytes.Buffer) - b64encoder := base64.NewEncoder(base64.StdEncoding, b64buf) - // Encode in Base64 to b64buf - b64encoder.Write(fw.buf) - // Close the encoder to flush partially written blocks - b64encoder.Close() - b64buf.WriteString("\n") - //rreader := bytes.NewReader(fw.buf) - n, err = b64buf.Read(p) - }else{ - fw.buf = append(fw.buf, "\n"...) - rreader := bytes.NewReader(fw.buf) - n, err = rreader.Read(p) - return n, err - } - return n, err + fw.watching = false + break watchloop case <-fw.exit: // Exiting return 0, io.EOF + } } } + + // Inserting separator + if fw.insertsep{ + var buf []byte + buf = append(buf, "\n"...) + rreader := bytes.NewReader(buf) + n, err = rreader.Read(p) + fw.watching = true + fw.insertsep = false + log.Println("Inserting file seperator ") + fw.curfile.Close() + return n, err + } + + // Reading + // if not json it could be anything so we encode it in b64 + if !fw.json { + // base64 stream encoder + b64buffer := new(bytes.Buffer) + b64encoder := base64.NewEncoder(base64.StdEncoding, b64buffer) + // Read the file using p's length to get he correct size automatically + buf := make([]byte, len(p)) + n, err = fw.curfile.Read(buf) + // buf is the input + b64encoder.Write(buf) + // Close the encoder to flush partially written blocks + b64encoder.Close() + if err == io.EOF { + fw.insertsep = true + } + // Copy from b64buffer to p + n, err = b64buffer.Read(p) + // I keep a len(p) buffer size as python knows how to handle the padding + } else { + n, err = fw.curfile.Read(p) + if err == io.EOF { + fw.insertsep = true + } + } + //log.Println("nread: ", n) + return n, err } // Teardown is called on error to stop the Reading loop if needed func (rl *FileWatcherReader) Teardown() { rl.exit <- "exit" -} \ No newline at end of file +}