chg: [sshd] retry on redisreader EOF

nifi
Jean-Louis Huynen 2020-05-27 17:16:11 +02:00
parent 389d070f5a
commit f69c9348da
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
6 changed files with 27 additions and 18 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/D4-project/analyzer-d4-log
go 1.13 go 1.13
require ( require (
github.com/D4-project/d4-golang-utils v0.1.2 github.com/D4-project/d4-golang-utils v0.1.5
github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 // indirect github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 // indirect
github.com/gomodule/redigo v2.0.0+incompatible github.com/gomodule/redigo v2.0.0+incompatible
github.com/jung-kurt/gofpdf v1.16.2 // indirect github.com/jung-kurt/gofpdf v1.16.2 // indirect

2
go.sum
View File

@ -1,6 +1,8 @@
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ= github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ=
github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
github.com/D4-project/d4-golang-utils v0.1.5 h1:0aL2gv0uc56Gn2NwQY8L2C6OQRdq0LpoioAeLWs6zZc=
github.com/D4-project/d4-golang-utils v0.1.5/go.mod h1:GGR5KMhvABZtIfmS5jZkwQnBoP+9/V0ZEETSGiWLaM4=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 h1:LMjxfr9tcHP10YI+i4+cjHWSjPeUAUy5+sqw5FhFzwE= github.com/ajstarks/svgo v0.0.0-20200204031535-0cbcf57ea1d8 h1:LMjxfr9tcHP10YI+i4+cjHWSjPeUAUy5+sqw5FhFzwE=

View File

@ -2,15 +2,13 @@ package inputreader
import ( import (
"bytes" "bytes"
"github.com/gomodule/redigo/redis"
"io" "io"
"log" "log"
"time"
"github.com/gomodule/redigo/redis"
) )
// RedisLPOPReader is a abstraction of LPOP list // RedisLPOPReader is a abstraction of LPOP list
// and behaves likes a scanner // and behaves like a reader
type RedisLPOPReader struct { type RedisLPOPReader struct {
// D4 redis connection // D4 redis connection
r *redis.Conn r *redis.Conn
@ -18,14 +16,12 @@ type RedisLPOPReader struct {
d int d int
// D4 Queue storing // D4 Queue storing
q string q string
// Time in minute before retrying
retryPeriod time.Duration
// Current buffer // Current buffer
buf []byte buf []byte
} }
// NewLPOPReader creates a new RedisLPOPScanner // NewLPOPReader creates a new RedisLPOPScanner
func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReader { func NewLPOPReader(rc *redis.Conn, db int, queue string) *RedisLPOPReader {
rr := *rc rr := *rc
if _, err := rr.Do("SELECT", db); err != nil { if _, err := rr.Do("SELECT", db); err != nil {
@ -37,7 +33,6 @@ func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReade
r: rc, r: rc,
d: db, d: db,
q: queue, q: queue,
retryPeriod: time.Duration(rt) * time.Minute,
} }
} }

View File

@ -3,6 +3,7 @@ package logcompiler
import ( import (
"io" "io"
"sync" "sync"
"time"
"github.com/D4-project/analyzer-d4-log/inputreader" "github.com/D4-project/analyzer-d4-log/inputreader"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
@ -15,7 +16,7 @@ type (
// Parse to parse a line of log // Parse to parse a line of log
// Flush recomputes statisitcs and recompile output // Flush recomputes statisitcs and recompile output
Compiler interface { Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup, *chan error) Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup, *chan error, time.Duration)
SetReader(io.Reader) SetReader(io.Reader)
Pull(chan error) Pull(chan error)
Flush() error Flush() error
@ -40,6 +41,8 @@ type (
pullreturn *chan error pullreturn *chan error
// Comutex embedding // Comutex embedding
comutex comutex
// retry Period when applicable
retryPeriod time.Duration
} }
comutex struct { comutex struct {
@ -49,7 +52,7 @@ type (
) )
// Set set the redis connections to this compiler // Set set the redis connections to this compiler
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup, c *chan error) { func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup, c *chan error, retry time.Duration) {
s.r0 = rconn0 s.r0 = rconn0
s.r1 = rconn1 s.r1 = rconn1
s.reader = reader s.reader = reader
@ -57,6 +60,7 @@ func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *red
s.compiling = false s.compiling = false
s.compilegr = compilegr s.compilegr = compilegr
s.pullreturn = c s.pullreturn = c
s.retryPeriod = retry
} }
// SetReader Changes compiler's input // SetReader Changes compiler's input

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"html/template" "html/template"
"io"
"io/ioutil" "io/ioutil"
"log" "log"
"math" "math"
@ -91,12 +92,19 @@ func (s *SSHDCompiler) Pull(c chan error) {
jsoner := json.NewDecoder(s.reader) jsoner := json.NewDecoder(s.reader)
DecodeLoop:
for jsoner.More() { for jsoner.More() {
var m GrokedSSHD var m GrokedSSHD
err := jsoner.Decode(&m) err := jsoner.Decode(&m)
if err != nil { if err := jsoner.Decode(&m); err == io.EOF {
log.Println(err) // In case of EOF, we wait for the reader to have
// new data available
time.Sleep(s.retryPeriod)
continue DecodeLoop
} else if err != nil {
log.Fatal(err)
} }
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
// Assumes the system parses logs recorded during the current year // Assumes the system parses logs recorded during the current year

10
main.go
View File

@ -37,13 +37,14 @@ type (
// Setting up flags // Setting up flags
var ( var (
tmpretry, _ = time.ParseDuration("30s")
// Flags // Flags
confdir = flag.String("c", "conf.sample", "configuration directory") confdir = flag.String("c", "conf.sample", "configuration directory")
all = flag.Bool("a", true, "run all compilers when set. Set by default") all = flag.Bool("a", true, "run all compilers when set. Set by default")
specific = flag.String("o", "", "run only a specific parser [sshd]") specific = flag.String("o", "", "run only a specific parser [sshd]")
debug = flag.Bool("d", false, "debug info in logs") debug = flag.Bool("d", false, "debug info in logs")
fromfile = flag.String("f", "", "parse from file on disk") fromfile = flag.String("f", "", "parse from file on disk")
retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") retry = flag.Duration("r", tmpretry, "Time in human format before retrying to read an empty d4 queue")
flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits") flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits")
// Pools of redis connections // Pools of redis connections
redisCompilers *redis.Pool redisCompilers *redis.Pool
@ -100,9 +101,8 @@ func main() {
fmt.Printf("The configuration directory should hold the following files\n") fmt.Printf("The configuration directory should hold the following files\n")
fmt.Printf("to specify the settings to use:\n\n") fmt.Printf("to specify the settings to use:\n\n")
fmt.Printf(" mandatory: redis_d4 - host:port/db\n") fmt.Printf(" mandatory: redis_d4 - host:port/db\n")
fmt.Printf(" mandatory: redis_queue - uuid\n")
fmt.Printf(" mandatory: redis_compilers - host:port/maxdb\n") fmt.Printf(" mandatory: redis_compilers - host:port/maxdb\n")
fmt.Printf(" optional: http_server - host:port\n\n") // fmt.Printf(" optional: http_server - host:port\n\n")
fmt.Printf("See conf.sample for an example.\n") fmt.Printf("See conf.sample for an example.\n")
} }
@ -186,9 +186,9 @@ func main() {
log.Fatal("Could not connect to output line on Input Redis") log.Fatal("Could not connect to output line on Input Redis")
} }
defer sshdrcon2.Close() defer sshdrcon2.Close()
redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry) redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd")
sshd := logcompiler.SSHDCompiler{} sshd := logcompiler.SSHDCompiler{}
sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr, &pullreturn) sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr, &pullreturn, *retry)
torun = append(torun, &sshd) torun = append(torun, &sshd)
} }
} }