chg: [grok] graceful compilation shutdown

nifi
Jean-Louis Huynen 2020-03-09 11:25:29 +01:00
parent 547fdba5c8
commit df32553050
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
3 changed files with 30 additions and 38 deletions

View File

@ -14,7 +14,7 @@ type (
// Parse to parse a line of log // Parse to parse a line of log
// Flush recomputes statisitcs and recompile output // Flush recomputes statisitcs and recompile output
Compiler interface { Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int) Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int, *sync.WaitGroup)
Pull() error Pull() error
Flush() error Flush() error
Compile() error Compile() error
@ -38,9 +38,9 @@ type (
compilationTrigger int compilationTrigger int
// Current line processed // Current line processed
nbLines int nbLines int
// Global WaitGroup to handle exiting // Global WaitGroup to handle graceful exiting a compilation routines
wg *sync.WaitGroup compilegr *sync.WaitGroup
// comutex embedding // Comutex embedding
comutex comutex
} }
@ -51,7 +51,7 @@ type (
) )
// Set set the redis connections to this compiler // Set set the redis connections to this compiler
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int) { func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int, compilegr *sync.WaitGroup) {
s.r0 = rconn0 s.r0 = rconn0
s.r1 = rconn1 s.r1 = rconn1
s.r2 = rconn2 s.r2 = rconn2
@ -60,4 +60,5 @@ func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *red
s.compilationTrigger = ct s.compilationTrigger = ct
s.retryPeriod = time.Duration(rt) * time.Minute s.retryPeriod = time.Duration(rt) * time.Minute
s.compiling = false s.compiling = false
s.compilegr = compilegr
} }

View File

@ -166,7 +166,6 @@ func (s *SSHDCompiler) Pull() error {
s.nbLines = 0 s.nbLines = 0
//Non-blocking //Non-blocking
if !s.compiling { if !s.compiling {
//s.(*)wg.Add(1)
go s.Compile() go s.Compile()
} }
} }
@ -284,6 +283,10 @@ func compileStat(s *SSHDCompiler, datestr string, mode string, src string, usern
// Compile create graphs of the results // Compile create graphs of the results
func (s *SSHDCompiler) Compile() error { func (s *SSHDCompiler) Compile() error {
s.mu.Lock()
s.compiling = true
s.compilegr.Add(1)
log.Println("[+] SSHD compiling")
r := *s.r0 r := *s.r0
// Pulling statistics from database 1 // Pulling statistics from database 1
@ -415,9 +418,10 @@ func (s *SSHDCompiler) Compile() error {
} }
// Parse Template // Parse Template
t, err := template.ParseFiles(filepath.Join("logparser", "sshd", "statistics.gohtml")) t, err := template.ParseFiles(filepath.Join("logcompiler", "sshd", "statistics.gohtml"))
if err != nil { if err != nil {
r.Close() r.Close()
log.Println(err)
return err return err
} }
@ -508,7 +512,7 @@ func (s *SSHDCompiler) Compile() error {
} }
// Copy js asset file // Copy js asset file
input, err := ioutil.ReadFile(filepath.Join("logparser", "sshd", "load.js")) input, err := ioutil.ReadFile(filepath.Join("logcompiler", "sshd", "load.js"))
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
@ -518,6 +522,12 @@ func (s *SSHDCompiler) Compile() error {
log.Println(err) log.Println(err)
} }
log.Println("[-] SSHD compiling finished.")
s.compiling = false
s.mu.Unlock()
// Tell main program we can exit if needed now
s.compilegr.Done()
return nil return nil
} }
@ -607,7 +617,5 @@ func plotStats(s *SSHDCompiler, v string) error {
return err return err
} }
// s.wg.Done()
return nil return nil
} }

39
main.go
View File

@ -53,7 +53,8 @@ var (
compilationTrigger = 20 compilationTrigger = 20
torun = []logcompiler.Compiler{} torun = []logcompiler.Compiler{}
// Routine handling // Routine handling
wg sync.WaitGroup pullgr sync.WaitGroup
compilegr sync.WaitGroup
) )
func main() { func main() {
@ -63,8 +64,7 @@ func main() {
go func() { go func() {
<-sortie <-sortie
fmt.Println("Exiting.") fmt.Println("Exiting.")
// TODO: handle the pulling routine compilegr.Wait()
// wg.Wait()
log.Println("Exit") log.Println("Exit")
os.Exit(0) os.Exit(0)
}() }()
@ -164,16 +164,19 @@ func main() {
if err != nil { if err != nil {
log.Fatal("Could not connect to input line on Compiler Redis") log.Fatal("Could not connect to input line on Compiler Redis")
} }
defer sshdrcon0.Close()
sshdrcon1, err := redisCompilers.Dial() sshdrcon1, err := redisCompilers.Dial()
if err != nil { if err != nil {
log.Fatal("Could not connect to output line on Compiler Redis") log.Fatal("Could not connect to output line on Compiler Redis")
} }
defer sshdrcon1.Close()
sshdrcon2, err := redisInput.Dial() sshdrcon2, err := redisInput.Dial()
if err != nil { if err != nil {
log.Fatal("Could not connect to output line on Input Redis") log.Fatal("Could not connect to output line on Input Redis")
} }
defer sshdrcon2.Close()
sshd := logcompiler.SSHDCompiler{} sshd := logcompiler.SSHDCompiler{}
sshd.Set(&wg, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry) sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry, &compilegr)
torun = append(torun, &sshd) torun = append(torun, &sshd)
} }
} }
@ -219,37 +222,17 @@ func main() {
} else { } else {
// Launching Pull routines // Launching Pull routines
for _, v := range torun { for _, v := range torun {
wg.Add(1) // we add pulling routines to a waitgroup,
// they can immediately die when exiting.
pullgr.Add(1)
go v.Pull() go v.Pull()
} }
} }
wg.Wait() pullgr.Wait()
log.Println("Exit") log.Println("Exit")
} }
// TODO: move into compilers
// func compile() {
// compiling.mu.Lock()
// compiling.compiling = true
// wg.Add(1)
// log.Println("Compiling")
// for _, v := range torun {
// err := v.Compile()
// if err != nil {
// log.Fatal(err)
// }
// }
// log.Println("Done")
// compiling.compiling = false
// compiling.mu.Unlock()
// wg.Done()
// }
func newPool(addr string, maxconn int) *redis.Pool { func newPool(addr string, maxconn int) *redis.Pool {
return &redis.Pool{ return &redis.Pool{
MaxActive: maxconn, MaxActive: maxconn,