diff --git a/inputreader/filewatcherreader.go b/inputreader/filewatcherreader.go index 276b21c..8624ada 100644 --- a/inputreader/filewatcherreader.go +++ b/inputreader/filewatcherreader.go @@ -2,63 +2,72 @@ package inputreader import ( "bytes" - "github.com/gomodule/redigo/redis" + "encoding/base64" "github.com/rjeczalik/notify" "io" + "io/ioutil" "log" "os" ) -// FileWatcherReader is a abstraction a folder watcher +// FileWatcherReader is an abstraction of a folder watcher // and behaves like a reader type FileWatcherReader struct { // Folder to watch - folderfd os.File + folderfd *os.File // Notify Channel eic chan notify.EventInfo + // TearDown channel + exit chan string // Current buffer buf []byte } -// NewLPOPReader creates a new RedisLPOPReader -func NewFileWatcherReader(f os.File) (*FileWatcherReader, error) { +// NewFileWatcherReader creates a new FileWatcherReader +func NewFileWatcherReader(f *os.File) (*FileWatcherReader, error) { r := &FileWatcherReader{ folderfd: f, eic: make(chan notify.EventInfo, 1), } - return r, nil } -// Read waits for new file event uses a bytes reader to copy +// Read waits for InCloseWrite file event uses a bytes reader to copy // the resulting file in p func (fw *FileWatcherReader) Read(p []byte) (n int, err error) { - if err := notify.Watch("./...", fw.eic, notify.Remove); err != nil { + if err := notify.Watch("./...", fw.eic, notify.InCloseWrite); err != nil { log.Fatal(err) } defer notify.Stop(fw.eic) - // Create a go routing listening the the channel - - // select, on new event, stream content of the file - - // Block until event is received - ei := <-fw.eic - log.Println("Got event:", ei) - - // TODO grab the ei.Path content - - // Encode it in base64 - // push in buffer - // add \n - - //buf = append(buf, "\n"...) - //rreader := bytes.NewReader(buf) - //n, err = rreader.Read(p) - //return n, err + for{ + select{ + case ei := <-fw.eic: + // New File, let's read its content + var err error + fw.buf, err = ioutil.ReadFile(ei.Path()) + if err != nil { + log.Fatal(err) + } + // base64 stream encoder + b64buf := new(bytes.Buffer) + b64encoder := base64.NewEncoder(base64.StdEncoding, b64buf) + // Encode in Base64 to b64buf + b64encoder.Write(fw.buf) + // Close the encoder to flush partially written blocks + b64encoder.Close() + b64buf.WriteString("\n") + //rreader := bytes.NewReader(fw.buf) + n, err = b64buf.Read(p) + return n, err + case <-fw.exit: + // Exiting + return 0, io.EOF + } + } } -// Teardown is called on error to close the redis connection -func (rl *RedisLPOPReader) Teardown() { - (*rl.r).Close() +// Teardown is called on error to stop the Reading loop if needed +func (rl *FileWatcherReader) Teardown() { + rl.exit <- "exit" } \ No newline at end of file