add: [grok] Stream d4 redis pulling

nifi
Jean-Louis Huynen 2020-03-10 16:02:24 +01:00
parent 593c6425b5
commit 9a4d57ee0a
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
3 changed files with 57 additions and 95 deletions

View File

@ -1,8 +1,8 @@
package logcompiler
import (
"io"
"sync"
"time"
"github.com/gomodule/redigo/redis"
)
@ -14,7 +14,7 @@ type (
// Parse to parse a line of log
// Flush recomputes statisitcs and recompile output
Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int, *sync.WaitGroup)
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup)
Pull() error
Flush() error
Compile() error
@ -27,13 +27,8 @@ type (
r0 *redis.Conn
// Compiler redis Write
r1 *redis.Conn
// Input Read
r2 *redis.Conn
db int
// Dedicated queue
q string
// Time in minute before retrying
retryPeriod time.Duration
// Input Reader
reader io.Reader
// Number of line to process before triggering output
compilationTrigger int
// Current line processed
@ -51,14 +46,11 @@ type (
)
// Set set the redis connections to this compiler
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int, compilegr *sync.WaitGroup) {
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup) {
s.r0 = rconn0
s.r1 = rconn1
s.r2 = rconn2
s.q = queue
s.db = db
s.reader = reader
s.compilationTrigger = ct
s.retryPeriod = time.Duration(rt) * time.Minute
s.compiling = false
s.compilegr = compilegr
}

View File

@ -37,8 +37,6 @@ type GrokedSSHD struct {
SshdInvalidUser string `json:"sshd_invalid_user"`
}
var m GrokedSSHD
// Flush recomputes statistics and recompile HTML output
// TODO : review after refacto
func (s *SSHDCompiler) Flush() error {
@ -103,75 +101,54 @@ func (s *SSHDCompiler) Flush() error {
// Pull pulls a line of groked sshd logline from redis
func (s *SSHDCompiler) Pull() error {
r1 := *s.r1
r2 := *s.r2
for {
jsoner := json.NewDecoder(s.reader)
// Reading from specified database on r2 - input
if _, err := r2.Do("SELECT", s.db); err != nil {
r2.Close()
for jsoner.More() {
var m GrokedSSHD
err := jsoner.Decode(&m)
if err != nil {
log.Println(err)
}
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
// Assumes the system parses logs recorded during the current year
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
// TODO Make this automatic or a config parameter
loc, _ := time.LoadLocation("Europe/Luxembourg")
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Pushing loglines in database 0
if _, err := r1.Do("SELECT", 0); err != nil {
r1.Close()
return err
}
grokedline, err := redis.Bytes(r2.Do("LPOP", s.q))
fmt.Printf("%s\n", grokedline)
// Writing logs
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
if err != nil {
r1.Close()
return err
}
if err == redis.ErrNil {
// redis queue empty, let's sleep for a while
time.Sleep(s.retryPeriod)
} else if err != nil {
log.Fatal(err)
} else {
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
if err != nil {
r1.Close()
return err
}
if err != nil {
r1.Close()
r2.Close()
log.Fatal(err)
}
// Compile statistics
err = json.Unmarshal([]byte(grokedline), &m)
if err != nil {
log.Println(err)
}
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
// Assumes the system parses logs recorded during the current year
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
// TODO Make this automatic or a config parameter
loc, _ := time.LoadLocation("Europe/Luxembourg")
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Pushing loglines in database 0
if _, err := r1.Do("SELECT", 0); err != nil {
r1.Close()
return err
}
// Writing logs
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
if err != nil {
r1.Close()
return err
}
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
if err != nil {
r1.Close()
return err
}
// Compiler html / jsons
s.nbLines++
if s.nbLines > s.compilationTrigger {
s.nbLines = 0
//Non-blocking
if !s.compiling {
go s.Compile()
}
// Compiler html / jsons
s.nbLines++
if s.nbLines > s.compilationTrigger {
s.nbLines = 0
//Non-blocking
if !s.compiling {
go s.Compile()
}
}
}
return nil
}
func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error {

33
main.go
View File

@ -1,7 +1,6 @@
package main
import (
"bufio"
"flag"
"fmt"
"log"
@ -12,6 +11,7 @@ import (
"sync"
"time"
"github.com/D4-project/analyzer-d4-log/inputreader"
"github.com/D4-project/analyzer-d4-log/logcompiler"
config "github.com/D4-project/d4-golang-utils/config"
"github.com/gomodule/redigo/redis"
@ -175,8 +175,9 @@ func main() {
log.Fatal("Could not connect to output line on Input Redis")
}
defer sshdrcon2.Close()
redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry)
sshd := logcompiler.SSHDCompiler{}
sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry, &compilegr)
sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr)
torun = append(torun, &sshd)
}
}
@ -201,24 +202,16 @@ func main() {
log.Fatalf("Error opening seed file: %v", err)
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
// logline := scanner.Text()
// for _, v := range torun {
// err := v.Pull(logline)
// if err != nil {
// log.Fatal(err)
// }
// }
// nblines++
// if nblines > compilationTrigger {
// nblines = 0
// Non-blocking
// if !compiling.compiling {
// go compile()
// }
// }
}
// scanner := bufio.NewScanner(f)
// for scanner.Scan() {
// logline := scanner.Bytes()
// for _, v := range torun {
// go v.Pull()
// if err != nil {
// log.Fatal(err)
// }
// }
// }
} else {
// Launching Pull routines
for _, v := range torun {