chg: [compiler] teardown function + error channel

nifi
Jean-Louis Huynen 2020-03-13 14:59:04 +01:00
parent aef4b518c0
commit 31b491ba97
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
3 changed files with 60 additions and 85 deletions

View File

@ -4,6 +4,7 @@ import (
"io" "io"
"sync" "sync"
"github.com/D4-project/analyzer-d4-log/inputreader"
"github.com/gomodule/redigo/redis" "github.com/gomodule/redigo/redis"
) )
@ -14,11 +15,10 @@ type (
// Parse to parse a line of log // Parse to parse a line of log
// Flush recomputes statisitcs and recompile output // Flush recomputes statisitcs and recompile output
Compiler interface { Compiler interface {
Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup) Set(*sync.WaitGroup, *redis.Conn, *redis.Conn, io.Reader, int, *sync.WaitGroup, *chan error)
SetReader(io.Reader) SetReader(io.Reader)
Pull() error Pull(chan error)
Flush() error Flush() error
Compile() error
} }
// CompilerStruct will implements Compiler, and should be embedded in // CompilerStruct will implements Compiler, and should be embedded in
@ -36,6 +36,8 @@ type (
nbLines int nbLines int
// Global WaitGroup to handle graceful exiting a compilation routines // Global WaitGroup to handle graceful exiting a compilation routines
compilegr *sync.WaitGroup compilegr *sync.WaitGroup
// Goroutines error channel
pullreturn *chan error
// Comutex embedding // Comutex embedding
comutex comutex
} }
@ -47,16 +49,32 @@ type (
) )
// Set set the redis connections to this compiler // Set set the redis connections to this compiler
func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup) { func (s *CompilerStruct) Set(wg *sync.WaitGroup, rconn0 *redis.Conn, rconn1 *redis.Conn, reader io.Reader, ct int, compilegr *sync.WaitGroup, c *chan error) {
s.r0 = rconn0 s.r0 = rconn0
s.r1 = rconn1 s.r1 = rconn1
s.reader = reader s.reader = reader
s.compilationTrigger = ct s.compilationTrigger = ct
s.compiling = false s.compiling = false
s.compilegr = compilegr s.compilegr = compilegr
s.pullreturn = c
} }
// SetReader Changes compiler's input // SetReader Changes compiler's input
func (s *CompilerStruct) SetReader(reader io.Reader) { func (s *CompilerStruct) SetReader(reader io.Reader) {
s.reader = reader s.reader = reader
} }
// tear down is called on error to close redis connections
// and log errors
func (s *CompilerStruct) teardown(err error) {
*s.pullreturn <- err
(*s.r0).Close()
(*s.r1).Close()
// If the reader is a LPOPReader, we need to teardown the connection
switch s.reader.(type) {
case *inputreader.RedisLPOPReader:
tmp := *s.reader.(*inputreader.RedisLPOPReader)
tmp.Teardown()
}
}

View File

