add: [grok] moving to grokking support - logic refacto

nifi
Jean-Louis Huynen 2020-03-06 17:02:46 +01:00
parent b3b3649503
commit 547fdba5c8
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
13 changed files with 251 additions and 300 deletions

View File

@ -0,0 +1 @@
localhost:6381/16

View File

@ -1 +0,0 @@
localhost:6380/2

1
conf.sample/redis_input Normal file
View File

@ -0,0 +1 @@
localhost:6385/3

View File

@ -1 +0,0 @@
localhost:6500/16

View File

@ -1 +0,0 @@
d42967c1-f7ad-464e-bbc7-4464c653d7a6

63
logcompiler/compiler.go Normal file
View File

@ -0,0 +1,63 @@
package logcompiler
import (
"sync"
"time"
"github.com/gomodule/redigo/redis"
)
type (
// Compiler is the interface every log compiler implements.
// It should provide:
// Set to hand over the WaitGroup, the three redis connections and the
// tuning parameters (input database, queue name, compilation trigger,
// retry delay)
// Pull to pull groked log lines from the input redis queue
// Flush to recompute statistics and recompile the output
// Compile to build the output (graphs) from the stored statistics
Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int)
Pull() error
Flush() error
Compile() error
}
// CompilerStruct holds the state shared by all compilers. It is meant to
// be embedded in each concrete type implementing Compiler.
CompilerStruct struct {
// Compiler redis Read
r0 *redis.Conn
// Compiler redis Write
r1 *redis.Conn
// Input Read
r2 *redis.Conn
// Database number selected on the input connection (r2) before reading
db int
// Dedicated queue: name of the redis list the log lines are popped from
q string
// Delay before retrying when the input queue is empty. Stored as a
// time.Duration; Set receives it as a number of minutes.
retryPeriod time.Duration
// Number of lines to process before triggering output
compilationTrigger int
// Number of lines processed since the last trigger
nbLines int
// Global WaitGroup to handle exiting
wg *sync.WaitGroup
// comutex embedding: promotes mu and compiling onto CompilerStruct
comutex
}
// comutex groups a mutex with a flag telling whether a compilation is
// in progress.
// NOTE(review): presumably mu is meant to guard compiling; confirm
// against the Compile implementations.
comutex struct {
mu sync.Mutex
compiling bool
}
)
// Set wires the shared resources and tuning parameters into this compiler.
//
//   - wg is the global WaitGroup used to handle exiting.
//   - rconn0 and rconn1 are the read and write connections to the compilers'
//     redis; rconn2 is the read connection to the input redis.
//   - db is the database number selected on the input connection.
//   - queue is the name of the redis list the log lines are popped from.
//   - ct is the number of lines to process before triggering output.
//   - rt is the delay, in minutes, before retrying on an empty queue.
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int) {
	// Keep the WaitGroup: it was previously accepted but dropped, which left
	// s.wg nil for any later Add/Done call.
	s.wg = wg
	s.r0 = rconn0
	s.r1 = rconn1
	s.r2 = rconn2
	s.q = queue
	s.db = db
	s.compilationTrigger = ct
	s.retryPeriod = time.Duration(rt) * time.Minute
	s.compiling = false
}

View File

