add: [sshd] Flushing Statistics Feature

nifi
Jean-Louis Huynen 2020-02-05 10:45:05 +01:00
parent b874e6c172
commit 0b40c4e4a3
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
3 changed files with 126 additions and 41 deletions

View File

@ -7,9 +7,11 @@ type (
// It should provide:
// Set to assign a redis connection to it
// Parse to parse a line of log
// Flush recomputes statisitcs and recompile output
Parser interface {
Set(*redis.Conn, *redis.Conn)
Parse(string) error
Flush() error
Compile() error
}
)

View File

@ -22,7 +22,9 @@ import (
// SshdParser Holds a struct that corresponds to a sshd log line
// and the redis connection
type SshdParser struct {
// Write
r1 *redis.Conn
// Read
r2 *redis.Conn
}
@ -32,6 +34,66 @@ func (s *SshdParser) Set(rconn1 *redis.Conn, rconn2 *redis.Conn) {
s.r2 = rconn2
}
// Flush recomputes statistics and recompile HTML output
func (s *SshdParser) Flush() error {
log.Println("Flushing")
r1 := *s.r1
r0 := *s.r2
// writing in database 1
if _, err := r1.Do("SELECT", 1); err != nil {
r0.Close()
r1.Close()
return err
}
// flush stats DB
if _, err := r1.Do("FLUSHDB"); err != nil {
r0.Close()
r1.Close()
return err
}
log.Println("Statistics Database Flushed")
// reading from database 0
if _, err := r0.Do("SELECT", 0); err != nil {
r0.Close()
r1.Close()
return err
}
// Compile statistics / html output for each line
keys, err := redis.Strings(r0.Do("KEYS", "*"))
if err != nil {
r0.Close()
r1.Close()
return err
}
for _, v := range keys {
dateHost := strings.Split(v, ":")
kkeys, err := redis.StringMap(r0.Do("HGETALL", v))
if err != nil {
r0.Close()
r1.Close()
return err
}
dateInt, err := strconv.ParseInt(dateHost[0], 10, 64)
if err != nil {
r0.Close()
r1.Close()
return err
}
parsedTime := time.Unix(dateInt, 0)
err = compileStats(s, parsedTime, kkeys["src"], kkeys["username"], dateHost[1])
if err != nil {
r0.Close()
r1.Close()
return err
}
}
return nil
}
// Parse parses a line of sshd log
func (s *SshdParser) Parse(logline string) error {
r := *s.r1
@ -58,12 +120,26 @@ func (s *SshdParser) Parse(logline string) error {
r.Close()
return err
}
// Writing logs
_, err := redis.Bool(r.Do("HSET", fmt.Sprintf("%v:%v", md["date"], md["host"]), "username", md["username"], "src", md["src"]))
if err != nil {
r.Close()
return err
}
err = compileStats(s, parsedTime, md["src"], md["username"], md["host"])
if err != nil {
r.Close()
return err
}
return nil
}
func compileStats(s *SshdParser, parsedTime time.Time, src string, username string, host string) error {
r := *s.r1
// Pushing statistics in database 1
if _, err := r.Do("SELECT", 1); err != nil {
r.Close()
@ -74,8 +150,7 @@ func (s *SshdParser) Parse(logline string) error {
dstr := fmt.Sprintf("%v%v%v", parsedTime.Year(), fmt.Sprintf("%02d", int(parsedTime.Month())), fmt.Sprintf("%02d", int(parsedTime.Day())))
// Check current entry date as oldest if older than the current
var oldest string
if oldest, err = redis.String(r.Do("GET", "oldest")); err == redis.ErrNil {
if oldest, err := redis.String(r.Do("GET", "oldest")); err == redis.ErrNil {
r.Do("SET", "oldest", dstr)
} else if err != nil {
r.Close()
@ -92,8 +167,7 @@ func (s *SshdParser) Parse(logline string) error {
}
// Check current entry date as oldest if older than the current
var newest string
if newest, err = redis.String(r.Do("GET", "newest")); err == redis.ErrNil {
if newest, err := redis.String(r.Do("GET", "newest")); err == redis.ErrNil {
r.Do("SET", "newest", dstr)
} else if err != nil {
r.Close()
@ -109,7 +183,7 @@ func (s *SshdParser) Parse(logline string) error {
}
}
err = compileStats(s, dstr, "daily", md["src"], md["username"], md["host"])
err := compileStat(s, dstr, "daily", src, username, host)
if err != nil {
r.Close()
return err
@ -117,7 +191,7 @@ func (s *SshdParser) Parse(logline string) error {
// Monthly
mstr := fmt.Sprintf("%v%v", parsedTime.Year(), fmt.Sprintf("%02d", int(parsedTime.Month())))
err = compileStats(s, mstr, "daily", md["src"], md["username"], md["host"])
err = compileStat(s, mstr, "daily", src, username, host)
if err != nil {
r.Close()
return err
@ -125,7 +199,7 @@ func (s *SshdParser) Parse(logline string) error {
// Yearly
ystr := fmt.Sprintf("%v", parsedTime.Year())
err = compileStats(s, ystr, "daily", md["src"], md["username"], md["host"])
err = compileStat(s, ystr, "daily", src, username, host)
if err != nil {
r.Close()
return err
@ -134,7 +208,7 @@ func (s *SshdParser) Parse(logline string) error {
return nil
}
func compileStats(s *SshdParser, datestr string, mode string, src string, username string, host string) error {
func compileStat(s *SshdParser, datestr string, mode string, src string, username string, host string) error {
r := *s.r1
_, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src))
if err != nil {
@ -247,13 +321,9 @@ func (s *SshdParser) Compile() error {
}
#imageholder {
background: black;
color: white;
padding: 1em;
position: absolute;
top: 50%;
left: 50%;
margin-right: -50%;
transform: translate(-50%, -40%)
margin: auto;
width: 50%;
padding: 10px;
}
span {
float: left;
@ -274,7 +344,7 @@ func (s *SshdParser) Compile() error {
<option value="statshost">Hosts</option>
</select>
</span>
<span id="imageholder"></span>
<div id="imageholder"></div>
</body>
</html>`

63
main.go
View File

@ -47,6 +47,7 @@ var (
debug = flag.Bool("d", false, "debug info in logs")
fromfile = flag.String("f", "", "parse from file on disk")
retry = flag.Int("r", 1, "time in minute before retry on empty d4 queue")
flush = flag.Bool("F", false, "Flush HTML output, recompile all statistic from redis logs, then quits")
redisD4 redis.Conn
redisParsers *redis.Pool
parsers = [1]string{"sshd"}
@ -111,36 +112,40 @@ func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
}
// Parse Redis D4 Config
tmp := config.ReadConfigFile(*confdir, "redis_d4")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name")
// Dont't touch D4 server if Flushing
if !*flush {
// Parse Redis D4 Config
tmp := config.ReadConfigFile(*confdir, "redis_d4")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database in Redis D4 config: should be host:port/database_name")
}
rd4.redisDB, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rd4.redisHost = sss[0]
rd4.redisPort = sss[1]
}
rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
// Connect to D4 Redis
// TODO use DialOptions to Dial with a timeout
redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB))
if err != nil {
log.Fatal(err)
}
defer redisD4.Close()
}
rd4.redisDB, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
rd4.redisHost = sss[0]
rd4.redisPort = sss[1]
}
rd4.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
// Connect to D4 Redis
// TODO use DialOptions to Dial with a timeout
redisD4, err = redis.Dial("tcp", rd4.redisHost+":"+rd4.redisPort, redis.DialDatabase(rd4.redisDB))
if err != nil {
log.Fatal(err)
}
defer redisD4.Close()
// Parse Redis Parsers Config
tmp = config.ReadConfigFile(*confdir, "redis_parsers")
ss = strings.Split(string(tmp), "/")
tmp := config.ReadConfigFile(*confdir, "redis_parsers")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database Count in Redis config: should be host:port/max number of DB")
}
rp.redisDBCount, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if !ret {
sss := strings.Split(string(ss[0]), ":")
@ -177,8 +182,16 @@ func main() {
log.Println("TODO should run specific parser here")
}
// Parsing loop
if *fromfile != "" {
// If we flush, we bypass the parsing loop
if *flush {
for _, v := range torun {
err := v.Flush()
if err != nil {
log.Fatal(err)
}
}
// Parsing loop
} else if *fromfile != "" {
f, err = os.Open(*fromfile)
if err != nil {
log.Fatalf("Error opening seed file: %v", err)