chg: [input] d4 redis, retry

d4forward
Jean-Louis Huynen 2020-04-08 16:21:22 +02:00
parent 2d3d71ec5b
commit ac5cd4449a
No known key found for this signature in database
GPG Key ID: 64799157F4BD6B93
1 changed files with 13 additions and 16 deletions

View File

@ -160,7 +160,6 @@ func main() {
s := make(chan os.Signal, 1)
signal.Notify(s, os.Interrupt, os.Kill)
c := make(chan string)
k := make(chan string)
d4.mhb = bytes.NewBuffer(d4.mh)
@ -180,7 +179,7 @@ func main() {
d4p.dst.restoreHeader()
}
// copy routine
go d4Copy(d4p, c, k)
go d4Copy(d4p, c)
} else if d4.retry > 0 {
go func() {
infof(fmt.Sprintf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds()))
@ -196,10 +195,8 @@ func main() {
select {
case str := <-c:
infof(str)
// log.Printf("Channel c: %q\n", str)
continue
case str := <-k:
fmt.Println(str)
exit(d4p, 1)
case <-s:
fmt.Println(" Exiting")
exit(d4p, 0)
@ -227,17 +224,14 @@ func set(d4 *d4S) bool {
return false
}
func d4Copy(d4 *d4S, c chan string, k chan string) {
func d4Copy(d4 *d4S, c chan string) {
nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb)
// Always retry
if err != nil {
if (d4.retry.Seconds()) > 0 {
c <- fmt.Sprintf("%s", err)
return
}
k <- fmt.Sprintf("%s", err)
c <- fmt.Sprintf("D4copy: %s", err)
return
}
k <- fmt.Sprintf("EOF: Nread: %d", nread)
c <- fmt.Sprintf("EOF: Nread: %d", nread)
return
}
@ -391,11 +385,14 @@ func setReaderWriters(d4 *d4S) bool {
var err error
(*d4).redisCon, err = (*d4).redisInputPool.Dial()
if err != nil {
log.Fatal("Could not connect to d4 Redis")
log.Println("Could not connect to d4 Redis")
return false
}
(*d4).src, err = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, int(time.Second*(*d4).retry))
if err != nil {
log.Printf("Could not create d4 Redis Descriptor %q \n", err)
return false
}
// (*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, *retry)
// HARDCODING FOR THE MOMENT
(*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, 30)
}
isn, dstnet := config.IsNet((*d4).conf.destination)
if isn {