@ -1,7 +1,7 @@
package logparser package logcompiler
import ( import (
"errors" "encoding/json"
"fmt" "fmt"
"html/template" "html/template"
"io/ioutil" "io/ioutil"
@ -9,7 +9,6 @@ import (
"math" "math"
"os" "os"
"path/filepath" "path/filepath"
"regexp"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -21,26 +20,30 @@ import (
"gonum.org/v1/plot/vg" "gonum.org/v1/plot/vg"
) )
// SshdParser Holds a struct that corresponds to a sshd log line // SSHDCompiler Holds a struct that corresponds to a sshd groked line
// and the redis connection // and the redis connections
type SshdParser struct { type SSHDCompiler struct {
// Write CompilerStruct
r1 *redis.Conn
// Read
r2 *redis.Conn
} }
// Set set the redic connection to this parser type groked struct {
func (s *SshdParser) Set(rconn1 *redis.Conn, rconn2 *redis.Conn) { SSHMessage string `json:"ssh_message"`
s.r1 = rconn1 SyslogPid string `json:"syslog_pid"`
s.r2 = rconn2 SyslogHostname string `json:"syslog_hostname"`
SyslogTimestamp string `json:"syslog_timestamp"`
SshdClientIP string `json:"sshd_client_ip"`
SyslogProgram string `json:"syslog_program"`
SshdInvalidUser string `json:"sshd_invalid_user"`
} }
var m groked
// Flush recomputes statistics and recompile HTML output // Flush recomputes statistics and recompile HTML output
func (s *SshdParser) Flush() error { // TODO : review after refacto
func (s *SSHDCompiler) Flush() error {
log.Println("Flushing") log.Println("Flushing")
r1 := *s.r1 r1 := *s.r1
r0 := *s.r2 r0 := *s.r0
// writing in database 1 // writing in database 1
if _, err := r1.Do("SELECT", 1); err != nil { if _, err := r1.Do("SELECT", 1); err != nil {
r0.Close() r0.Close()
@ -96,54 +99,82 @@ func (s *SshdParser) Flush() error {
return nil return nil
} }
// Parse parses a line of sshd log // Pull pulls a line of groked sshd logline from redis
func (s *SshdParser) Parse(logline string) error { func (s *SSHDCompiler) Pull() error {
r := *s.r1 r1 := *s.r1
re := regexp.MustCompile(`^(?P<date>[[:alpha:]]{3} {1,2}\d{1,2}\s\d{2}:\d{2}:\d{2}) (?P<host>[^ ]+) sshd\[[[:alnum:]]+\]: Invalid user (?P<username>.*) from (?P<src>.*$)`) r2 := *s.r2
n1 := re.SubexpNames()
res := re.FindAllStringSubmatch(logline, -1) for {
if res == nil {
return errors.New("[sshd]: no match") // Reading from specified database on r2 - input
if _, err := r2.Do("SELECT", s.db); err != nil {
r2.Close()
return err
}
grokedline, err := redis.Bytes(r2.Do("LPOP", s.q))
fmt.Printf("%s\n", grokedline)
if err == redis.ErrNil {
// redis queue empty, let's sleep for a while
time.Sleep(s.retryPeriod)
} else if err != nil {
log.Fatal(err)
} else {
if err != nil {
r1.Close()
r2.Close()
log.Fatal(err)
}
// Compile statistics
err = json.Unmarshal([]byte(grokedline), &m)
if err != nil {
log.Println(err)
}
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
// Assumes the system parses logs recorded during the current year
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
// TODO Make this automatic or a config parameter
loc, _ := time.LoadLocation("Europe/Luxembourg")
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Pushing loglines in database 0
if _, err := r1.Do("SELECT", 0); err != nil {
r1.Close()
return err
}
// Writing logs
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
if err != nil {
r1.Close()
return err
}
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
if err != nil {
r1.Close()
return err
}
// Compiler html / jsons
s.nbLines++
if s.nbLines > s.compilationTrigger {
s.nbLines = 0
//Non-blocking
if !s.compiling {
//s.(*)wg.Add(1)
go s.Compile()
}
}
}
} }
r2 := res[0]
// Build the group map for the line
md := map[string]string{}
for i, n := range r2 {
// fmt.Printf("%d. match='%s'\tname='%s'\n", i, n, n1[i])
md[n1[i]] = n
}
// Assumes the system parses logs recorded during the current year
md["date"] = fmt.Sprintf("%v %v", md["date"], time.Now().Year())
// TODO Make this automatic or a config parameter
loc, _ := time.LoadLocation("Europe/Luxembourg")
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", md["date"], loc)
md["date"] = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Pushing loglines in database 0
if _, err := r.Do("SELECT", 0); err != nil {
r.Close()
return err
}
// Writing logs
_, err := redis.Bool(r.Do("HSET", fmt.Sprintf("%v:%v", md["date"], md["host"]), "username", md["username"], "src", md["src"]))
if err != nil {
r.Close()
return err
}
err = compileStats(s, parsedTime, md["src"], md["username"], md["host"])
if err != nil {
r.Close()
return err
}
return nil
} }
func compileStats(s *SshdParser, parsedTime time.Time, src string, username string, host string) error { func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error {
r := *s.r1 r := *s.r1
// Pushing statistics in database 1 // Pushing statistics in database 1
@ -214,7 +245,7 @@ func compileStats(s *SshdParser, parsedTime time.Time, src string, username stri
return nil return nil
} }
func compileStat(s *SshdParser, datestr string, mode string, src string, username string, host string) error { func compileStat(s *SSHDCompiler, datestr string, mode string, src string, username string, host string) error {
r := *s.r1 r := *s.r1
_, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src)) _, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src))
if err != nil { if err != nil {
@ -252,8 +283,8 @@ func compileStat(s *SshdParser, datestr string, mode string, src string, usernam
} }
// Compile create graphs of the results // Compile create graphs of the results
func (s *SshdParser) Compile() error { func (s *SSHDCompiler) Compile() error {
r := *s.r2 r := *s.r0
// Pulling statistics from database 1 // Pulling statistics from database 1
if _, err := r.Do("SELECT", 1); err != nil { if _, err := r.Do("SELECT", 1); err != nil {
@ -490,8 +521,8 @@ func (s *SshdParser) Compile() error {
return nil return nil
} }
func plotStats(s *SshdParser, v string) error { func plotStats(s *SSHDCompiler, v string) error {
r := *s.r2 r := *s.r0
zrank, err := redis.Strings(r.Do("ZRANGEBYSCORE", v, "-inf", "+inf", "WITHSCORES")) zrank, err := redis.Strings(r.Do("ZRANGEBYSCORE", v, "-inf", "+inf", "WITHSCORES"))
if err != nil { if err != nil {
r.Close() r.Close()
@ -576,5 +607,7 @@ func plotStats(s *SshdParser, v string) error {
return err return err
} }
// s.wg.Done()
return nil return nil
} }

View File

@ -1,17 +0,0 @@
package logparser
import "github.com/gomodule/redigo/redis"
type (
// Parser is the interface every log parser implements.
// It should provide:
// Set to assign the two redis connections to it
// Parse to parse a line of log
// Flush to recompute statistics and recompile the output
// Compile to build the output from the stored statistics
Parser interface {
Set(*redis.Conn, *redis.Conn)
Parse(string) error
Flush() error
Compile() error
}
)

View File

@ -1,95 +0,0 @@
package logparser
import (
"bufio"
"fmt"
"log"
"os"
"regexp"
"testing"
)
// expected lists, per zero-based line index of the test fixture, the value
// each named capture group of the sshd regular expression must yield.
var expected = map[int]map[string]string{
	0: {"date": "Jan 22 11:59:37", "host": "sigmund", "username": "git", "src": "106.12.14.144"},
	1: {"date": "Jan 22 11:37:19", "host": "si.mund", "username": "gestion", "src": "159.89.153.54"},
	2: {"date": "Jan 22 11:34:46", "host": "sigmund", "username": "atpco", "src": "177.152.124.21"},
	3: {"date": "Jan 22 11:33:07", "host": "sigmund", "username": "ki", "src": "49.233.183.158"},
	4: {"date": "Jan 22 11:29:16", "host": "sigmund", "username": "a.min", "src": "185.56.8.191"},
	// Usernames made only of a blank, or empty, must be captured as such.
	5: {"date": "Jan 22 11:29:16", "host": "sigmund", "username": " ", "src": "185.56.8.191"},
	6: {"date": "Jan 22 11:29:16", "host": "sigmund", "username": "", "src": "185.56.8.191"},
	7: {"date": "Feb 3 06:50:51", "host": "sigmund", "username": "apples", "src": "37.117.180.69"},
}
// TestSshdParser checks the sshd "Invalid user" regular expression against
// the fixture file ./test.log: the named groups captured on line i of the
// fixture must equal the values listed in expected[i].
func TestSshdParser(t *testing.T) {
	// Opening sshd test file
	fmt.Println("[+] Testing the sshd log parser")
	f, err := os.Open("./test.log")
	if err != nil {
		log.Fatalf("Error opening test file: %v", err)
	}
	defer f.Close()

	// The pattern is the same for every line: compile it once, outside the
	// scanning loop.
	re := regexp.MustCompile(`^(?P<date>[[:alpha:]]{3} {1,2}\d{1,2}\s\d{2}:\d{2}:\d{2}) (?P<host>[^ ]+) sshd\[[[:alnum:]]+\]: Invalid user (?P<username>.*) from (?P<src>.*$)`)
	names := re.SubexpNames()

	scanner := bufio.NewScanner(f)
	c := 0
	for ; scanner.Scan(); c++ {
		line := scanner.Text()
		groups := re.FindStringSubmatch(line)
		if groups == nil {
			// Report the offending line instead of panicking on a nil result.
			t.Errorf("line %d: no match for %q", c+1, line)
			continue
		}
		// Build the group map for the line
		md := make(map[string]string, len(names))
		for i, g := range groups {
			md[names[i]] = g
		}
		// Check against the expected map; the unnamed group (whole match)
		// has an empty name and is skipped.
		for _, n := range names {
			if n == "" {
				continue
			}
			if md[n] != expected[c][n] {
				t.Errorf("%v = '%v'; want '%v'", n, md[n], expected[c][n])
			}
		}
	}
	if err := scanner.Err(); err != nil {
		t.Fatalf("reading test file: %v", err)
	}
	// A truncated or extended fixture must not pass silently.
	if c != len(expected) {
		t.Errorf("parsed %d lines; want %d", c, len(expected))
	}
}

View File

@ -1,8 +0,0 @@
Jan 22 11:59:37 sigmund sshd[26514]: Invalid user git from 106.12.14.144
Jan 22 11:37:19 si.mund sshd[26143]: Invalid user gestion from 159.89.153.54
Jan 22 11:34:46 sigmund sshd[26125]: Invalid user atpco from 177.152.124.21
Jan 22 11:33:07 sigmund sshd[26109]: Invalid user ki from 49.233.183.158
Jan 22 11:29:16 sigmund sshd[26091]: Invalid user a.min from 185.56.8.191
Jan 22 11:29:16 sigmund sshd[26091]: Invalid user from 185.56.8.191
Jan 22 11:29:16 sigmund sshd[26091]: Invalid user from 185.56.8.191
Feb 3 06:50:51 sigmund sshd[12611]: Invalid user apples from 37.117.180.69

198
main.go
View File

@ -12,17 +12,17 @@ import (
"sync" "sync"
"time" "time"
"github.com/D4-project/analyzer-d4-log/logparser" "github.com/D4-project/analyzer-d4-log/logcompiler"
config "github.com/D4-project/d4-golang-utils/config" config "github.com/D4-project/d4-golang-utils/config"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
) )
type ( type (
redisconfD4 struct { // Input is a grok - NIFI or Logstash
redisHost string redisconfInput struct {
redisPort string redisHost string
redisDB int redisPort string
redisQueue string redisDB int
} }
redisconfCompilers struct { redisconfCompilers struct {
redisHost string redisHost string
@ -33,28 +33,27 @@ type (
httpHost string httpHost string
httpPort string httpPort string
} }
comutex struct {
mu sync.Mutex
compiling bool
}
) )
// Setting up flags // Setting up flags
var ( var (
confdir = flag.String("c", "conf.sample", "configuration directory") // Flags
all = flag.Bool("a", true, "run all parsers when set. Set by default") confdir = flag.String("c", "conf.sample", "configuration directory")
specific = flag.String("o", "", "run only a specific parser [sshd]") all = flag.Bool("a", true, "run all compilers when set. Set by default")
debug = flag.Bool("d", false, "debug info in logs") specific = flag.String("o", "", "run only a specific parser [sshd]")
fromfile = flag.String("f", "", "parse from file on disk") debug = flag.Bool("d", false, "debug info in logs")
retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") fromfile = flag.String("f", "", "parse from file on disk")
flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits") retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue")
redisD4 redis.Conn flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits")
redisCompilers *redis.Pool // Pools of redis connections
redisCompilers *redis.Pool
redisInput *redis.Pool
// Compilers
compilers = [1]string{"sshd"} compilers = [1]string{"sshd"}
compilationTrigger = 20 compilationTrigger = 20
wg sync.WaitGroup torun = []logcompiler.Compiler{}
compiling comutex // Routine handling
torun = []logparser.Parser{} wg sync.WaitGroup
) )
func main() { func main() {
@ -64,6 +63,8 @@ func main() {
go func() { go func() {
<-sortie <-sortie
fmt.Println("Exiting.") fmt.Println("Exiting.")
// TODO: handle the pulling routine
// wg.Wait()
log.Println("Exit") log.Println("Exit")
os.Exit(0) os.Exit(0)
}() }()
@ -89,14 +90,14 @@ func main() {
fmt.Printf("to specify the settings to use:\n\n") fmt.Printf("to specify the settings to use:\n\n")
fmt.Printf(" mandatory: redis_d4 - host:port/db\n") fmt.Printf(" mandatory: redis_d4 - host:port/db\n")
fmt.Printf(" mandatory: redis_queue - uuid\n") fmt.Printf(" mandatory: redis_queue - uuid\n")
fmt.Printf(" mandatory: redis_parsers - host:port/maxdb\n") fmt.Printf(" mandatory: redis_compilers - host:port/maxdb\n")
fmt.Printf(" optional: http_server - host:port\n\n") fmt.Printf(" optional: http_server - host:port\n\n")
fmt.Printf("See conf.sample for an example.\n") fmt.Printf("See conf.sample for an example.\n")
} }
// Config // Config
// c := conf{} // c := conf{}
rd4 := redisconfD4{} ri := redisconfInput{}
rp := redisconfCompilers{} rp := redisconfCompilers{}
flag.Parse() flag.Parse()
if flag.NFlag() == 0 || *confdir == "" { if flag.NFlag() == 0 || *confdir == "" {
@ -112,33 +113,24 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile) log.SetFlags(log.LstdFlags | log.Lshortfile)
} }
// Dont't touch D4 server if Flushing // Dont't touch input server if Flushing
if !*flush { if !*flush {
// Parse Redis D4 Config // Parse Input Redis Config
tmp := config.ReadConfigFile(*confdir, "redis_d4") tmp := config.ReadConfigFile(*confdir, "redis_input")
ss := strings.Split(string(tmp), "/") ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 { if len(ss) <= 1 {
log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name") log.Fatal("Missing Database in Redis input config: should be host:port/database_name")
} }
rd4.redisDB, _ = strconv.Atoi(ss[1]) ri.redisDB, _ = strconv.Atoi(ss[1])
var ret bool var ret bool
ret, ss[0] = config.IsNet(ss[0]) ret, ss[0] = config.IsNet(ss[0])
if ret { if ret {
sss := strings.Split(string(ss[0]), ":") sss := strings.Split(string(ss[0]), ":")
rd4.redisHost = sss[0] ri.redisHost = sss[0]
rd4.redisPort = sss[1] ri.redisPort = sss[1]
} else { } else {
log.Fatal("Redis config error.") log.Fatal("Redis config error.")
} }
rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
// Connect to D4 Redis
// TODO use DialOptions to Dial with a timeout
redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB))
if err != nil {
log.Fatal(err)
}
defer redisD4.Close()
} }
// Parse Redis Compilers Config // Parse Redis Compilers Config
@ -158,45 +150,48 @@ func main() {
log.Fatal("Redis config error.") log.Fatal("Redis config error.")
} }
// Create a connection Pool // Create a connection Pool for output Redis
redisCompilers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount) redisCompilers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount)
redisInput = newPool(ri.redisHost+":"+ri.redisPort, 16)
// Line counter to trigger HTML compilation // Init compiler depending on the compiler flags:
nblines := 0
// Init parser depending on the parser flags:
if *all { if *all {
// Init all parsers // Init all compilers
for _, v := range compilers { for _, v := range compilers {
switch v { switch v {
case "sshd": case "sshd":
sshdrcon0, err := redisCompilers.Dial()
if err != nil {
log.Fatal("Could not connect to input line on Compiler Redis")
}
sshdrcon1, err := redisCompilers.Dial() sshdrcon1, err := redisCompilers.Dial()
if err != nil { if err != nil {
log.Fatal("Could not connect to Line one Redis") log.Fatal("Could not connect to output line on Compiler Redis")
} }
sshdrcon2, err := redisCompilers.Dial() sshdrcon2, err := redisInput.Dial()
if err != nil { if err != nil {
log.Fatal("Could not connect to Line two Redis") log.Fatal("Could not connect to output line on Input Redis")
} }
sshd := logparser.SshdParser{} sshd := logcompiler.SSHDCompiler{}
sshd.Set(&sshdrcon1, &sshdrcon2) sshd.Set(&wg, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry)
torun = append(torun, &sshd) torun = append(torun, &sshd)
} }
} }
} else if *specific != "" { } else if *specific != "" {
log.Println("TODO should run specific parser here") log.Println("TODO should run specific compiler here")
} }
// If we flush, we bypass the parsing loop // If we flush, we bypass the compiling loop
if *flush { if *flush {
for _, v := range torun { for _, v := range torun {
err := v.Flush() err := v.Flush()
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
compile() // TODO
// compile()
} }
// Parsing loop // TODO update that -- deprecated
} else if *fromfile != "" { } else if *fromfile != "" {
f, err = os.Open(*fromfile) f, err = os.Open(*fromfile)
if err != nil { if err != nil {
@ -205,48 +200,27 @@ func main() {
defer f.Close() defer f.Close()
scanner := bufio.NewScanner(f) scanner := bufio.NewScanner(f)
for scanner.Scan() { for scanner.Scan() {
logline := scanner.Text() // logline := scanner.Text()
for _, v := range torun { // for _, v := range torun {
err := v.Parse(logline) // err := v.Pull(logline)
if err != nil { // if err != nil {
log.Fatal(err) // log.Fatal(err)
} // }
} // }
nblines++ // nblines++
if nblines > compilationTrigger { // if nblines > compilationTrigger {
nblines = 0 // nblines = 0
// Non-blocking // Non-blocking
if !compiling.compiling { // if !compiling.compiling {
go compile() // go compile()
} // }
} // }
} }
} else { } else {
// Pop D4 redis queue // Launching Pull routines
for { for _, v := range torun {
logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue)) wg.Add(1)
if err == redis.ErrNil { go v.Pull()
// redis queue empty, let's sleep for a while
time.Sleep(time.Duration(*retry) * time.Minute)
} else if err != nil {
log.Fatal(err)
// let's parse
} else {
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Println(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
}
} }
} }
@ -254,25 +228,27 @@ func main() {
log.Println("Exit") log.Println("Exit")
} }
func compile() { // TODO: move into compilers
compiling.mu.Lock()
compiling.compiling = true
wg.Add(1)
log.Println("Compiling") // func compile() {
// compiling.mu.Lock()
// compiling.compiling = true
// wg.Add(1)
for _, v := range torun { // log.Println("Compiling")
err := v.Compile()
if err != nil {
log.Fatal(err)
}
}
log.Println("Done") // for _, v := range torun {
compiling.compiling = false // err := v.Compile()
compiling.mu.Unlock() // if err != nil {
wg.Done() // log.Fatal(err)
} // }
// }
// log.Println("Done")
// compiling.compiling = false
// compiling.mu.Unlock()
// wg.Done()
// }
func newPool(addr string, maxconn int) *redis.Pool { func newPool(addr string, maxconn int) *redis.Pool {
return &redis.Pool{ return &redis.Pool{