From ac5cd4449a3dcb3c83b7fa65cb531389d3f9fb74 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 8 Apr 2020 16:21:22 +0200 Subject: [PATCH] chg: [input] d4 redis, retry --- d4-goclient.go | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/d4-goclient.go b/d4-goclient.go index d33dbd2..26325dd 100644 --- a/d4-goclient.go +++ b/d4-goclient.go @@ -160,7 +160,6 @@ func main() { s := make(chan os.Signal, 1) signal.Notify(s, os.Interrupt, os.Kill) c := make(chan string) - k := make(chan string) d4.mhb = bytes.NewBuffer(d4.mh) @@ -180,7 +179,7 @@ func main() { d4p.dst.restoreHeader() } // copy routine - go d4Copy(d4p, c, k) + go d4Copy(d4p, c) } else if d4.retry > 0 { go func() { infof(fmt.Sprintf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds())) @@ -196,10 +195,8 @@ func main() { select { case str := <-c: infof(str) + // log.Printf("Channel c: %q\n", str) continue - case str := <-k: - fmt.Println(str) - exit(d4p, 1) case <-s: fmt.Println(" Exiting") exit(d4p, 0) @@ -227,17 +224,14 @@ func set(d4 *d4S) bool { return false } -func d4Copy(d4 *d4S, c chan string, k chan string) { +func d4Copy(d4 *d4S, c chan string) { nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb) + // Always retry if err != nil { - if (d4.retry.Seconds()) > 0 { - c <- fmt.Sprintf("%s", err) - return - } - k <- fmt.Sprintf("%s", err) + c <- fmt.Sprintf("D4copy: %s", err) return } - k <- fmt.Sprintf("EOF: Nread: %d", nread) + c <- fmt.Sprintf("EOF: Nread: %d", nread) return } @@ -391,11 +385,14 @@ func setReaderWriters(d4 *d4S) bool { var err error (*d4).redisCon, err = (*d4).redisInputPool.Dial() if err != nil { - log.Fatal("Could not connect to d4 Redis") + log.Println("Could not connect to d4 Redis") + return false + } + (*d4).src, err = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, int(time.Second*(*d4).retry)) + if err != nil { + log.Printf("Could not create d4 Redis Descriptor %q \n", err) + return false } - // (*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, *retry) - // HARDCODING FOR THE MOMENT - (*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, 30) } isn, dstnet := config.IsNet((*d4).conf.destination) if isn {