diff --git a/logcompiler/compiler.go b/logcompiler/compiler.go index 42bfa80..869dfa3 100644 --- a/logcompiler/compiler.go +++ b/logcompiler/compiler.go @@ -1,8 +1,8 @@ package logcompiler import ( + "io" "sync" - "time" "github.com/gomodule/redigo/redis" ) @@ -14,7 +14,7 @@ type ( // Parse to parse a line of log // Flush recomputes statisitcs and recompile output Compiler interface { - Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int, *sync.WaitGroup) + Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup) Pull() error Flush() error Compile() error @@ -27,13 +27,8 @@ type ( r0 *redis.Conn // Compiler redis Write r1 *redis.Conn - // Input Read - r2 *redis.Conn - db int - // Dedicated queue - q string - // Time in minute before retrying - retryPeriod time.Duration + // Input Reader + reader io.Reader // Number of line to process before triggering output compilationTrigger int // Current line processed @@ -51,14 +46,11 @@ type ( ) // Set set the redis connections to this compiler -func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int, compilegr *sync.WaitGroup) { +func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup) { s.r0 = rconn0 s.r1 = rconn1 - s.r2 = rconn2 - s.q = queue - s.db = db + s.reader = reader s.compilationTrigger = ct - s.retryPeriod = time.Duration(rt) * time.Minute s.compiling = false s.compilegr = compilegr } diff --git a/logcompiler/sshd.go b/logcompiler/sshd.go index f6d5121..6689c7d 100644 --- a/logcompiler/sshd.go +++ b/logcompiler/sshd.go @@ -37,8 +37,6 @@ type GrokedSSHD struct { SshdInvalidUser string `json:"sshd_invalid_user"` } -var m GrokedSSHD - // Flush recomputes statistics and recompile HTML output // TODO : review after refacto func (s *SSHDCompiler) Flush() error { @@ -103,75 +101,54 @@ func (s *SSHDCompiler) Flush() error { // Pull pulls a line of groked sshd logline from redis func (s *SSHDCompiler) Pull() error { r1 := *s.r1 - r2 := *s.r2 - for { + jsoner := json.NewDecoder(s.reader) - // Reading from specified database on r2 - input - if _, err := r2.Do("SELECT", s.db); err != nil { - r2.Close() + for jsoner.More() { + var m GrokedSSHD + err := jsoner.Decode(&m) + if err != nil { + log.Println(err) + } + fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) + + // Assumes the system parses logs recorded during the current year + m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year()) + // TODO Make this automatic or a config parameter + loc, _ := time.LoadLocation("Europe/Luxembourg") + parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc) + m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10)) + + // Pushing loglines in database 0 + if _, err := r1.Do("SELECT", 0); err != nil { + r1.Close() return err } - grokedline, err := redis.Bytes(r2.Do("LPOP", s.q)) - fmt.Printf("%s\n", grokedline) + // Writing logs + _, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP)) + if err != nil { + r1.Close() + return err + } - if err == redis.ErrNil { - // redis queue empty, let's sleep for a while - time.Sleep(s.retryPeriod) - } else if err != nil { - log.Fatal(err) - } else { + err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname) + if err != nil { + r1.Close() + return err + } - if err != nil { - r1.Close() - r2.Close() - log.Fatal(err) - } - // Compile statistics - err = json.Unmarshal([]byte(grokedline), &m) - if err != nil { - log.Println(err) - } - fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) - - // Assumes the system parses logs recorded during the current year - m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year()) - // TODO Make this automatic or a config parameter - loc, _ := time.LoadLocation("Europe/Luxembourg") - parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc) - m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10)) - - // Pushing loglines in database 0 - if _, err := r1.Do("SELECT", 0); err != nil { - r1.Close() - return err - } - - // Writing logs - _, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP)) - if err != nil { - r1.Close() - return err - } - - err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname) - if err != nil { - r1.Close() - return err - } - - // Compiler html / jsons - s.nbLines++ - if s.nbLines > s.compilationTrigger { - s.nbLines = 0 - //Non-blocking - if !s.compiling { - go s.Compile() - } + // Compiler html / jsons + s.nbLines++ + if s.nbLines > s.compilationTrigger { + s.nbLines = 0 + //Non-blocking + if !s.compiling { + go s.Compile() } } } + return nil } func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error { diff --git a/main.go b/main.go index 329d23a..3bfebb6 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "bufio" "flag" "fmt" "log" @@ -12,6 +11,7 @@ import ( "sync" "time" + "github.com/D4-project/analyzer-d4-log/inputreader" "github.com/D4-project/analyzer-d4-log/logcompiler" config "github.com/D4-project/d4-golang-utils/config" "github.com/gomodule/redigo/redis" @@ -175,8 +175,9 @@ func main() { log.Fatal("Could not connect to output line on Input Redis") } defer sshdrcon2.Close() + redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry) sshd := logcompiler.SSHDCompiler{} - sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry, &compilegr) + sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr) torun = append(torun, &sshd) } } @@ -201,24 +202,16 @@ func main() { log.Fatalf("Error opening seed file: %v", err) } defer f.Close() - scanner := bufio.NewScanner(f) - for scanner.Scan() { - // logline := scanner.Text() - // for _, v := range torun { - // err := v.Pull(logline) - // if err != nil { - // log.Fatal(err) - // } - // } - // nblines++ - // if nblines > compilationTrigger { - // nblines = 0 - // Non-blocking - // if !compiling.compiling { - // go compile() - // } - // } - } + // scanner := bufio.NewScanner(f) + // for scanner.Scan() { + // logline := scanner.Bytes() + // for _, v := range torun { + // go v.Pull() + // if err != nil { + // log.Fatal(err) + // } + // } + // } } else { // Launching Pull routines for _, v := range torun {