add: [grok] ingest from file

nifi
Jean-Louis Huynen 2020-03-10 16:31:53 +01:00
parent 9a4d57ee0a
commit aef4b518c0
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
2 changed files with 26 additions and 26 deletions

View File

@ -15,6 +15,7 @@ type (
// Flush recomputes statisitcs and recompile output // Flush recomputes statisitcs and recompile output
Compiler interface { Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup) Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup)
SetReader(io.Reader)
Pull() error Pull() error
Flush() error Flush() error
Compile() error Compile() error
@ -54,3 +55,8 @@ func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *red
s.compiling = false s.compiling = false
s.compilegr = compilegr s.compilegr = compilegr
} }
// SetReader Changes compiler's input
func (s *CompilerStruct) SetReader(reader io.Reader) {
s.reader = reader
}

46
main.go
View File

@ -192,34 +192,28 @@ func main() {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
// TODO log.Println("Exit")
// compile() os.Exit(0)
} }
// TODO update that -- deprecated }
} else if *fromfile != "" {
f, err = os.Open(*fromfile) // Launching Pull routines
if err != nil { for _, v := range torun {
log.Fatalf("Error opening seed file: %v", err)
} // If we read from a file, we set the reader to os.open
defer f.Close() if *fromfile != "" {
// scanner := bufio.NewScanner(f) f, err = os.Open(*fromfile)
// for scanner.Scan() { if err != nil {
// logline := scanner.Bytes() log.Fatalf("Error opening seed file: %v", err)
// for _, v := range torun { }
// go v.Pull() defer f.Close()
// if err != nil { v.SetReader(f)
// log.Fatal(err)
// }
// }
// }
} else {
// Launching Pull routines
for _, v := range torun {
// we add pulling routines to a waitgroup,
// they can immediately die when exiting.
pullgr.Add(1)
go v.Pull()
} }
// we add pulling routines to a waitgroup,
// they can immediately die when exiting.
pullgr.Add(1)
go v.Pull()
} }
pullgr.Wait() pullgr.Wait()