diff --git a/conf.sample/redis_compilers b/conf.sample/redis_compilers new file mode 100644 index 0000000..1e8d1ba --- /dev/null +++ b/conf.sample/redis_compilers @@ -0,0 +1 @@ +localhost:6381/16 diff --git a/conf.sample/redis_d4 b/conf.sample/redis_d4 deleted file mode 100644 index 5d6f103..0000000 --- a/conf.sample/redis_d4 +++ /dev/null @@ -1 +0,0 @@ -localhost:6380/2 diff --git a/conf.sample/redis_input b/conf.sample/redis_input new file mode 100644 index 0000000..e740532 --- /dev/null +++ b/conf.sample/redis_input @@ -0,0 +1 @@ +localhost:6385/3 diff --git a/conf.sample/redis_parsers b/conf.sample/redis_parsers deleted file mode 100644 index 2c849f7..0000000 --- a/conf.sample/redis_parsers +++ /dev/null @@ -1 +0,0 @@ -localhost:6500/16 diff --git a/conf.sample/redis_queue b/conf.sample/redis_queue deleted file mode 100644 index 046b78d..0000000 --- a/conf.sample/redis_queue +++ /dev/null @@ -1 +0,0 @@ -d42967c1-f7ad-464e-bbc7-4464c653d7a6 \ No newline at end of file diff --git a/logcompiler/compiler.go b/logcompiler/compiler.go new file mode 100644 index 0000000..7774eba --- /dev/null +++ b/logcompiler/compiler.go @@ -0,0 +1,63 @@ +package logcompiler + +import ( + "sync" + "time" + + "github.com/gomodule/redigo/redis" +) + +type ( + // Compiler provides the interface for a Compiler + // It should provide: + // Set to assign a redis connection to it + // Parse to parse a line of log + // Flush recomputes statisitcs and recompile output + Compiler interface { + Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int) + Pull() error + Flush() error + Compile() error + } + + // CompilerStruct will implements Compiler, and should be embedded in + // each type implementing compiler + CompilerStruct struct { + // Compiler redis Read + r0 *redis.Conn + // Compiler redis Write + r1 *redis.Conn + // Input Read + r2 *redis.Conn + db int + // Dedicated queue + q string + // Time in minute before retrying + retryPeriod time.Duration + // Number of line to process before triggering output + compilationTrigger int + // Current line processed + nbLines int + // Global WaitGroup to handle exiting + wg *sync.WaitGroup + // comutex embedding + comutex + } + + comutex struct { + mu sync.Mutex + compiling bool + } +) + +// Set set the redis connections to this compiler +func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int) { + s.r0 = rconn0 + s.r1 = rconn1 + s.r2 = rconn2 + s.q = queue + s.db = db + s.compilationTrigger = ct + s.retryPeriod = time.Duration(rt) * time.Minute + s.compiling = false +} diff --git a/logparser/sshd.go b/logcompiler/sshd.go similarity index 79% rename from logparser/sshd.go rename to logcompiler/sshd.go index f766c31..e52e37c 100644 --- a/logparser/sshd.go +++ b/logcompiler/sshd.go @@ -1,7 +1,7 @@ -package logparser +package logcompiler import ( - "errors" + "encoding/json" "fmt" "html/template" "io/ioutil" @@ -9,7 +9,6 @@ import ( "math" "os" "path/filepath" - "regexp" "strconv" "strings" "time" @@ -21,26 +20,30 @@ import ( "gonum.org/v1/plot/vg" ) -// SshdParser Holds a struct that corresponds to a sshd log line -// and the redis connection -type SshdParser struct { - // Write - r1 *redis.Conn - // Read - r2 *redis.Conn +// SSHDCompiler Holds a struct that corresponds to a sshd groked line +// and the redis connections +type SSHDCompiler struct { + CompilerStruct } -// Set set the redic connection to this parser -func (s *SshdParser) Set(rconn1 *redis.Conn, rconn2 *redis.Conn) { - s.r1 = rconn1 - s.r2 = rconn2 +type groked struct { + SSHMessage string `json:"ssh_message"` + SyslogPid string `json:"syslog_pid"` + SyslogHostname string `json:"syslog_hostname"` + SyslogTimestamp string `json:"syslog_timestamp"` + SshdClientIP string `json:"sshd_client_ip"` + SyslogProgram string `json:"syslog_program"` + SshdInvalidUser string `json:"sshd_invalid_user"` } +var m groked + // Flush recomputes statistics and recompile HTML output -func (s *SshdParser) Flush() error { +// TODO : review after refacto +func (s *SSHDCompiler) Flush() error { log.Println("Flushing") r1 := *s.r1 - r0 := *s.r2 + r0 := *s.r0 // writing in database 1 if _, err := r1.Do("SELECT", 1); err != nil { r0.Close() @@ -96,54 +99,82 @@ func (s *SshdParser) Flush() error { return nil } -// Parse parses a line of sshd log -func (s *SshdParser) Parse(logline string) error { - r := *s.r1 - re := regexp.MustCompile(`^(?P[[:alpha:]]{3} {1,2}\d{1,2}\s\d{2}:\d{2}:\d{2}) (?P[^ ]+) sshd\[[[:alnum:]]+\]: Invalid user (?P.*) from (?P.*$)`) - n1 := re.SubexpNames() - res := re.FindAllStringSubmatch(logline, -1) - if res == nil { - return errors.New("[sshd]: no match") +// Pull pulls a line of groked sshd logline from redis +func (s *SSHDCompiler) Pull() error { + r1 := *s.r1 + r2 := *s.r2 + + for { + + // Reading from specified database on r2 - input + if _, err := r2.Do("SELECT", s.db); err != nil { + r2.Close() + return err + } + + grokedline, err := redis.Bytes(r2.Do("LPOP", s.q)) + fmt.Printf("%s\n", grokedline) + + if err == redis.ErrNil { + // redis queue empty, let's sleep for a while + time.Sleep(s.retryPeriod) + } else if err != nil { + log.Fatal(err) + } else { + + if err != nil { + r1.Close() + r2.Close() + log.Fatal(err) + } + // Compile statistics + err = json.Unmarshal([]byte(grokedline), &m) + if err != nil { + log.Println(err) + } + fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) + + // Assumes the system parses logs recorded during the current year + m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year()) + // TODO Make this automatic or a config parameter + loc, _ := time.LoadLocation("Europe/Luxembourg") + parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc) + m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10)) + + // Pushing loglines in database 0 + if _, err := r1.Do("SELECT", 0); err != nil { + r1.Close() + return err + } + + // Writing logs + _, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP)) + if err != nil { + r1.Close() + return err + } + + err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname) + if err != nil { + r1.Close() + return err + } + + // Compiler html / jsons + s.nbLines++ + if s.nbLines > s.compilationTrigger { + s.nbLines = 0 + //Non-blocking + if !s.compiling { + //s.(*)wg.Add(1) + go s.Compile() + } + } + } } - r2 := res[0] - - // Build the group map for the line - md := map[string]string{} - for i, n := range r2 { - // fmt.Printf("%d. match='%s'\tname='%s'\n", i, n, n1[i]) - md[n1[i]] = n - } - - // Assumes the system parses logs recorded during the current year - md["date"] = fmt.Sprintf("%v %v", md["date"], time.Now().Year()) - // TODO Make this automatic or a config parameter - loc, _ := time.LoadLocation("Europe/Luxembourg") - parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", md["date"], loc) - md["date"] = string(strconv.FormatInt(parsedTime.Unix(), 10)) - - // Pushing loglines in database 0 - if _, err := r.Do("SELECT", 0); err != nil { - r.Close() - return err - } - - // Writing logs - _, err := redis.Bool(r.Do("HSET", fmt.Sprintf("%v:%v", md["date"], md["host"]), "username", md["username"], "src", md["src"])) - if err != nil { - r.Close() - return err - } - - err = compileStats(s, parsedTime, md["src"], md["username"], md["host"]) - if err != nil { - r.Close() - return err - } - - return nil } -func compileStats(s *SshdParser, parsedTime time.Time, src string, username string, host string) error { +func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error { r := *s.r1 // Pushing statistics in database 1 @@ -214,7 +245,7 @@ func compileStats(s *SshdParser, parsedTime time.Time, src string, username stri return nil } -func compileStat(s *SshdParser, datestr string, mode string, src string, username string, host string) error { +func compileStat(s *SSHDCompiler, datestr string, mode string, src string, username string, host string) error { r := *s.r1 _, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src)) if err != nil { @@ -252,8 +283,8 @@ func compileStat(s *SshdParser, datestr string, mode string, src string, usernam } // Compile create graphs of the results -func (s *SshdParser) Compile() error { - r := *s.r2 +func (s *SSHDCompiler) Compile() error { + r := *s.r0 // Pulling statistics from database 1 if _, err := r.Do("SELECT", 1); err != nil { @@ -490,8 +521,8 @@ func (s *SshdParser) Compile() error { return nil } -func plotStats(s *SshdParser, v string) error { - r := *s.r2 +func plotStats(s *SSHDCompiler, v string) error { + r := *s.r0 zrank, err := redis.Strings(r.Do("ZRANGEBYSCORE", v, "-inf", "+inf", "WITHSCORES")) if err != nil { r.Close() @@ -576,5 +607,7 @@ func plotStats(s *SshdParser, v string) error { return err } + // s.wg.Done() + return nil } diff --git a/logparser/sshd/load.js b/logcompiler/sshd/load.js similarity index 100% rename from logparser/sshd/load.js rename to logcompiler/sshd/load.js diff --git a/logparser/sshd/statistics.gohtml b/logcompiler/sshd/statistics.gohtml similarity index 100% rename from logparser/sshd/statistics.gohtml rename to logcompiler/sshd/statistics.gohtml diff --git a/logparser/parser.go b/logparser/parser.go deleted file mode 100644 index 0803228..0000000 --- a/logparser/parser.go +++ /dev/null @@ -1,17 +0,0 @@ -package logparser - -import "github.com/gomodule/redigo/redis" - -type ( - // Parser provides the interface for a Parser - // It should provide: - // Set to assign a redis connection to it - // Parse to parse a line of log - // Flush recomputes statisitcs and recompile output - Parser interface { - Set(*redis.Conn, *redis.Conn) - Parse(string) error - Flush() error - Compile() error - } -) diff --git a/logparser/parser_test.go b/logparser/parser_test.go deleted file mode 100644 index 0288bfe..0000000 --- a/logparser/parser_test.go +++ /dev/null @@ -1,95 +0,0 @@ -package logparser - -import ( - "bufio" - "fmt" - "log" - "os" - "regexp" - "testing" -) - -var expected = map[int]map[string]string{ - 0: map[string]string{ - "date": "Jan 22 11:59:37", - "host": "sigmund", - "username": "git", - "src": "106.12.14.144", - }, - 1: map[string]string{ - "date": "Jan 22 11:37:19", - "host": "si.mund", - "username": "gestion", - "src": "159.89.153.54", - }, - 2: map[string]string{ - "date": "Jan 22 11:34:46", - "host": "sigmund", - "username": "atpco", - "src": "177.152.124.21", - }, - 3: map[string]string{ - "date": "Jan 22 11:33:07", - "host": "sigmund", - "username": "ki", - "src": "49.233.183.158", - }, - 4: map[string]string{ - "date": "Jan 22 11:29:16", - "host": "sigmund", - "username": "a.min", - "src": "185.56.8.191", - }, - 5: map[string]string{ - "date": "Jan 22 11:29:16", - "host": "sigmund", - "username": " ", - "src": "185.56.8.191", - }, - 6: map[string]string{ - "date": "Jan 22 11:29:16", - "host": "sigmund", - "username": "", - "src": "185.56.8.191", - }, - 7: map[string]string{ - "date": "Feb 3 06:50:51", - "host": "sigmund", - "username": "apples", - "src": "37.117.180.69", - }, -} - -func TestSshdParser(t *testing.T) { - // Opening sshd test file - fmt.Println("[+] Testing the sshd log parser") - f, err := os.Open("./test.log") - if err != nil { - log.Fatalf("Error opening test file: %v", err) - } - defer f.Close() - scanner := bufio.NewScanner(f) - c := 0 - for scanner.Scan() { - re := regexp.MustCompile(`^(?P[[:alpha:]]{3} {1,2}\d{1,2}\s\d{2}:\d{2}:\d{2}) (?P[^ ]+) sshd\[[[:alnum:]]+\]: Invalid user (?P.*) from (?P.*$)`) - n1 := re.SubexpNames() - r2 := re.FindAllStringSubmatch(scanner.Text(), -1)[0] - - // Build the group map for the line - md := map[string]string{} - for i, n := range r2 { - // fmt.Printf("%d. match='%s'\tname='%s'\n", i, n, n1[i]) - md[n1[i]] = n - } - - // Check against the expected map - for _, n := range n1 { - if n != "" { - if md[n] != expected[c][n] { - t.Errorf("%v = '%v'; want '%v'", n, md[n], expected[c][n]) - } - } - } - c++ - } -} diff --git a/logparser/test.log b/logparser/test.log deleted file mode 100644 index 028f24b..0000000 --- a/logparser/test.log +++ /dev/null @@ -1,8 +0,0 @@ -Jan 22 11:59:37 sigmund sshd[26514]: Invalid user git from 106.12.14.144 -Jan 22 11:37:19 si.mund sshd[26143]: Invalid user gestion from 159.89.153.54 -Jan 22 11:34:46 sigmund sshd[26125]: Invalid user atpco from 177.152.124.21 -Jan 22 11:33:07 sigmund sshd[26109]: Invalid user ki from 49.233.183.158 -Jan 22 11:29:16 sigmund sshd[26091]: Invalid user a.min from 185.56.8.191 -Jan 22 11:29:16 sigmund sshd[26091]: Invalid user from 185.56.8.191 -Jan 22 11:29:16 sigmund sshd[26091]: Invalid user from 185.56.8.191 -Feb 3 06:50:51 sigmund sshd[12611]: Invalid user apples from 37.117.180.69 \ No newline at end of file diff --git a/main.go b/main.go index 23809b5..020eb68 100644 --- a/main.go +++ b/main.go @@ -12,17 +12,17 @@ import ( "sync" "time" - "github.com/D4-project/analyzer-d4-log/logparser" + "github.com/D4-project/analyzer-d4-log/logcompiler" config "github.com/D4-project/d4-golang-utils/config" "github.com/gomodule/redigo/redis" ) type ( - redisconfD4 struct { - redisHost string - redisPort string - redisDB int - redisQueue string + // Input is a grok - NIFI or Logstash + redisconfInput struct { + redisHost string + redisPort string + redisDB int } redisconfCompilers struct { redisHost string @@ -33,28 +33,27 @@ type ( httpHost string httpPort string } - comutex struct { - mu sync.Mutex - compiling bool - } ) // Setting up flags var ( - confdir = flag.String("c", "conf.sample", "configuration directory") - all = flag.Bool("a", true, "run all parsers when set. Set by default") - specific = flag.String("o", "", "run only a specific parser [sshd]") - debug = flag.Bool("d", false, "debug info in logs") - fromfile = flag.String("f", "", "parse from file on disk") - retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") - flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits") - redisD4 redis.Conn - redisCompilers *redis.Pool + // Flags + confdir = flag.String("c", "conf.sample", "configuration directory") + all = flag.Bool("a", true, "run all compilers when set. Set by default") + specific = flag.String("o", "", "run only a specific parser [sshd]") + debug = flag.Bool("d", false, "debug info in logs") + fromfile = flag.String("f", "", "parse from file on disk") + retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue") + flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits") + // Pools of redis connections + redisCompilers *redis.Pool + redisInput *redis.Pool + // Compilers compilers = [1]string{"sshd"} compilationTrigger = 20 - wg sync.WaitGroup - compiling comutex - torun = []logparser.Parser{} + torun = []logcompiler.Compiler{} + // Routine handling + wg sync.WaitGroup ) func main() { @@ -64,6 +63,8 @@ func main() { go func() { <-sortie fmt.Println("Exiting.") + // TODO: handle the pulling routine + // wg.Wait() log.Println("Exit") os.Exit(0) }() @@ -89,14 +90,14 @@ func main() { fmt.Printf("to specify the settings to use:\n\n") fmt.Printf(" mandatory: redis_d4 - host:port/db\n") fmt.Printf(" mandatory: redis_queue - uuid\n") - fmt.Printf(" mandatory: redis_parsers - host:port/maxdb\n") + fmt.Printf(" mandatory: redis_compilers - host:port/maxdb\n") fmt.Printf(" optional: http_server - host:port\n\n") fmt.Printf("See conf.sample for an example.\n") } // Config // c := conf{} - rd4 := redisconfD4{} + ri := redisconfInput{} rp := redisconfCompilers{} flag.Parse() if flag.NFlag() == 0 || *confdir == "" { @@ -112,33 +113,24 @@ func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) } - // Dont't touch D4 server if Flushing + // Dont't touch input server if Flushing if !*flush { - // Parse Redis D4 Config - tmp := config.ReadConfigFile(*confdir, "redis_d4") + // Parse Input Redis Config + tmp := config.ReadConfigFile(*confdir, "redis_input") ss := strings.Split(string(tmp), "/") if len(ss) <= 1 { - log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name") + log.Fatal("Missing Database in Redis input config: should be host:port/database_name") } - rd4.redisDB, _ = strconv.Atoi(ss[1]) + ri.redisDB, _ = strconv.Atoi(ss[1]) var ret bool ret, ss[0] = config.IsNet(ss[0]) if ret { sss := strings.Split(string(ss[0]), ":") - rd4.redisHost = sss[0] - rd4.redisPort = sss[1] + ri.redisHost = sss[0] + ri.redisPort = sss[1] } else { log.Fatal("Redis config error.") } - - rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue")) - // Connect to D4 Redis - // TODO use DialOptions to Dial with a timeout - redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB)) - if err != nil { - log.Fatal(err) - } - defer redisD4.Close() } // Parse Redis Compilers Config @@ -158,45 +150,48 @@ func main() { log.Fatal("Redis config error.") } - // Create a connection Pool + // Create a connection Pool for output Redis redisCompilers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount) + redisInput = newPool(ri.redisHost+":"+ri.redisPort, 16) - // Line counter to trigger HTML compilation - nblines := 0 - - // Init parser depending on the parser flags: + // Init compiler depending on the compiler flags: if *all { - // Init all parsers + // Init all compilers for _, v := range compilers { switch v { case "sshd": + sshdrcon0, err := redisCompilers.Dial() + if err != nil { + log.Fatal("Could not connect to input line on Compiler Redis") + } sshdrcon1, err := redisCompilers.Dial() if err != nil { - log.Fatal("Could not connect to Line one Redis") + log.Fatal("Could not connect to output line on Compiler Redis") } - sshdrcon2, err := redisCompilers.Dial() + sshdrcon2, err := redisInput.Dial() if err != nil { - log.Fatal("Could not connect to Line two Redis") + log.Fatal("Could not connect to output line on Input Redis") } - sshd := logparser.SshdParser{} - sshd.Set(&sshdrcon1, &sshdrcon2) + sshd := logcompiler.SSHDCompiler{} + sshd.Set(&wg, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry) torun = append(torun, &sshd) } } } else if *specific != "" { - log.Println("TODO should run specific parser here") + log.Println("TODO should run specific compiler here") } - // If we flush, we bypass the parsing loop + // If we flush, we bypass the compiling loop if *flush { for _, v := range torun { err := v.Flush() if err != nil { log.Fatal(err) } - compile() + // TODO + // compile() } - // Parsing loop + // TODO update that -- deprecated } else if *fromfile != "" { f, err = os.Open(*fromfile) if err != nil { @@ -205,48 +200,27 @@ func main() { defer f.Close() scanner := bufio.NewScanner(f) for scanner.Scan() { - logline := scanner.Text() - for _, v := range torun { - err := v.Parse(logline) - if err != nil { - log.Fatal(err) - } - } - nblines++ - if nblines > compilationTrigger { - nblines = 0 - // Non-blocking - if !compiling.compiling { - go compile() - } - } + // logline := scanner.Text() + // for _, v := range torun { + // err := v.Pull(logline) + // if err != nil { + // log.Fatal(err) + // } + // } + // nblines++ + // if nblines > compilationTrigger { + // nblines = 0 + // Non-blocking + // if !compiling.compiling { + // go compile() + // } + // } } } else { - // Pop D4 redis queue - for { - logline, err := redis.String(redisD4.Do("LPOP", "analyzer:3:"+rd4.redisQueue)) - if err == redis.ErrNil { - // redis queue empty, let's sleep for a while - time.Sleep(time.Duration(*retry) * time.Minute) - } else if err != nil { - log.Fatal(err) - // let's parse - } else { - for _, v := range torun { - err := v.Parse(logline) - if err != nil { - log.Println(err) - } - } - nblines++ - if nblines > compilationTrigger { - nblines = 0 - // Non-blocking - if !compiling.compiling { - go compile() - } - } - } + // Launching Pull routines + for _, v := range torun { + wg.Add(1) + go v.Pull() } } @@ -254,25 +228,27 @@ func main() { log.Println("Exit") } -func compile() { - compiling.mu.Lock() - compiling.compiling = true - wg.Add(1) +// TODO: move into compilers - log.Println("Compiling") +// func compile() { +// compiling.mu.Lock() +// compiling.compiling = true +// wg.Add(1) - for _, v := range torun { - err := v.Compile() - if err != nil { - log.Fatal(err) - } - } +// log.Println("Compiling") - log.Println("Done") - compiling.compiling = false - compiling.mu.Unlock() - wg.Done() -} +// for _, v := range torun { +// err := v.Compile() +// if err != nil { +// log.Fatal(err) +// } +// } + +// log.Println("Done") +// compiling.compiling = false +// compiling.mu.Unlock() +// wg.Done() +// } func newPool(addr string, maxconn int) *redis.Pool { return &redis.Pool{