chg: [sshd] retry/sleep on lack of incoming data

nifi
Jean-Louis Huynen 2020-05-27 18:08:40 +02:00
parent f69c9348da
commit 117a4d0b2f
1 changed files with 43 additions and 40 deletions

View File

@ -90,55 +90,58 @@ func (s *SSHDCompiler) Flush() error {
func (s *SSHDCompiler) Pull(c chan error) { func (s *SSHDCompiler) Pull(c chan error) {
r1 := *s.r1 r1 := *s.r1
for {
jsoner := json.NewDecoder(s.reader) jsoner := json.NewDecoder(s.reader)
DecodeLoop:
for jsoner.More() {
var m GrokedSSHD
err := jsoner.Decode(&m)
if err := jsoner.Decode(&m); err == io.EOF {
// On EOF we break this loop to go to a sleep
break DecodeLoop
} else if err != nil {
s.teardown(err)
}
DecodeLoop: fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
for jsoner.More() {
var m GrokedSSHD
err := jsoner.Decode(&m)
if err := jsoner.Decode(&m); err == io.EOF {
// In case of EOF, we wait for the reader to have
// new data available
time.Sleep(s.retryPeriod)
continue DecodeLoop
} else if err != nil {
log.Fatal(err)
}
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser) // Assumes the system parses logs recorded during the current year
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
// TODO Make this automatic or a config parameter
loc, _ := time.LoadLocation("Europe/Luxembourg")
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Assumes the system parses logs recorded during the current year // Pushing loglines in database 0
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year()) if _, err := r1.Do("SELECT", 0); err != nil {
// TODO Make this automatic or a config parameter s.teardown(err)
loc, _ := time.LoadLocation("Europe/Luxembourg") }
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
// Pushing loglines in database 0 // Writing logs
if _, err := r1.Do("SELECT", 0); err != nil { _, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
s.teardown(err) if err != nil {
} s.teardown(err)
}
// Writing logs err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP)) if err != nil {
if err != nil { s.teardown(err)
s.teardown(err) }
}
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname) // Compiler html / jsons
if err != nil { s.nbLines++
s.teardown(err) if s.nbLines > s.compilationTrigger {
} s.nbLines = 0
//Non-blocking
// Compiler html / jsons if !s.compiling {
s.nbLines++ go s.compile()
if s.nbLines > s.compilationTrigger { }
s.nbLines = 0
//Non-blocking
if !s.compiling {
go s.compile()
} }
} }
// EOF, we wait for the reader to have
// new data available
time.Sleep(s.retryPeriod)
} }
} }