From df3255305067cdcb6157c5426e9d10ea21a882ae Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Mon, 9 Mar 2020 11:25:29 +0100 Subject: [PATCH] chg: [grok] graceful compilation shutdown --- logcompiler/compiler.go | 11 ++++++----- logcompiler/sshd.go | 18 +++++++++++++----- main.go | 39 +++++++++++---------------------------- 3 files changed, 30 insertions(+), 38 deletions(-) diff --git a/logcompiler/compiler.go b/logcompiler/compiler.go index 7774eba..42bfa80 100644 --- a/logcompiler/compiler.go +++ b/logcompiler/compiler.go @@ -14,7 +14,7 @@ type ( // Parse to parse a line of log // Flush recomputes statisitcs and recompile output Compiler interface { - Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int) + Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, *redis.Conn, int, string, int, int, *sync.WaitGroup) Pull() error Flush() error Compile() error @@ -38,9 +38,9 @@ type ( compilationTrigger int // Current line processed nbLines int - // Global WaitGroup to handle exiting - wg *sync.WaitGroup - // comutex embedding + // Global WaitGroup to handle graceful exiting a compilation routines + compilegr *sync.WaitGroup + // Comutex embedding comutex } @@ -51,7 +51,7 @@ type ( ) // Set set the redis connections to this compiler -func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int) { +func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, rconn2 *redis.Conn, db int, queue string, ct int, rt int, compilegr *sync.WaitGroup) { s.r0 = rconn0 s.r1 = rconn1 s.r2 = rconn2 @@ -60,4 +60,5 @@ func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *red s.compilationTrigger = ct s.retryPeriod = time.Duration(rt) * time.Minute s.compiling = false + s.compilegr = compilegr } diff --git a/logcompiler/sshd.go b/logcompiler/sshd.go index e52e37c..bda2cc4 100644 --- a/logcompiler/sshd.go +++ b/logcompiler/sshd.go @@ -166,7 +166,6 @@ func (s *SSHDCompiler) Pull() error { s.nbLines = 0 //Non-blocking if !s.compiling { - //s.(*)wg.Add(1) go s.Compile() } } @@ -284,6 +283,10 @@ func compileStat(s *SSHDCompiler, datestr string, mode string, src string, usern // Compile create graphs of the results func (s *SSHDCompiler) Compile() error { + s.mu.Lock() + s.compiling = true + s.compilegr.Add(1) + log.Println("[+] SSHD compiling") r := *s.r0 // Pulling statistics from database 1 @@ -415,9 +418,10 @@ func (s *SSHDCompiler) Compile() error { } // Parse Template - t, err := template.ParseFiles(filepath.Join("logparser", "sshd", "statistics.gohtml")) + t, err := template.ParseFiles(filepath.Join("logcompiler", "sshd", "statistics.gohtml")) if err != nil { r.Close() + log.Println(err) return err } @@ -508,7 +512,7 @@ func (s *SSHDCompiler) Compile() error { } // Copy js asset file - input, err := ioutil.ReadFile(filepath.Join("logparser", "sshd", "load.js")) + input, err := ioutil.ReadFile(filepath.Join("logcompiler", "sshd", "load.js")) if err != nil { log.Println(err) } @@ -518,6 +522,12 @@ func (s *SSHDCompiler) Compile() error { log.Println(err) } + log.Println("[-] SSHD compiling finished.") + s.compiling = false + s.mu.Unlock() + // Tell main program we can exit if needed now + s.compilegr.Done() + return nil } @@ -607,7 +617,5 @@ func plotStats(s *SSHDCompiler, v string) error { return err } - // s.wg.Done() - return nil } diff --git a/main.go b/main.go index 020eb68..329d23a 100644 --- a/main.go +++ b/main.go @@ -53,7 +53,8 @@ var ( compilationTrigger = 20 torun = []logcompiler.Compiler{} // Routine handling - wg sync.WaitGroup + pullgr sync.WaitGroup + compilegr sync.WaitGroup ) func main() { @@ -63,8 +64,7 @@ func main() { go func() { <-sortie fmt.Println("Exiting.") - // TODO: handle the pulling routine - // wg.Wait() + compilegr.Wait() log.Println("Exit") os.Exit(0) }() @@ -164,16 +164,19 @@ func main() { if err != nil { log.Fatal("Could not connect to input line on Compiler Redis") } + defer sshdrcon0.Close() sshdrcon1, err := redisCompilers.Dial() if err != nil { log.Fatal("Could not connect to output line on Compiler Redis") } + defer sshdrcon1.Close() sshdrcon2, err := redisInput.Dial() if err != nil { log.Fatal("Could not connect to output line on Input Redis") } + defer sshdrcon2.Close() sshd := logcompiler.SSHDCompiler{} - sshd.Set(&wg, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry) + sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, &sshdrcon2, ri.redisDB, "sshd", compilationTrigger, *retry, &compilegr) torun = append(torun, &sshd) } } @@ -219,37 +222,17 @@ func main() { } else { // Launching Pull routines for _, v := range torun { - wg.Add(1) + // we add pulling routines to a waitgroup, + // they can immediately die when exiting. + pullgr.Add(1) go v.Pull() } } - wg.Wait() + pullgr.Wait() log.Println("Exit") } -// TODO: move into compilers - -// func compile() { -// compiling.mu.Lock() -// compiling.compiling = true -// wg.Add(1) - -// log.Println("Compiling") - -// for _, v := range torun { -// err := v.Compile() -// if err != nil { -// log.Fatal(err) -// } -// } - -// log.Println("Done") -// compiling.compiling = false -// compiling.mu.Unlock() -// wg.Done() -// } - func newPool(addr string, maxconn int) *redis.Pool { return &redis.Pool{ MaxActive: maxconn,