analyzer-d4-log/main.go

247 lines
5.7 KiB
Go

package main
import (
"errors"
"flag"
"fmt"
"log"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"time"
"github.com/D4-project/analyzer-d4-log/logparser"
config "github.com/D4-project/d4-golang-utils/config"
"github.com/gomodule/redigo/redis"
)
type (
redisconfD4 struct {
redisHost string
redisPort string
redisDB int
redisQueue string
}
redisconfParsers struct {
redisHost string
redisPort string
redisDBCount int
}
conf struct {
httpHost string
httpPort string
}
comutex struct {
mu sync.Mutex
compiling bool
}
)
// Setting up flags
var (
confdir = flag.String("c", "conf.sample", "configuration directory")
all = flag.Bool("a", true, "run all parsers when set. Set by default")
specific = flag.String("o", "", "run only a specific parser [sshd]")
debug = flag.Bool("d", false, "debug info in logs")
redisD4 redis.Conn
redisParsers *redis.Pool
parsers = [1]string{"sshd"}
compilationTrigger = 200
wg sync.WaitGroup
compiling comutex
torun = []logparser.Parser{}
)
func main() {
sortie := make(chan os.Signal, 1)
signal.Notify(sortie, os.Interrupt, os.Kill)
// Signal goroutine
go func() {
<-sortie
fmt.Println("Exiting.")
log.Println("Exit")
os.Exit(0)
}()
// Setting up log file
f, err := os.OpenFile("analyzer-d4-log.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
log.Println("Init")
// Usage and flags
flag.Usage = func() {
fmt.Printf("analyzer-d4-log:\n\n")
fmt.Printf(" Generate statistics about logs collected through d4 in\n")
fmt.Printf(" HTML format. Optionally serves the results over HTTP.\n")
fmt.Printf("\n")
flag.PrintDefaults()
fmt.Printf("\n")
fmt.Printf("The configuration directory should hold the following files\n")
fmt.Printf("to specify the settings to use:\n\n")
fmt.Printf(" mandatory: redis_d4 - host:port/db\n")
fmt.Printf(" mandatory: redis_queue - uuid\n")
fmt.Printf(" mandatory: redis_parsers - host:port/maxdb\n")
fmt.Printf(" optional: http_server - host:port\n\n")
fmt.Printf("See conf.sample for an example.\n")
}
// Config
// c := conf{}
rd4 := redisconfD4{}
rp := redisconfParsers{}
flag.Parse()
if flag.NFlag() == 0 || *confdir == "" {
flag.Usage()
os.Exit(1)
} else {
*confdir = strings.TrimSuffix(*confdir, "/")
*confdir = strings.TrimSuffix(*confdir, "\\")
}
// Debug log
if *debug {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
// Parse Redis D4 Config
tmp := config.ReadConfigFile(*confdir, "redis_d4")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name")
}
rd4.redisDB, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rd4.redisHost = sss[0]
rd4.redisPort = sss[1]
}
rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
// Connect to D4 Redis
// TODO use DialOptions to Dial with a timeout
redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB))
if err != nil {
log.Fatal(err)
}
defer redisD4.Close()
// Parse Redis Parsers Config
tmp = config.ReadConfigFile(*confdir, "redis_parsers")
ss = strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database Count in Redis config: should be host:port/max number of DB")
}
rp.redisDBCount, _ = strconv.Atoi(ss[1])
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rp.redisHost = sss[0]
rp.redisPort = sss[1]
}
// Create a connection Pool
redisParsers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount)
// Line counter to trigger HTML compilation
nblines := 0
// Init parser depending on the parser flags:
if *all {
// Init all parsers
for _, v := range parsers {
switch v {
case "sshd":
sshdrcon1, err := redisParsers.Dial()
if err != nil {
log.Fatal("Could not connect to Line one Redis")
}
sshdrcon2, err := redisParsers.Dial()
if err != nil {
log.Fatal("Could not connect to Line one Redis")
}
sshd := logparser.SshdParser{}
sshd.Set(&sshdrcon1, &sshdrcon2)
torun = append(torun, &sshd)
}
}
} else if *specific != "" {
log.Println("TODO should run specific parser here")
}
f, err = os.Open("./test_seed.log")
if err != nil {
log.Fatalf("Error opening test file: %v", err)
}
defer f.Close()
// scanner := bufio.NewScanner(f)
// for scanner.Scan() {
// Pop D4 redis queue
for {
err := errors.New("")
logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue))
// logline := scanner.Text()
if err != nil {
log.Fatal(err)
}
fmt.Println(logline)
// Run the parsers
for _, v := range torun {
err := v.Parse(logline)
if err != nil {
log.Fatal(err)
}
}
nblines++
if nblines > compilationTrigger {
nblines = 0
// Non-blocking
if !compiling.compiling {
go compile()
}
}
}
wg.Wait()
log.Println("Exit")
}
func compile() {
compiling.mu.Lock()
compiling.compiling = true
wg.Add(1)
log.Println("Compiling")
for _, v := range torun {
err := v.Compile()
if err != nil {
log.Fatal(err)
}
}
log.Println("Done")
compiling.compiling = false
compiling.mu.Unlock()
wg.Done()
}
func newPool(addr string, maxconn int) *redis.Pool {
return &redis.Pool{
MaxActive: maxconn,
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}