@ -2,6 +2,7 @@ package logcompiler
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"html/template" "html/template"
"io/ioutil" "io/ioutil"
@ -45,53 +46,39 @@ func (s *SSHDCompiler) Flush() error {
r0 := *s.r0 r0 := *s.r0
// writing in database 1 // writing in database 1
if _, err := r1.Do("SELECT", 1); err != nil { if _, err := r1.Do("SELECT", 1); err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
// flush stats DB // flush stats DB
if _, err := r1.Do("FLUSHDB"); err != nil { if _, err := r1.Do("FLUSHDB"); err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
log.Println("Statistics Database Flushed") log.Println("Statistics Database Flushed")
// reading from database 0 // reading from database 0
if _, err := r0.Do("SELECT", 0); err != nil { if _, err := r0.Do("SELECT", 0); err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
// Compile statistics / html output for each line // Compile statistics / html output for each line
keys, err := redis.Strings(r0.Do("KEYS", "*")) keys, err := redis.Strings(r0.Do("KEYS", "*"))
if err != nil { if err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
for _, v := range keys { for _, v := range keys {
dateHost := strings.Split(v, ":") dateHost := strings.Split(v, ":")
kkeys, err := redis.StringMap(r0.Do("HGETALL", v)) kkeys, err := redis.StringMap(r0.Do("HGETALL", v))
if err != nil { if err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
dateInt, err := strconv.ParseInt(dateHost[0], 10, 64) dateInt, err := strconv.ParseInt(dateHost[0], 10, 64)
if err != nil { if err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
parsedTime := time.Unix(dateInt, 0) parsedTime := time.Unix(dateInt, 0)
err = compileStats(s, parsedTime, kkeys["src"], kkeys["username"], dateHost[1]) err = compileStats(s, parsedTime, kkeys["src"], kkeys["username"], dateHost[1])
if err != nil { if err != nil {
r0.Close() s.teardown(err)
r1.Close()
return err
} }
} }
@ -99,7 +86,7 @@ func (s *SSHDCompiler) Flush() error {
} }
// Pull pulls a line of groked sshd logline from redis // Pull pulls a line of groked sshd logline from redis
func (s *SSHDCompiler) Pull() error { func (s *SSHDCompiler) Pull(c chan error) {
r1 := *s.r1 r1 := *s.r1
jsoner := json.NewDecoder(s.reader) jsoner := json.NewDecoder(s.reader)
@ -121,21 +108,18 @@ func (s *SSHDCompiler) Pull() error {
// Pushing loglines in database 0 // Pushing loglines in database 0
if _, err := r1.Do("SELECT", 0); err != nil { if _, err := r1.Do("SELECT", 0); err != nil {
r1.Close() s.teardown(err)
return err
} }
// Writing logs // Writing logs
_, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP)) _, err = redis.Bool(r1.Do("HSET", fmt.Sprintf("%v:%v", m.SyslogTimestamp, m.SyslogHostname), "username", m.SshdInvalidUser, "src", m.SshdClientIP))
if err != nil { if err != nil {
r1.Close() s.teardown(err)
return err
} }
err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname) err = compileStats(s, parsedTime, m.SshdClientIP, m.SshdInvalidUser, m.SyslogHostname)
if err != nil { if err != nil {
r1.Close() s.teardown(err)
return err
} }
// Compiler html / jsons // Compiler html / jsons
@ -144,11 +128,10 @@ func (s *SSHDCompiler) Pull() error {
s.nbLines = 0 s.nbLines = 0
//Non-blocking //Non-blocking
if !s.compiling { if !s.compiling {
go s.Compile() go s.compile()
} }
} }
} }
return nil
} }
func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error { func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username string, host string) error {
@ -156,8 +139,7 @@ func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username st
// Pushing statistics in database 1 // Pushing statistics in database 1
if _, err := r.Do("SELECT", 1); err != nil { if _, err := r.Do("SELECT", 1); err != nil {
r.Close() s.teardown(err)
return err
} }
// Daily // Daily
@ -167,8 +149,7 @@ func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username st
if oldest, err := redis.String(r.Do("GET", "oldest")); err == redis.ErrNil { if oldest, err := redis.String(r.Do("GET", "oldest")); err == redis.ErrNil {
r.Do("SET", "oldest", dstr) r.Do("SET", "oldest", dstr)
} else if err != nil { } else if err != nil {
r.Close() s.teardown(err)
return err
} else { } else {
// Check if dates are the same // Check if dates are the same
if oldest != dstr { if oldest != dstr {
@ -184,8 +165,7 @@ func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username st
if newest, err := redis.String(r.Do("GET", "newest")); err == redis.ErrNil { if newest, err := redis.String(r.Do("GET", "newest")); err == redis.ErrNil {
r.Do("SET", "newest", dstr) r.Do("SET", "newest", dstr)
} else if err != nil { } else if err != nil {
r.Close() s.teardown(err)
return err
} else { } else {
// Check if dates are the same // Check if dates are the same
if newest != dstr { if newest != dstr {
@ -199,24 +179,21 @@ func compileStats(s *SSHDCompiler, parsedTime time.Time, src string, username st
err := compileStat(s, dstr, "daily", src, username, host) err := compileStat(s, dstr, "daily", src, username, host)
if err != nil { if err != nil {
r.Close() s.teardown(err)
return err
} }
// Monthly // Monthly
mstr := fmt.Sprintf("%v%v", parsedTime.Year(), fmt.Sprintf("%02d", int(parsedTime.Month()))) mstr := fmt.Sprintf("%v%v", parsedTime.Year(), fmt.Sprintf("%02d", int(parsedTime.Month())))
err = compileStat(s, mstr, "daily", src, username, host) err = compileStat(s, mstr, "daily", src, username, host)
if err != nil { if err != nil {
r.Close() s.teardown(err)
return err
} }
// Yearly // Yearly
ystr := fmt.Sprintf("%v", parsedTime.Year()) ystr := fmt.Sprintf("%v", parsedTime.Year())
err = compileStat(s, ystr, "daily", src, username, host) err = compileStat(s, ystr, "daily", src, username, host)
if err != nil { if err != nil {
r.Close() s.teardown(err)
return err
} }
return nil return nil
@ -226,41 +203,35 @@ func compileStat(s *SSHDCompiler, datestr string, mode string, src string, usern
r := *s.r1 r := *s.r1
_, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src)) _, err := redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statssrc"), 1, src))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statsusername"), 1, username)) _, err = redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statsusername"), 1, username))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statshost"), 1, host)) _, err = redis.String(r.Do("ZINCRBY", fmt.Sprintf("%v:%v", datestr, "statshost"), 1, host))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statssrc"))) _, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statssrc")))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statsusername"))) _, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statsusername")))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statshost"))) _, err = redis.Int(r.Do("SADD", fmt.Sprintf("toupdate:%v", mode), fmt.Sprintf("%v:%v", datestr, "statshost")))
if err != nil { if err != nil {
r.Close()
return err return err
} }
return nil return nil
} }
// Compile create graphs of the results // compile create json and graphical representation of the results
func (s *SSHDCompiler) Compile() error { func (s *SSHDCompiler) compile() error {
s.mu.Lock() s.mu.Lock()
s.compiling = true s.compiling = true
s.compilegr.Add(1) s.compilegr.Add(1)
@ -269,14 +240,12 @@ func (s *SSHDCompiler) Compile() error {
// Pulling statistics from database 1 // Pulling statistics from database 1
if _, err := r.Do("SELECT", 1); err != nil { if _, err := r.Do("SELECT", 1); err != nil {
r.Close()
return err return err
} }
// List days for which we need to update statistics // List days for which we need to update statistics
toupdateD, err := redis.Strings(r.Do("SMEMBERS", "toupdate:daily")) toupdateD, err := redis.Strings(r.Do("SMEMBERS", "toupdate:daily"))
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -284,7 +253,6 @@ func (s *SSHDCompiler) Compile() error {
for _, v := range toupdateD { for _, v := range toupdateD {
err = plotStats(s, v) err = plotStats(s, v)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -292,7 +260,6 @@ func (s *SSHDCompiler) Compile() error {
// List months for which we need to update statistics // List months for which we need to update statistics
toupdateM, err := redis.Strings(r.Do("SMEMBERS", "toupdate:monthly")) toupdateM, err := redis.Strings(r.Do("SMEMBERS", "toupdate:monthly"))
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -300,7 +267,6 @@ func (s *SSHDCompiler) Compile() error {
for _, v := range toupdateM { for _, v := range toupdateM {
err = plotStats(s, v) err = plotStats(s, v)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -308,7 +274,6 @@ func (s *SSHDCompiler) Compile() error {
// List years for which we need to update statistics // List years for which we need to update statistics
toupdateY, err := redis.Strings(r.Do("SMEMBERS", "toupdate:yearly")) toupdateY, err := redis.Strings(r.Do("SMEMBERS", "toupdate:yearly"))
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -316,7 +281,6 @@ func (s *SSHDCompiler) Compile() error {
for _, v := range toupdateY { for _, v := range toupdateY {
err = plotStats(s, v) err = plotStats(s, v)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -325,11 +289,9 @@ func (s *SSHDCompiler) Compile() error {
var newest string var newest string
var oldest string var oldest string
if newest, err = redis.String(r.Do("GET", "newest")); err == redis.ErrNil { if newest, err = redis.String(r.Do("GET", "newest")); err == redis.ErrNil {
r.Close()
return err return err
} }
if oldest, err = redis.String(r.Do("GET", "oldest")); err == redis.ErrNil { if oldest, err = redis.String(r.Do("GET", "oldest")); err == redis.ErrNil {
r.Close()
return err return err
} }
parsedOldest, _ := time.Parse("20060102", oldest) parsedOldest, _ := time.Parse("20060102", oldest)
@ -340,14 +302,12 @@ func (s *SSHDCompiler) Compile() error {
// Gettings list of years for which we have statistics // Gettings list of years for which we have statistics
reply, err := redis.Values(r.Do("SCAN", "0", "MATCH", "????:*", "COUNT", 1000)) reply, err := redis.Values(r.Do("SCAN", "0", "MATCH", "????:*", "COUNT", 1000))
if err != nil { if err != nil {
r.Close()
return err return err
} }
var cursor int64 var cursor int64
var items []string var items []string
_, err = redis.Scan(reply, &cursor, &items) _, err = redis.Scan(reply, &cursor, &items)
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -371,13 +331,11 @@ func (s *SSHDCompiler) Compile() error {
var mraw []string var mraw []string
reply, err = redis.Values(r.Do("SCAN", "0", "MATCH", v+"??:*", "COUNT", 1000)) reply, err = redis.Values(r.Do("SCAN", "0", "MATCH", v+"??:*", "COUNT", 1000))
if err != nil { if err != nil {
r.Close()
return err return err
} }
_, err = redis.Scan(reply, &cursor, &mraw) _, err = redis.Scan(reply, &cursor, &mraw)
if err != nil { if err != nil {
r.Close()
return err return err
} }
for _, m := range mraw { for _, m := range mraw {
@ -398,8 +356,6 @@ func (s *SSHDCompiler) Compile() error {
// Parse Template // Parse Template
t, err := template.ParseFiles(filepath.Join("logcompiler", "sshd", "statistics.gohtml")) t, err := template.ParseFiles(filepath.Join("logcompiler", "sshd", "statistics.gohtml"))
if err != nil { if err != nil {
r.Close()
log.Println(err)
return err return err
} }
@ -439,7 +395,6 @@ func (s *SSHDCompiler) Compile() error {
if _, err := os.Stat("data"); os.IsNotExist(err) { if _, err := os.Stat("data"); os.IsNotExist(err) {
err := os.Mkdir("data", 0700) err := os.Mkdir("data", 0700)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -447,7 +402,6 @@ func (s *SSHDCompiler) Compile() error {
if _, err := os.Stat(filepath.Join("data", "sshd")); os.IsNotExist(err) { if _, err := os.Stat(filepath.Join("data", "sshd")); os.IsNotExist(err) {
err := os.Mkdir(filepath.Join("data", "sshd"), 0700) err := os.Mkdir(filepath.Join("data", "sshd"), 0700)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -463,7 +417,6 @@ func (s *SSHDCompiler) Compile() error {
err = t.ExecuteTemplate(f, "dailytpl", daily) err = t.ExecuteTemplate(f, "dailytpl", daily)
err = t.ExecuteTemplate(f, "footertpl", daily) err = t.ExecuteTemplate(f, "footertpl", daily)
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -474,7 +427,6 @@ func (s *SSHDCompiler) Compile() error {
err = t.ExecuteTemplate(f, "monthlytpl", monthly) err = t.ExecuteTemplate(f, "monthlytpl", monthly)
err = t.ExecuteTemplate(f, "footertpl", monthly) err = t.ExecuteTemplate(f, "footertpl", monthly)
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -485,19 +437,18 @@ func (s *SSHDCompiler) Compile() error {
err = t.ExecuteTemplate(f, "yearlytpl", yearly) err = t.ExecuteTemplate(f, "yearlytpl", yearly)
err = t.ExecuteTemplate(f, "footertpl", yearly) err = t.ExecuteTemplate(f, "footertpl", yearly)
if err != nil { if err != nil {
r.Close()
return err return err
} }
// Copy js asset file // Copy js asset file
input, err := ioutil.ReadFile(filepath.Join("logcompiler", "sshd", "load.js")) input, err := ioutil.ReadFile(filepath.Join("logcompiler", "sshd", "load.js"))
if err != nil { if err != nil {
log.Println(err) return err
} }
err = ioutil.WriteFile(filepath.Join("data", "sshd", "load.js"), input, 0644) err = ioutil.WriteFile(filepath.Join("data", "sshd", "load.js"), input, 0644)
if err != nil { if err != nil {
log.Println(err) return err
} }
log.Println("[-] SSHD compiling finished.") log.Println("[-] SSHD compiling finished.")
@ -513,7 +464,6 @@ func plotStats(s *SSHDCompiler, v string) error {
r := *s.r0 r := *s.r0
zrank, err := redis.Strings(r.Do("ZRANGEBYSCORE", v, "-inf", "+inf", "WITHSCORES")) zrank, err := redis.Strings(r.Do("ZRANGEBYSCORE", v, "-inf", "+inf", "WITHSCORES"))
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -534,7 +484,6 @@ func plotStats(s *SSHDCompiler, v string) error {
p, err := plot.New() p, err := plot.New()
if err != nil { if err != nil {
r.Close()
return err return err
} }
@ -548,7 +497,7 @@ func plotStats(s *SSHDCompiler, v string) error {
p.Title.Text = "Host" p.Title.Text = "Host"
default: default:
p.Title.Text = "" p.Title.Text = ""
log.Println("We should not reach this point, open an issue.") return errors.New("we should not reach this point, open an issue")
} }
p.Y.Label.Text = "Count" p.Y.Label.Text = "Count"
@ -568,7 +517,6 @@ func plotStats(s *SSHDCompiler, v string) error {
if _, err := os.Stat("data"); os.IsNotExist(err) { if _, err := os.Stat("data"); os.IsNotExist(err) {
err := os.Mkdir("data", 0700) err := os.Mkdir("data", 0700)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -576,7 +524,6 @@ func plotStats(s *SSHDCompiler, v string) error {
if _, err := os.Stat(filepath.Join("data", "sshd")); os.IsNotExist(err) { if _, err := os.Stat(filepath.Join("data", "sshd")); os.IsNotExist(err) {
err := os.Mkdir(filepath.Join("data", "sshd"), 0700) err := os.Mkdir(filepath.Join("data", "sshd"), 0700)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
@ -584,14 +531,12 @@ func plotStats(s *SSHDCompiler, v string) error {
if _, err := os.Stat(filepath.Join("data", "sshd", stype[0])); os.IsNotExist(err) { if _, err := os.Stat(filepath.Join("data", "sshd", stype[0])); os.IsNotExist(err) {
err := os.Mkdir(filepath.Join("data", "sshd", stype[0]), 0700) err := os.Mkdir(filepath.Join("data", "sshd", stype[0]), 0700)
if err != nil { if err != nil {
r.Close()
return err return err
} }
} }
xsize := 3 + vg.Length(math.Round(float64(len(keys)/2))) xsize := 3 + vg.Length(math.Round(float64(len(keys)/2)))
if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, filepath.Join("data", "sshd", stype[0], fmt.Sprintf("%v.svg", v))); err != nil { if err := p.Save(15*vg.Centimeter, xsize*vg.Centimeter, filepath.Join("data", "sshd", stype[0], fmt.Sprintf("%v.svg", v))); err != nil {
r.Close()
return err return err
} }

16
main.go
View File

@ -154,6 +154,18 @@ func main() {
redisCompilers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount) redisCompilers = newPool(rp.redisHost+":"+rp.redisPort, rp.redisDBCount)
redisInput = newPool(ri.redisHost+":"+ri.redisPort, 16) redisInput = newPool(ri.redisHost+":"+ri.redisPort, 16)
// Create a chan to get the goroutines errors messages
pullreturn := make(chan error, 1)
// Launching Pull routines monitoring
go func() {
select {
case err := <-pullreturn:
log.Println(err)
os.Exit(1)
log.Println("Exit.")
}
}()
// Init compiler depending on the compiler flags: // Init compiler depending on the compiler flags:
if *all { if *all {
// Init all compilers // Init all compilers
@ -177,7 +189,7 @@ func main() {
defer sshdrcon2.Close() defer sshdrcon2.Close()
redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry) redisReader := inputreader.NewLPOPReader(&sshdrcon2, ri.redisDB, "sshd", *retry)
sshd := logcompiler.SSHDCompiler{} sshd := logcompiler.SSHDCompiler{}
sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr) sshd.Set(&pullgr, &sshdrcon0, &sshdrcon1, redisReader, compilationTrigger, &compilegr, &pullreturn)
torun = append(torun, &sshd) torun = append(torun, &sshd)
} }
} }
@ -213,7 +225,7 @@ func main() {
// we add pulling routines to a waitgroup, // we add pulling routines to a waitgroup,
// they can immediately die when exiting. // they can immediately die when exiting.
pullgr.Add(1) pullgr.Add(1)
go v.Pull() go v.Pull(pullreturn)
} }
pullgr.Wait() pullgr.Wait()