diff --git a/conf.sample/destination b/conf.sample/destination index a92f867..faa3a15 100644 --- a/conf.sample/destination +++ b/conf.sample/destination @@ -1 +1 @@ -0.0.0.0:4443 +stdout diff --git a/conf.sample/redis_d4 b/conf.sample/redis_d4 new file mode 100644 index 0000000..81a485b --- /dev/null +++ b/conf.sample/redis_d4 @@ -0,0 +1 @@ +localhost:6385/2 diff --git a/conf.sample/redis_queue b/conf.sample/redis_queue new file mode 100644 index 0000000..b7bb6f8 --- /dev/null +++ b/conf.sample/redis_queue @@ -0,0 +1 @@ +analyzer:3:d42967c1-f7ad-464e-bbc7-4464c653d7a6 diff --git a/conf.sample/source b/conf.sample/source index cf52303..378848c 100644 --- a/conf.sample/source +++ b/conf.sample/source @@ -1 +1 @@ -stdin +d4server diff --git a/conf.sample/uuid b/conf.sample/uuid index e69de29..fa43ee4 100644 --- a/conf.sample/uuid +++ b/conf.sample/uuid @@ -0,0 +1 @@ +1d940331-9fc9-4381-8fc9-3b624db66025 diff --git a/d4-goclient.go b/d4-goclient.go index 96645cd..7c2840d 100644 --- a/d4-goclient.go +++ b/d4-goclient.go @@ -22,6 +22,9 @@ import ( config "github.com/D4-project/d4-golang-utils/config" uuid "github.com/D4-project/d4-golang-utils/crypto/hash" + "github.com/D4-project/d4-golang-utils/inputreader" + _ "github.com/D4-project/d4-golang-utils/inputreader" + "github.com/gomodule/redigo/redis" ) const ( @@ -56,21 +59,24 @@ type ( } d4S struct { - src io.Reader - dst d4Writer - confdir string - cka time.Duration - ct time.Duration - ce bool - retry time.Duration - cc bool - ca x509.CertPool - d4error uint8 - errnoCopy uint8 - debug bool - conf d4params - mhb *bytes.Buffer - mh []byte + src io.Reader + dst d4Writer + confdir string + cka time.Duration + ct time.Duration + ce bool + retry time.Duration + rate time.Duration + cc bool + ca x509.CertPool + d4error uint8 + errnoCopy uint8 + debug bool + conf d4params + mhb *bytes.Buffer + mh []byte + redisInputPool *redis.Pool + redisCon redis.Conn } d4params struct { @@ -81,27 +87,34 @@ type ( source string destination string ttype uint8 + redisHost string + redisPort string + redisQueue string + redisDB int } ) var ( - // verbose + // Verbose mode and logging buf bytes.Buffer logger = log.New(&buf, "INFO: ", log.Lshortfile) - infof = func(info string) { - logger.Output(2, info) + debugger = log.New(&buf, "DEBUG: ", log.Lmicroseconds) + debugf = func(debug string) { + debugger.Println("", debug) } tmpct, _ = time.ParseDuration("5mn") tmpcka, _ = time.ParseDuration("30s") tmpretry, _ = time.ParseDuration("30s") + tmprate, _ = time.ParseDuration("200ms") confdir = flag.String("c", "", "configuration directory") - debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout") + debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout - Don't use in production") ce = flag.Bool("ce", true, "Set to True, true, TRUE, 1, or t to enable TLS on network destination") ct = flag.Duration("ct", tmpct, "Set timeout in human format") cka = flag.Duration("cka", tmpcka, "Keep Alive time human format, 0 to disable") - retry = flag.Duration("rt", tmpretry, "Time in human format before retry after connection failure, set to 0 to exit on failure") + retry = flag.Duration("rt", tmpretry, "Rime in human format before retry after connection failure, set to 0 to exit on failure") + rate = flag.Duration("rl", tmprate, "Rate limiter: time in human format before retry after EOF") cc = flag.Bool("cc", false, "Check TLS certificate against rootCA.crt") ) @@ -110,6 +123,15 @@ func main() { var d4 d4S d4p := &d4 + // Setting up log file + f, err := os.OpenFile("d4-goclient.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) + if err != nil { + log.Fatalf("error opening file: %v", err) + } + defer f.Close() + logger.SetOutput(f) + logger.Println("Init") + flag.Usage = func() { fmt.Printf("d4 - d4 client\n") fmt.Printf("Read data from the configured and send it to \n") @@ -147,11 +169,14 @@ func main() { d4.cc = *cc d4.cka = *cka d4.retry = *retry + d4.rate = *rate s := make(chan os.Signal, 1) signal.Notify(s, os.Interrupt, os.Kill) c := make(chan string) - k := make(chan string) + + // Launching the Rate limiter + ratelimiter := time.Tick(d4.rate) d4.mhb = bytes.NewBuffer(d4.mh) @@ -167,39 +192,36 @@ func main() { if err != nil { panic(fmt.Sprintf("Cannot initiate session %s", err)) } - infof(fmt.Sprintf("Meta-Header sent: %d bytes", nread)) + logger.Println(fmt.Sprintf("Meta-Header sent: %d bytes", nread)) d4p.dst.restoreHeader() } // copy routine - go d4Copy(d4p, c, k) + go d4Copy(d4p, c) + // Block until the rate limiter allow us to continue + <-ratelimiter } else if d4.retry > 0 { go func() { - infof(fmt.Sprintf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds())) - fmt.Printf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds()) + logger.Println(fmt.Sprintf("Sleeping for %.f seconds before retry...", d4.retry.Seconds())) time.Sleep(d4.retry) c <- "done waiting" }() } else { - panic("Unrecoverable error without retry.") + exit(d4p, 1) } // Block until we catch an event select { - case str := <-c: - infof(str) + case <-c: continue - case str := <-k: - fmt.Println(str) - exit(d4p, 1) case <-s: - fmt.Println(" Exiting") + logger.Println("Exiting") exit(d4p, 0) } } } func exit(d4 *d4S, exitcode int) { - // Output logging before closing if debug is enabled + // Output debug info in the log before closing if debug is enabled if *debug == true { (*d4).debug = true fmt.Print(&buf) @@ -218,17 +240,14 @@ func set(d4 *d4S) bool { return false } -func d4Copy(d4 *d4S, c chan string, k chan string) { +func d4Copy(d4 *d4S, c chan string) { nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb) + // Always retry if err != nil { - if (d4.retry.Seconds()) > 0 { - c <- fmt.Sprintf("%s", err) - return - } - k <- fmt.Sprintf("%s", err) + c <- fmt.Sprintf("D4copy: %s", err) return } - k <- fmt.Sprintf("EOF: Nread: %d", nread) + c <- fmt.Sprintf("EOF: Nread: %d", nread) return } @@ -243,6 +262,25 @@ func d4loadConfig(d4 *d4S) bool { if len((*d4).conf.source) < 1 { log.Fatal("Unsupported source") } + if (*d4).conf.source == "d4server" { + // Parse Input Redis Config + tmp := config.ReadConfigFile(*confdir, "redis_d4") + ss := strings.Split(string(tmp), "/") + if len(ss) <= 1 { + log.Fatal("Missing Database in Redis input config: should be host:port/database_name") + } + (*d4).conf.redisDB, _ = strconv.Atoi(ss[1]) + var ret bool + ret, ss[0] = config.IsNet(ss[0]) + if ret { + sss := strings.Split(string(ss[0]), ":") + (*d4).conf.redisHost = sss[0] + (*d4).conf.redisPort = sss[1] + } else { + log.Fatal("Redis config error.") + } + (*d4).conf.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue")) + } (*d4).conf.destination = string(readConfFile(d4, "destination")) if len((*d4).conf.destination) < 1 { log.Fatal("Unsupported Destination") @@ -301,7 +339,7 @@ func d4loadConfig(d4 *d4S) bool { panic(fmt.Sprintf("Cannot read Meta-Header file: %s", err)) } else { if err := json.Compact((*d4).mhb, data[:count]); err != nil { - infof("Failed to compact meta header file") + logger.Println("Failed to compact meta header file") } } } else { @@ -357,6 +395,20 @@ func setReaderWriters(d4 *d4S) bool { case "pcap": f, _ := os.Open("capture.pcap") (*d4).src = f + case "d4server": + // Create a new redis connection pool + (*d4).redisInputPool = newPool((*d4).conf.redisHost+":"+(*d4).conf.redisPort, 16) + var err error + (*d4).redisCon, err = (*d4).redisInputPool.Dial() + if err != nil { + logger.Println("Could not connect to d4 Redis") + return false + } + (*d4).src, err = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, int(time.Second*(*d4).retry)) + if err != nil { + log.Printf("Could not create d4 Redis Descriptor %q \n", err) + return false + } } isn, dstnet := config.IsNet((*d4).conf.destination) if isn { @@ -377,7 +429,7 @@ func setReaderWriters(d4 *d4S) bool { if (*d4).ce == true { conn, errc := tls.DialWithDialer(&dial, "tcp", dstnet, &tlsc) if errc != nil { - fmt.Println(errc) + logger.Println(errc) return false } (*d4).dst = newD4Writer(conn, (*d4).conf.key) @@ -412,7 +464,7 @@ func generateUUIDv4() []byte { if err != nil { log.Fatal(err) } - infof(fmt.Sprintf("UUIDv4: %s\n", uuid)) + logger.Println(fmt.Sprintf("UUIDv4: %s\n", uuid)) return uuid.Bytes() } @@ -467,8 +519,8 @@ func (d4w *d4Writer) initHeader(d4 *d4S) bool { // hmac is set to zero during hmac operations, so leave it alone // init size of payload at 0 binary.LittleEndian.PutUint32(d4w.fb[58:62], uint32(0)) - infof(fmt.Sprintf("Initialized a %d bytes header:\n", HDR_SIZE)) - infof(fmt.Sprintf("%b\n", d4w.fb[:HDR_SIZE])) + debugf(fmt.Sprintf("Initialized a %d bytes header:\n", HDR_SIZE)) + debugf(fmt.Sprintf("%b\n", d4w.fb[:HDR_SIZE])) return true } @@ -483,3 +535,13 @@ func (d4w *d4Writer) restoreHeader() bool { d4w.fb[1] = 254 return true } + +func newPool(addr string, maxconn int) *redis.Pool { + return &redis.Pool{ + MaxActive: maxconn, + MaxIdle: 3, + IdleTimeout: 240 * time.Second, + // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial. + Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) }, + } +} diff --git a/go.mod b/go.mod index ac8722a..e01dede 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/D4-project/d4-goclient go 1.13 -require github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e +require ( + github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38 + github.com/gomodule/redigo v2.0.0+incompatible +) diff --git a/go.sum b/go.sum index dcaa161..6bef18a 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,8 @@ -github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e h1:yYTt4RS3K8+L7PUnw7Y8sT4+YuE3Rny42rx8kMOgDmc= -github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= +github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ= +github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= +github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38 h1:jMryCP4eEDOuNBtqDBAIJ0MNuypVaGnCgDq6c8XXLEs= +github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=