From 688ca7123dbfcb53e10e90726a99d8bc15ca7497 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Fri, 31 Jan 2020 11:44:01 +0100 Subject: [PATCH] chg: [sshd] retry delay for empty queue + flag for parsing a file --- logparser/sshd.go | 18 +++++++++-- main.go | 78 +++++++++++++++++++++++++++++------------------ 2 files changed, 64 insertions(+), 32 deletions(-) diff --git a/logparser/sshd.go b/logparser/sshd.go index 9a02edc..b6a5fa8 100644 --- a/logparser/sshd.go +++ b/logparser/sshd.go @@ -5,6 +5,7 @@ import ( "log" "math" "os" + "path/filepath" "regexp" "strconv" "strings" @@ -175,12 +176,23 @@ func (s *SshdParser) Compile() error { p.NominalY(keys...) // Create folder to store plots - if _, err := os.Stat(stype[0]); os.IsNotExist(err) { - os.Mkdir(stype[0], 0700) + + if _, err := os.Stat("data"); os.IsNotExist(err) { + err := os.Mkdir("data", 0700) + if err != nil { + return err + } + } + + if _, err := os.Stat(filepath.Join("data", stype[0])); os.IsNotExist(err) { + err := os.Mkdir(filepath.Join("data", stype[0]), 0700) + if err != nil { + return err + } } xsize := 3 + vg.Length(math.Round(float64(len(keys)/2))) - if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, fmt.Sprintf("data/%v/%v.svg", stype[0], v)); err != nil { + if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, filepath.Join("data", stype[0], fmt.Sprintf("%v.svg", v))); err != nil { return err } diff --git a/main.go b/main.go index ccf5f81..2409ea7 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,7 @@ package main import ( - "errors" + "bufio" "flag" "fmt" "log" @@ -45,6 +45,8 @@ var ( all = flag.Bool("a", true, "run all parsers when set. Set by default") specific = flag.String("o", "", "run only a specific parser [sshd]") debug = flag.Bool("d", false, "debug info in logs") + fromfile = flag.String("f", "", "parse from file on disk") + retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") redisD4 redis.Conn redisParsers *redis.Pool parsers = [1]string{"sshd"} @@ -175,38 +177,56 @@ func main() { log.Println("TODO should run specific parser here") } - f, err = os.Open("./test_seed.log") - if err != nil { - log.Fatalf("Error opening test file: %v", err) - } - defer f.Close() - // scanner := bufio.NewScanner(f) - // for scanner.Scan() { - - // Pop D4 redis queue - for { - - err := errors.New("") - logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue)) - // logline := scanner.Text() + // Parsing loop + if *fromfile != "" { + f, err = os.Open(*fromfile) if err != nil { - log.Fatal(err) + log.Fatalf("Error opening seed file: %v", err) } - fmt.Println(logline) - - // Run the parsers - for _, v := range torun { - err := v.Parse(logline) - if err != nil { - log.Fatal(err) + defer f.Close() + scanner := bufio.NewScanner(f) + for scanner.Scan() { + logline := scanner.Text() + for _, v := range torun { + err := v.Parse(logline) + if err != nil { + log.Fatal(err) + } + } + nblines++ + if nblines > compilationTrigger { + nblines = 0 + // Non-blocking + if !compiling.compiling { + go compile() + } } } - nblines++ - if nblines > compilationTrigger { - nblines = 0 - // Non-blocking - if !compiling.compiling { - go compile() + } else { + // Pop D4 redis queue + for { + logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue)) + if err == redis.ErrNil { + // redis queue empty, let's sleep for a while + time.Sleep(time.Duration(*retry) * time.Minute) + } else if err != nil { + log.Fatal(err) + // let's parse + } else { + for _, v := range torun { + err := v.Parse(logline) + if err != nil { + log.Fatal(err) + } + } + nblines++ + if nblines > compilationTrigger { + nblines = 0 + // Non-blocking + if !compiling.compiling { + go compile() + } + } } } }