diff --git a/go.mod b/go.mod index c46dfcc..872354c 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/D4-project/analyzer-d4-log go 1.13 require ( - github.com/D4-project/d4-golang-utils v0.1.2 + github.com/D4-project/d4-golang-utils v0.1.5 github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 // indirect github.com/gomodule/redigo v2.0.0+incompatible github.com/jung-kurt/gofpdf v1.16.2 // indirect diff --git a/go.sum b/go.sum index e5e839d..28d5186 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ= github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= +github.com/D4-project/d4-golang-utils v0.1.5 h1:0aL2gv0uc56Gn2NwQY8L2C6OQRdq0LpoioAeLWs6zZc= +github.com/D4-project/d4-golang-utils v0.1.5/go.mod h1:GGR5KMhvABZtIfmS5jZkwQnBoP+9/V0ZEETSGiWLaM4= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 h1:LMjxfr9tcHP10YI+i4+cjHWSjPeUAUy5+sqw5FhFzwE= diff --git a/inputreader/redisreader.go b/inputreader/redisreader.go index 2570078..1f27dfb 100644 --- a/inputreader/redisreader.go +++ b/inputreader/redisreader.go @@ -2,15 +2,13 @@ package inputreader import ( "bytes" + "github.com/gomodule/redigo/redis" "io" "log" - "time" - - "github.com/gomodule/redigo/redis" ) // RedisLPOPReader is a abstraction of LPOP list -// and behaves likes a scanner +// and behaves like a reader type RedisLPOPReader struct { // D4 redis connection r *redis.Conn @@ -18,14 +16,12 @@ type RedisLPOPReader struct { d int // D4 Queue storing q string - // Time in minute before retrying - retryPeriod time.Duration // Current buffer buf []byte } // NewLPOPReader creates a new RedisLPOPScanner -func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReader { +func NewLPOPReader(rc *redis.Conn, db int, queue string) *RedisLPOPReader { rr := *rc if _, err := rr.Do("SELECT", db); err != nil { @@ -37,7 +33,6 @@ func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReade r: rc, d: db, q: queue, - retryPeriod: time.Duration(rt) * time.Minute, } } diff --git a/logcompiler/compiler.go b/logcompiler/compiler.go index 49ffb73..4e4740a 100644 --- a/logcompiler/compiler.go +++ b/logcompiler/compiler.go @@ -3,6 +3,7 @@ package logcompiler import ( "io" "sync" + "time" "github.com/D4-project/analyzer-d4-log/inputreader" "github.com/gomodule/redigo/redis" @@ -15,7 +16,7 @@ type ( // Parse to parse a line of log // Flush recomputes statisitcs and recompile output Compiler interface { - Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup, *chan error) + Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup, *chan error, time.Duration) SetReader(io.Reader) Pull(chan error) Flush() error @@ -40,6 +41,8 @@ type ( pullreturn *chan error // Comutex embedding comutex + // retry Period when applicable + retryPeriod time.Duration } comutex struct { @@ -49,7 +52,7 @@ type ( ) // Set set the redis connections to this compiler -func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup, c *chan error) { +func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup, c *chan error, retry time.Duration) { s.r0 = rconn0 s.r1 = rconn1 s.reader = reader @@ -57,6 +60,7 @@ func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *red s.compiling = false s.compilegr = compilegr s.pullreturn = c + s.retryPeriod = retry } // SetReader Changes compiler's input diff --git a/logcompiler/sshd.go b/logcompiler/sshd.go index be5a011..3b6d6f6 100644 --- a/logcompiler/sshd.go +++ b/logcompiler/sshd.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "html/template" + "io" "io/ioutil" "log" "math" @@ -91,12 +92,19 @@ func (s *SSHDCompiler) Pull(c chan error) { jsoner := json.NewDecoder(s.reader) +DecodeLoop: for jsoner.More() { var m GrokedSSHD err := jsoner.Decode(&m) - if err != nil { - log.Println(err) + if err := jsoner.Decode(&m); err == io.EOF { + // In case of EOF, we wait for the reader to have + // new data available + time.Sleep(s.retryPeriod) + continue DecodeLoop + } else if err != nil { + log.Fatal(err) } + fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) // Assumes the system parses logs recorded during the current year diff --git a/main.go b/main.go index b9cc682..75a10da 100644 --- a/main.go +++ b/main.go @@ -37,13 +37,14 @@ type ( // Setting up flags var ( + tmpretry, _ = time.ParseDuration("30s") // Flags confdir = flag.String("c", "conf.sample", "configuration directory") all = flag.Bool("a", true, "run all compilers when set. Set by default") specific = flag.String("o", "", "run only a specific parser [sshd]") debug = flag.Bool("d", false, "debug info in logs") fromfile = flag.String("f", "", "parse from file on disk") - retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") + retry = flag.Duration("r", tmpretry, "Time in human format before retrying to read an empty d4 queue") flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits") // Pools of redis connections redisCompilers *redis.Pool @@ -100,9 +101,8 @@ func main() { fmt.Printf("The configuration directory should hold the following files\n") fmt.Printf("to specify the settings to use:\n\n") fmt.Printf(" mandatory: redis_d4 - host:port/db\n") - fmt.Printf(" mandatory: redis_queue - uuid\n") fmt.Printf(" mandatory: redis_compilers - host:port/maxdb\n") - fmt.Printf(" optional: http_server - host:port\n\n") + // fmt.Printf(" optional: http_server - host:port\n\n") fmt.Printf("See conf.sample for an example.\n") } @@ -186,9 +186,9 @@ func main() { log.Fatal("Could not connect to output line on Input Redis") } defer sshdrcon2.Close() - redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry) + redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd") sshd := logcompiler.SSHDCompiler{} - sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr, &pullreturn) + sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr, &pullreturn, *retry) torun = append(torun, &sshd) } }