analyzer-d4-log/main.go

267 lines
6.3 KiB
Go
Raw Normal View History

package main
import (
"bufio"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
2020-01-28 15:20:09 +01:00
"sync"
"time"
"github.com/D4-project/analyzer-d4-log/logparser"
config "github.com/D4-project/d4-golang-utils/config"
"github.com/gomodule/redigo/redis"
)
type (
redisconfD4 struct {
redisHost string
redisPort string
redisDB int
redisQueue string
}
redisconfParsers struct {
redisHost string
redisPort string
redisDBCount int
}
conf struct {
httpHost string
httpPort string
}
2020-01-28 15:20:09 +01:00
comutex struct {
mu sync.Mutex
compiling bool
}
)
// Setting up flags
var (
2020-01-28 15:20:09 +01:00
confdir = flag.String("c", "conf.sample", "configuration directory")
all = flag.Bool("a", true, "run all parsers when set. Set by default")
specific = flag.String("o", "", "run only a specific parser [sshd]")
2020-01-30 17:31:47 +01:00
debug = flag.Bool("d", false, "debug info in logs")
fromfile = flag.String("f", "", "parse from file on disk")
retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue")
2020-01-28 15:20:09 +01:00
redisD4 redis.Conn
redisParsers *redis.Pool
parsers = [1]string{"sshd"}
compilationTrigger = 20
2020-01-28 15:20:09 +01:00
wg sync.WaitGroup
compiling comutex
2020-01-30 17:31:47 +01:00
torun = []logparser.Parser{}
)
func main() {
sortie := make(chan os.Signal, 1)
signal.Notify(sortie, os.Interrupt, os.Kill)
// Signal goroutine
go func() {
<-sortie
fmt.Println("Exiting.")
log.Println("Exit")
os.Exit(0)
}()
// Setting up log file
f, err := os.OpenFile("analyzer-d4-log.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
log.Println("Init")
// Usage and flags
flag.Usage = func() {
fmt.Printf("analyzer-d4-log:\n\n")
fmt.Printf(" Generate statistics about logs collected through d4 in\n")
fmt.Printf(" HTML format. Optionally serves the results over HTTP.\n")
fmt.Printf("\n")
flag.PrintDefaults()
fmt.Printf("\n")
fmt.Printf("The configuration directory should hold the following files\n")
fmt.Printf("to specify the settings to use:\n\n")
fmt.Printf(" mandatory: redis_d4 - host:port/db\n")
fmt.Printf(" mandatory: redis_queue - uuid\n")
fmt.Printf(" mandatory: redis_parsers - host:port/maxdb\n")
fmt.Printf(" optional: http_server - host:port\n\n")
fmt.Printf("See conf.sample for an example.\n")
}
// Config
// c := conf{}
rd4 := redisconfD4{}
rp := redisconfParsers{}
flag.Parse()
if flag.NFlag() == 0 || *confdir == "" {
flag.Usage()
os.Exit(1)
} else {
*confdir = strings.TrimSuffix(*confdir, "/")
*confdir = strings.TrimSuffix(*confdir, "\\")
}
2020-01-30 17:31:47 +01:00
// Debug log
if *debug {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
// Parse Redis D4 Config
tmp := config.ReadConfigFile(*confdir, "redis_d4")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name")
}
rd4.redisDB, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rd4.redisHost = sss[0]
rd4.redisPort = sss[1]
}
rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
// Connect to D4 Redis
// TODO use DialOptions to Dial with a timeout
redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB))
if err != nil {
log.Fatal(err)
}
defer redisD4.Close()
// Parse Redis Parsers Config
tmp = config.ReadConfigFile(*confdir, "redis_parsers")
ss = strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database Count in Redis config: should be host:port/max number of DB")
}
rp.redisDBCount, _ = strconv.Atoi(ss[1])
2020-01-27 16:07:09 +01:00
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rp.redisHost = sss[0]
rp.redisPort = sss[1]
}
// Create a connection Pool
redisParsers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount)
2020-01-28 15:20:09 +01:00
// Line counter to trigger HTML compilation
nblines := 0
// Init parser depending on the parser flags:
if *all {
// Init all parsers
for _, v := range parsers {
switch v {
case "sshd":
2020-01-30 17:31:47 +01:00
sshdrcon1, err := redisParsers.Dial()
if err != nil {
2020-01-30 17:31:47 +01:00
log.Fatal("Could not connect to Line one Redis")
}
2020-01-30 17:31:47 +01:00
sshdrcon2, err := redisParsers.Dial()
2020-01-27 16:07:09 +01:00
if err != nil {
2020-02-03 08:38:18 +01:00
log.Fatal("Could not connect to Line two Redis")
2020-01-27 16:07:09 +01:00
}
sshd := logparser.SshdParser{}
2020-01-30 17:31:47 +01:00
sshd.Set(&sshdrcon1, &sshdrcon2)
torun = append(torun, &sshd)
}
}
} else if *specific != "" {
log.Println("TODO should run specific parser here")
}
// Parsing loop
if *fromfile != "" {
f, err = os.Open(*fromfile)
2020-01-30 17:31:47 +01:00
if err != nil {
log.Fatalf("Error opening seed file: %v", err)
2020-01-30 17:31:47 +01:00
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
logline := scanner.Text()
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
2020-01-27 16:07:09 +01:00
}
} else {
// Pop D4 redis queue
for {
logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue))
if err == redis.ErrNil {
// redis queue empty, let's sleep for a while
time.Sleep(time.Duration(*retry) * time.Minute)
} else if err != nil {
log.Fatal(err)
// let's parse
} else {
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
2020-01-28 15:20:09 +01:00
}
}
2020-01-27 16:07:09 +01:00
}
2020-01-28 15:20:09 +01:00
wg.Wait()
log.Println("Exit")
}
2020-01-28 15:20:09 +01:00
func compile() {
compiling.mu.Lock()
compiling.compiling = true
wg.Add(1)
2020-01-30 17:31:47 +01:00
log.Println("Compiling")
for _, v := range torun {
err := v.Compile()
if err != nil {
log.Fatal(err)
}
}
log.Println("Done")
2020-01-28 15:20:09 +01:00
compiling.compiling = false
compiling.mu.Unlock()
wg.Done()
}
func newPool(addr string, maxconn int) *redis.Pool {
return &redis.Pool{
MaxActive: maxconn,
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}