Merge branch 'nifi' of github.com:D4-project/analyzer-d4-log into nifi
commit
5bb4df40e8
|
@ -96,55 +96,58 @@ func (s *SSHDCompiler) Flush() error {
|
||||||
func (s *SSHDCompiler) Pull(c chan error) {
|
func (s *SSHDCompiler) Pull(c chan error) {
|
||||||
r1 := *s.r1
|
r1 := *s.r1
|
||||||
|
|
||||||
|
for {
|
||||||
jsoner := json.NewDecoder(s.reader)
|
jsoner := json.NewDecoder(s.reader)
|
||||||
|
DecodeLoop:
|
||||||
|
for jsoner.More() {
|
||||||
|
var m GrokedSSHD
|
||||||
|
err := jsoner.Decode(&m)
|
||||||
|
if err := jsoner.Decode(&m); err == io.EOF {
|
||||||
|
// On EOF we break this loop to go to a sleep
|
||||||
|
break DecodeLoop
|
||||||
|
} else if err != nil {
|
||||||
|
s.teardown(err)
|
||||||
|
}
|
||||||
|
|
||||||
DecodeLoop:
|
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
|
||||||
for jsoner.More() {
|
|
||||||
var m GrokedSSHD
|
|
||||||
err := jsoner.Decode(&m)
|
|
||||||
if err := jsoner.Decode(&m); err == io.EOF {
|
|
||||||
// In case of EOF, we wait for the reader to have
|
|
||||||
// new data available
|
|
||||||
time.Sleep(s.retryPeriod)
|
|
||||||
continue DecodeLoop
|
|
||||||
} else if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("time: %s, hostname: %s, client_ip: %s, user: %s\n", m.SyslogTimestamp, m.SyslogHostname, m.SshdClientIP, m.SshdInvalidUser)
|
// Assumes the system parses logs recorded during the current year
|
||||||
|
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
|
||||||
|
// TODO Make this automatic or a config parameter
|
||||||
|
loc, _ := time.LoadLocation("Europe/Luxembourg")
|
||||||
|
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
|
||||||
|
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
|
||||||
|
|
||||||
// Assumes the system parses logs recorded during the current year
|
// Pushing loglines in database 0
|
||||||
m.SyslogTimestamp = fmt.Sprintf("%v %v", m.SyslogTimestamp, time.Now().Year())
|
if _, err := r1.Do("SELECT", 0); err != nil {
|
||||||
// TODO Make this automatic or a config parameter
|
s.teardown(err)
|
||||||
loc, _ := time.LoadLocation("Europe/Luxembourg")
|
}
|
||||||
parsedTime, _ := time.ParseInLocation("Jan 2 15:04:05 2006", m.SyslogTimestamp, loc)
|
|
||||||
m.SyslogTimestamp = string(strconv.FormatInt(parsedTime.Unix(), 10))
|
|
||||||
|
|
||||||
// Pushing loglines in database 0
|
// Writing logs
|
||||||
if _, err := r1.Do("SELECT", 0); err != nil {
|
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
|
||||||
s.teardown(err)
|
if err != nil {
|
||||||
}
|
s.teardown(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Writing logs
|
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
|
||||||
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
|
if err != nil {
|
||||||
if err != nil {
|
s.teardown(err)
|
||||||
s.teardown(err)
|
}
|
||||||
}
|
|
||||||
|
|
||||||
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
|
// Compiler html / jsons
|
||||||
if err != nil {
|
s.nbLines++
|
||||||
s.teardown(err)
|
if s.nbLines > s.compilationTrigger {
|
||||||
}
|
s.nbLines = 0
|
||||||
|
//Non-blocking
|
||||||
// Compiler html / jsons
|
if !s.compiling {
|
||||||
s.nbLines++
|
go s.compile()
|
||||||
if s.nbLines > s.compilationTrigger {
|
}
|
||||||
s.nbLines = 0
|
|
||||||
//Non-blocking
|
|
||||||
if !s.compiling {
|
|
||||||
go s.compile()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// EOF, we wait for the reader to have
|
||||||
|
// new data available
|
||||||
|
time.Sleep(s.retryPeriod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue