chg: [sshd] retry delay for empty queue + flag for parsing a file

nifi
Jean-Louis Huynen 2020-01-31 11:44:01 +01:00
parent 587d418a1a
commit 688ca7123d
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
2 changed files with 64 additions and 32 deletions

View File

@ -5,6 +5,7 @@ import (
"log"
"math"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
@ -175,12 +176,23 @@ func (s *SshdParser) Compile() error {
p.NominalY(keys...)
// Create folder to store plots
if _, err := os.Stat(stype[0]); os.IsNotExist(err) {
os.Mkdir(stype[0], 0700)
if _, err := os.Stat("data"); os.IsNotExist(err) {
err := os.Mkdir("data", 0700)
if err != nil {
return err
}
}
if _, err := os.Stat(filepath.Join("data", stype[0])); os.IsNotExist(err) {
err := os.Mkdir(filepath.Join("data", stype[0]), 0700)
if err != nil {
return err
}
}
xsize := 3 + vg.Length(math.Round(float64(len(keys)/2)))
if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, fmt.Sprintf("data/%v/%v.svg", stype[0], v)); err != nil {
if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, filepath.Join("data", stype[0], fmt.Sprintf("%v.svg", v))); err != nil {
return err
}

78
main.go
View File

@ -1,7 +1,7 @@
package main
import (
"errors"
"bufio"
"flag"
"fmt"
"log"
@ -45,6 +45,8 @@ var (
all = flag.Bool("a", true, "run all parsers when set. Set by default")
specific = flag.String("o", "", "run only a specific parser [sshd]")
debug = flag.Bool("d", false, "debug info in logs")
fromfile = flag.String("f", "", "parse from file on disk")
retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue")
redisD4 redis.Conn
redisParsers *redis.Pool
parsers = [1]string{"sshd"}
@ -175,38 +177,56 @@ func main() {
log.Println("TODO should run specific parser here")
}
f, err = os.Open("./test_seed.log")
if err != nil {
log.Fatalf("Error opening test file: %v", err)
}
defer f.Close()
// scanner := bufio.NewScanner(f)
// for scanner.Scan() {
// Pop D4 redis queue
for {
err := errors.New("")
logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue))
// logline := scanner.Text()
// Parsing loop
if *fromfile != "" {
f, err = os.Open(*fromfile)
if err != nil {
log.Fatal(err)
log.Fatalf("Error opening seed file: %v", err)
}
fmt.Println(logline)
// Run the parsers
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
logline := scanner.Text()
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
} else {
// Pop D4 redis queue
for {
logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue))
if err == redis.ErrNil {
// redis queue empty, let's sleep for a while
time.Sleep(time.Duration(*retry) * time.Minute)
} else if err != nil {
log.Fatal(err)
// let's parse
} else {
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
}
}
}