diff --git a/d4-goclient.go b/d4-goclient.go index f2eea1d..e65ab9f 100644 --- a/d4-goclient.go +++ b/d4-goclient.go @@ -18,6 +18,7 @@ import ( "os/signal" "strconv" "strings" + "syscall" "time" config "github.com/D4-project/d4-golang-utils/config" @@ -66,7 +67,7 @@ type ( ct time.Duration ce bool retry time.Duration - rate time.Duration + rate time.Duration cc bool ca x509.CertPool d4error uint8 @@ -96,17 +97,17 @@ type ( var ( // Verbose mode and logging - buf bytes.Buffer - logger = log.New(&buf, "INFO: ", log.Lshortfile) + buf bytes.Buffer + logger = log.New(&buf, "INFO: ", log.Lshortfile) debugger = log.New(&buf, "DEBUG: ", log.Lmicroseconds) - debugf = func(debug string) { + debugf = func(debug string) { debugger.Println("", debug) } tmpct, _ = time.ParseDuration("5mn") tmpcka, _ = time.ParseDuration("30s") tmpretry, _ = time.ParseDuration("30s") - tmprate, _ = time.ParseDuration("200ms") + tmprate, _ = time.ParseDuration("200ms") confdir = flag.String("c", "", "configuration directory") debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout - Don't use in production") @@ -114,7 +115,7 @@ var ( ct = flag.Duration("ct", tmpct, "Set timeout in human format") cka = flag.Duration("cka", tmpcka, "Keep Alive time human format, 0 to disable") retry = flag.Duration("rt", tmpretry, "Time in human format before retry after connection failure, set to 0 to exit on failure") - rate = flag.Duration("rl", tmprate, "Rate limiter: time in human format before retry after EOF") + rate = flag.Duration("rl", tmprate, "Rate limiter: time in human format before retry after EOF") cc = flag.Bool("cc", false, "Check TLS certificate against rootCA.crt") ) @@ -130,6 +131,7 @@ func main() { } defer f.Close() logger.SetOutput(f) + logger.SetFlags(log.LstdFlags | log.Lshortfile) logger.Println("Init") flag.Usage = func() { @@ -165,6 +167,7 @@ func main() { *confdir = strings.TrimSuffix(*confdir, "/") *confdir = strings.TrimSuffix(*confdir, "\\") } + d4.confdir = *confdir d4.ce = *ce d4.ct = *ct @@ -175,52 +178,121 @@ func main() { s := make(chan os.Signal, 1) signal.Notify(s, os.Interrupt, os.Kill) - c := make(chan string) + errchan := make(chan error) + eofchan := make(chan string) + metachan := make(chan string) - // Launching the Rate limiter - ratelimiter := time.Tick(d4.rate) + // Launching the Rate limiters + rateLimiter := time.Tick(d4.rate) + retryLimiter := time.Tick(d4.retry) - d4.mhb = bytes.NewBuffer(d4.mh) + //Setup + if !d4loadConfig(d4p) { + panic("Could not load Config.") + } + if !setReaderWriters(d4p, false) { + panic("Could not Init Inputs Outputs.") + } + if !d4.dst.initHeader(d4p) { + panic("Could not Init Headers.") + } - for { - // init or reinit after retry - if set(d4p) { - // type 254 requires to send a meta-header first - if d4.conf.ttype == 254 || d4.conf.ttype == 2 { - // create a jsonreader - d4p.dst.hijackHeader() - // Ugly hack to skip bytes.Buffer WriteTo check that bypasses my fixed lenght buffer - nread, err := io.CopyBuffer(&d4.dst, struct{ io.Reader }{d4.mhb}, d4.dst.pb) - if err != nil { - panic(fmt.Sprintf("Cannot initiate session %s", err)) - } - logger.Println(fmt.Sprintf("Meta-Header sent: %d bytes", nread)) - d4p.dst.restoreHeader() - } - // copy routine - go d4Copy(d4p, c) - // Block until the rate limiter allow us to continue + // force is a flag that forces the creation of a new connection + force := false + + // On the first run, we send d4 meta header for type 2/254 + if d4.conf.ttype == 254 || d4.conf.ttype == 2 { + go sendMeta(d4p, errchan, metachan) + H: + for { select { - case <-ratelimiter: - continue - case <-s: - logger.Println("Exiting") - exit(d4p, 0) + case <-errchan: + select { + case <-retryLimiter: + go sendMeta(d4p, errchan, metachan) + case <-s: + logger.Println("Exiting") + exit(d4p, 0) + } + case <-metachan: + break H } - } else if d4.retry > 0 { - go func() { - logger.Println(fmt.Sprintf("Sleeping for %.f seconds before retry...", d4.retry.Seconds())) - time.Sleep(d4.retry) - c <- "done waiting" - }() - } else { - exit(d4p, 1) } + } - // Block until we catch an event + // Launch copy routine + go d4Copy(d4p, errchan, eofchan) + + // Handle signals + for { select { - case <-c: - continue + // Case where the input ran out of data to consume1 + case <-eofchan: + // We wait for ratelimiter before polling again + EOF: + for { + select { + case <-rateLimiter: + // copy routine + go d4Copy(d4p, errchan, eofchan) + break EOF + // Exit signal + case <-s: + logger.Println("Exiting") + exit(d4p, 0) + } + } + // ERROR, we check first whether it is network related + case err := <-errchan: + // On connection errors, we force setReaderWriter to reset the connection + force = false + switch t := err.(type) { + case *net.OpError: + force = true + if t.Op == "dial" { + logger.Println("Unknown Host") + } else if t.Op == "read" { + logger.Println("Connection Refused") + } else if t.Op == "write" { + logger.Println("Write error") + } + case syscall.Errno: + if t == syscall.ECONNREFUSED { + force = true + logger.Println("Connection Refused") + } + } + // We wait for retryLimiter before writing again + RETRY: + for { + select { + case <-retryLimiter: + if !setReaderWriters(d4p, force) { + // Can't connect, we break to retry + // force is still true + break + } + if !d4.dst.initHeader(d4p) { + panic("Could not Init Headers.") + } + if (d4.conf.ttype == 254 || d4.conf.ttype == 2) && force { + // setReaderWriter is happy, we should have a working + // connection from now on. + force = false + // Sending meta header for the first time on this new connection + go sendMeta(d4p, errchan, metachan) + } + break RETRY + // Exit signal + case <-s: + logger.Println("Exiting") + exit(d4p, 0) + } + } + // metaheader sent, launch the copy routine + case <-metachan: + go d4Copy(d4p, errchan, eofchan) + // Exit signal case <-s: logger.Println("Exiting") exit(d4p, 0) @@ -237,25 +309,31 @@ func exit(d4 *d4S, exitcode int) { os.Exit(exitcode) } -func set(d4 *d4S) bool { - if d4loadConfig(d4) { - if setReaderWriters(d4) { - if d4.dst.initHeader(d4) { - return true - } - } - } - return false -} - -func d4Copy(d4 *d4S, c chan string) { +func d4Copy(d4 *d4S, errchan chan error, eofchan chan string) { nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb) // Always retry if err != nil { - c <- fmt.Sprintf("D4copy: %s", err) + logger.Printf("D4copy: %s", err) + errchan <- err return } - c <- fmt.Sprintf("EOF: Nread: %d", nread) + eofchan <- fmt.Sprintf("EOF: Nread: %d", nread) +} + +func sendMeta(d4 *d4S, errchan chan error, metachan chan string) { + // Fill metaheader buffer with metaheader data + d4.mhb = bytes.NewBuffer(d4.mh) + d4.dst.hijackHeader() + // Ugly hack to skip bytes.Buffer WriteTo check that bypasses my fixed lenght buffer + nread, err := io.CopyBuffer(&d4.dst, struct{ io.Reader }{d4.mhb}, d4.dst.pb) + if err != nil { + logger.Printf("Cannot sent meta-header: %s", err) + errchan <- err + return + } + logger.Println(fmt.Sprintf("Meta-Header sent: %d bytes", nread)) + d4.dst.restoreHeader() + metachan <- "Header Sent" return } @@ -346,9 +424,13 @@ func d4loadConfig(d4 *d4S) bool { if off, err := file.Seek(0, 0); err != nil || off != 0 { panic(fmt.Sprintf("Cannot read Meta-Header file: %s", err)) } else { + // create metaheader buffer + d4.mhb = bytes.NewBuffer(d4.mh) if err := json.Compact((*d4).mhb, data[:count]); err != nil { logger.Println("Failed to compact meta header file") } + // Store the metaheader in d4 struct for subsequent retries + (*d4).mh = data[:count] } } else { panic("A Meta-Header File should at least contain a 'type' field.") @@ -395,7 +477,7 @@ func newD4Writer(writer io.Writer, key []byte) d4Writer { } // TODO QUICK IMPLEM, REVISE -func setReaderWriters(d4 *d4S) bool { +func setReaderWriters(d4 *d4S, force bool) bool { //TODO implement other destination file, fifo unix_socket ... switch (*d4).conf.source { case "stdin": @@ -420,9 +502,12 @@ func setReaderWriters(d4 *d4S) bool { } isn, dstnet := config.IsNet((*d4).conf.destination) if isn { - // We test whether a usable connection already exist + // We test whether a connection already exist // (case where the reader run out of data) - if _, ok := (*d4).dst.w.(net.Conn); !ok { + // force forces to reset the connections after + // failure to reuse it + if _, ok := (*d4).dst.w.(net.Conn); !ok || force { + //fmt.Println("Creating a new connection") // We need a connection dial := net.Dialer{ Timeout: (*d4).ct, @@ -452,39 +537,6 @@ func setReaderWriters(d4 *d4S) bool { } (*d4).dst = newD4Writer(conn, (*d4).conf.key) } - }else{ - // The connection can be reused, there is nothing to do - // Except for type 2 - we tear down the old connection, - (*d4).dst.w.(net.Conn).Close() - // and bring up a new one. - dial := net.Dialer{ - Timeout: (*d4).ct, - KeepAlive: (*d4).cka, - FallbackDelay: 0, - } - tlsc := tls.Config{ - InsecureSkipVerify: true, - } - if (*d4).cc { - tlsc = tls.Config{ - InsecureSkipVerify: false, - RootCAs: &(*d4).ca, - } - } - if (*d4).ce == true { - conn, errc := tls.DialWithDialer(&dial, "tcp", dstnet, &tlsc) - if errc != nil { - logger.Println(errc) - return false - } - (*d4).dst = newD4Writer(conn, (*d4).conf.key) - } else { - conn, errc := dial.Dial("tcp", dstnet) - if errc != nil { - return false - } - (*d4).dst = newD4Writer(conn, (*d4).conf.key) - } } } else { switch (*d4).conf.destination { diff --git a/go.mod b/go.mod index 5faa39e..781c8dd 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.13 require ( github.com/D4-project/d4-golang-utils v0.1.5 github.com/gomodule/redigo v2.0.0+incompatible + gonum.org/v1/plot v0.7.0 // indirect ) diff --git a/go.sum b/go.sum index 1f1f174..ca3748a 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,27 @@ +github.com/D4-project/analyzer-d4-log v0.1.3 h1:1bkwwI6zFORiLe5DpFmzlzU62DZK6ToGm3G+F2VA5bk= github.com/D4-project/d4-golang-utils v0.1.3 h1:JrGoQ3Va4SwGl8Pjnb2bK/wxdFaGsjulAmp0bxZO7jg= github.com/D4-project/d4-golang-utils v0.1.3/go.mod h1:GGR5KMhvABZtIfmS5jZkwQnBoP+9/V0ZEETSGiWLaM4= github.com/D4-project/d4-golang-utils v0.1.4/go.mod h1:GGR5KMhvABZtIfmS5jZkwQnBoP+9/V0ZEETSGiWLaM4= github.com/D4-project/d4-golang-utils v0.1.5 h1:0aL2gv0uc56Gn2NwQY8L2C6OQRdq0LpoioAeLWs6zZc= github.com/D4-project/d4-golang-utils v0.1.5/go.mod h1:GGR5KMhvABZtIfmS5jZkwQnBoP+9/V0ZEETSGiWLaM4= +github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af h1:wVe6/Ea46ZMeNkQjjBW6xcqyQA/j5e0D6GytH95g0gQ= +github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8= +github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= +github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5 h1:PJr+ZMXIecYc1Ey2zucXdR73SMBtgjPgwa31099IMv0= +github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81 h1:00VmoueYNlNz/aHIilyyQz/MHSqGoWJzpFv/HW8xpzI= +golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/plot v0.7.0 h1:Otpxyvra6Ie07ft50OX5BrCfS/BWEMvhsCUHwPEJmLI= +gonum.org/v1/plot v0.7.0/go.mod h1:2wtU6YrrdQAhAF9+MTd5tOQjrov/zF70b1i99Npjvgo= +rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4=