Merge pull request #11 from D4-project/d4forward

add: [forwardredis] forward d4 redis queue to another d4 server
pull/15/head
Jean-Louis Huynen 2020-04-24 11:23:10 +02:00 committed by GitHub
commit 2624114144
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 122 additions and 50 deletions

View File

@ -1 +1 @@
0.0.0.0:4443 stdout

1
conf.sample/redis_d4 Normal file
View File

@ -0,0 +1 @@
localhost:6385/2

1
conf.sample/redis_queue Normal file
View File

@ -0,0 +1 @@
analyzer:3:d42967c1-f7ad-464e-bbc7-4464c653d7a6

View File

@ -1 +1 @@
stdin d4server

View File

@ -0,0 +1 @@
1d940331-9fc9-4381-8fc9-3b624db66025

View File

@ -22,6 +22,9 @@ import (
config "github.com/D4-project/d4-golang-utils/config" config "github.com/D4-project/d4-golang-utils/config"
uuid "github.com/D4-project/d4-golang-utils/crypto/hash" uuid "github.com/D4-project/d4-golang-utils/crypto/hash"
"github.com/D4-project/d4-golang-utils/inputreader"
_ "github.com/D4-project/d4-golang-utils/inputreader"
"github.com/gomodule/redigo/redis"
) )
const ( const (
@ -63,6 +66,7 @@ type (
ct time.Duration ct time.Duration
ce bool ce bool
retry time.Duration retry time.Duration
rate time.Duration
cc bool cc bool
ca x509.CertPool ca x509.CertPool
d4error uint8 d4error uint8
@ -71,6 +75,8 @@ type (
conf d4params conf d4params
mhb *bytes.Buffer mhb *bytes.Buffer
mh []byte mh []byte
redisInputPool *redis.Pool
redisCon redis.Conn
} }
d4params struct { d4params struct {
@ -81,27 +87,34 @@ type (
source string source string
destination string destination string
ttype uint8 ttype uint8
redisHost string
redisPort string
redisQueue string
redisDB int
} }
) )
var ( var (
// verbose // Verbose mode and logging
buf bytes.Buffer buf bytes.Buffer
logger = log.New(&buf, "INFO: ", log.Lshortfile) logger = log.New(&buf, "INFO: ", log.Lshortfile)
infof = func(info string) { debugger = log.New(&buf, "DEBUG: ", log.Lmicroseconds)
logger.Output(2, info) debugf = func(debug string) {
debugger.Println("", debug)
} }
tmpct, _ = time.ParseDuration("5mn") tmpct, _ = time.ParseDuration("5mn")
tmpcka, _ = time.ParseDuration("30s") tmpcka, _ = time.ParseDuration("30s")
tmpretry, _ = time.ParseDuration("30s") tmpretry, _ = time.ParseDuration("30s")
tmprate, _ = time.ParseDuration("200ms")
confdir = flag.String("c", "", "configuration directory") confdir = flag.String("c", "", "configuration directory")
debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout") debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout - Don't use in production")
ce = flag.Bool("ce", true, "Set to True, true, TRUE, 1, or t to enable TLS on network destination") ce = flag.Bool("ce", true, "Set to True, true, TRUE, 1, or t to enable TLS on network destination")
ct = flag.Duration("ct", tmpct, "Set timeout in human format") ct = flag.Duration("ct", tmpct, "Set timeout in human format")
cka = flag.Duration("cka", tmpcka, "Keep Alive time human format, 0 to disable") cka = flag.Duration("cka", tmpcka, "Keep Alive time human format, 0 to disable")
retry = flag.Duration("rt", tmpretry, "Time in human format before retry after connection failure, set to 0 to exit on failure") retry = flag.Duration("rt", tmpretry, "Rime in human format before retry after connection failure, set to 0 to exit on failure")
rate = flag.Duration("rl", tmprate, "Rate limiter: time in human format before retry after EOF")
cc = flag.Bool("cc", false, "Check TLS certificate against rootCA.crt") cc = flag.Bool("cc", false, "Check TLS certificate against rootCA.crt")
) )
@ -110,6 +123,15 @@ func main() {
var d4 d4S var d4 d4S
d4p := &d4 d4p := &d4
// Setting up log file
f, err := os.OpenFile("d4-goclient.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer f.Close()
logger.SetOutput(f)
logger.Println("Init")
flag.Usage = func() { flag.Usage = func() {
fmt.Printf("d4 - d4 client\n") fmt.Printf("d4 - d4 client\n")
fmt.Printf("Read data from the configured <source> and send it to <destination>\n") fmt.Printf("Read data from the configured <source> and send it to <destination>\n")
@ -147,11 +169,14 @@ func main() {
d4.cc = *cc d4.cc = *cc
d4.cka = *cka d4.cka = *cka
d4.retry = *retry d4.retry = *retry
d4.rate = *rate
s := make(chan os.Signal, 1) s := make(chan os.Signal, 1)
signal.Notify(s, os.Interrupt, os.Kill) signal.Notify(s, os.Interrupt, os.Kill)
c := make(chan string) c := make(chan string)
k := make(chan string)
// Launching the Rate limiter
ratelimiter := time.Tick(d4.rate)
d4.mhb = bytes.NewBuffer(d4.mh) d4.mhb = bytes.NewBuffer(d4.mh)
@ -167,39 +192,36 @@ func main() {
if err != nil { if err != nil {
panic(fmt.Sprintf("Cannot initiate session %s", err)) panic(fmt.Sprintf("Cannot initiate session %s", err))
} }
infof(fmt.Sprintf("Meta-Header sent: %d bytes", nread)) logger.Println(fmt.Sprintf("Meta-Header sent: %d bytes", nread))
d4p.dst.restoreHeader() d4p.dst.restoreHeader()
} }
// copy routine // copy routine
go d4Copy(d4p, c, k) go d4Copy(d4p, c)
// Block until the rate limiter allow us to continue
<-ratelimiter
} else if d4.retry > 0 { } else if d4.retry > 0 {
go func() { go func() {
infof(fmt.Sprintf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds())) logger.Println(fmt.Sprintf("Sleeping for %.f seconds before retry...", d4.retry.Seconds()))
fmt.Printf("Sleeping for %.f seconds before retry...\n", d4.retry.Seconds())
time.Sleep(d4.retry) time.Sleep(d4.retry)
c <- "done waiting" c <- "done waiting"
}() }()
} else { } else {
panic("Unrecoverable error without retry.") exit(d4p, 1)
} }
// Block until we catch an event // Block until we catch an event
select { select {
case str := <-c: case <-c:
infof(str)
continue continue
case str := <-k:
fmt.Println(str)
exit(d4p, 1)
case <-s: case <-s:
fmt.Println(" Exiting") logger.Println("Exiting")
exit(d4p, 0) exit(d4p, 0)
} }
} }
} }
func exit(d4 *d4S, exitcode int) { func exit(d4 *d4S, exitcode int) {
// Output logging before closing if debug is enabled // Output debug info in the log before closing if debug is enabled
if *debug == true { if *debug == true {
(*d4).debug = true (*d4).debug = true
fmt.Print(&buf) fmt.Print(&buf)
@ -218,17 +240,14 @@ func set(d4 *d4S) bool {
return false return false
} }
func d4Copy(d4 *d4S, c chan string, k chan string) { func d4Copy(d4 *d4S, c chan string) {
nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb) nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb)
// Always retry
if err != nil { if err != nil {
if (d4.retry.Seconds()) > 0 { c <- fmt.Sprintf("D4copy: %s", err)
c <- fmt.Sprintf("%s", err)
return return
} }
k <- fmt.Sprintf("%s", err) c <- fmt.Sprintf("EOF: Nread: %d", nread)
return
}
k <- fmt.Sprintf("EOF: Nread: %d", nread)
return return
} }
@ -243,6 +262,25 @@ func d4loadConfig(d4 *d4S) bool {
if len((*d4).conf.source) < 1 { if len((*d4).conf.source) < 1 {
log.Fatal("Unsupported source") log.Fatal("Unsupported source")
} }
if (*d4).conf.source == "d4server" {
// Parse Input Redis Config
tmp := config.ReadConfigFile(*confdir, "redis_d4")
ss := strings.Split(string(tmp), "/")
if len(ss) <= 1 {
log.Fatal("Missing Database in Redis input config: should be host:port/database_name")
}
(*d4).conf.redisDB, _ = strconv.Atoi(ss[1])
var ret bool
ret, ss[0] = config.IsNet(ss[0])
if ret {
sss := strings.Split(string(ss[0]), ":")
(*d4).conf.redisHost = sss[0]
(*d4).conf.redisPort = sss[1]
} else {
log.Fatal("Redis config error.")
}
(*d4).conf.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
}
(*d4).conf.destination = string(readConfFile(d4, "destination")) (*d4).conf.destination = string(readConfFile(d4, "destination"))
if len((*d4).conf.destination) < 1 { if len((*d4).conf.destination) < 1 {
log.Fatal("Unsupported Destination") log.Fatal("Unsupported Destination")
@ -301,7 +339,7 @@ func d4loadConfig(d4 *d4S) bool {
panic(fmt.Sprintf("Cannot read Meta-Header file: %s", err)) panic(fmt.Sprintf("Cannot read Meta-Header file: %s", err))
} else { } else {
if err := json.Compact((*d4).mhb, data[:count]); err != nil { if err := json.Compact((*d4).mhb, data[:count]); err != nil {
infof("Failed to compact meta header file") logger.Println("Failed to compact meta header file")
} }
} }
} else { } else {
@ -357,6 +395,20 @@ func setReaderWriters(d4 *d4S) bool {
case "pcap": case "pcap":
f, _ := os.Open("capture.pcap") f, _ := os.Open("capture.pcap")
(*d4).src = f (*d4).src = f
case "d4server":
// Create a new redis connection pool
(*d4).redisInputPool = newPool((*d4).conf.redisHost+":"+(*d4).conf.redisPort, 16)
var err error
(*d4).redisCon, err = (*d4).redisInputPool.Dial()
if err != nil {
logger.Println("Could not connect to d4 Redis")
return false
}
(*d4).src, err = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, int(time.Second*(*d4).retry))
if err != nil {
log.Printf("Could not create d4 Redis Descriptor %q \n", err)
return false
}
} }
isn, dstnet := config.IsNet((*d4).conf.destination) isn, dstnet := config.IsNet((*d4).conf.destination)
if isn { if isn {
@ -377,7 +429,7 @@ func setReaderWriters(d4 *d4S) bool {
if (*d4).ce == true { if (*d4).ce == true {
conn, errc := tls.DialWithDialer(&dial, "tcp", dstnet, &tlsc) conn, errc := tls.DialWithDialer(&dial, "tcp", dstnet, &tlsc)
if errc != nil { if errc != nil {
fmt.Println(errc) logger.Println(errc)
return false return false
} }
(*d4).dst = newD4Writer(conn, (*d4).conf.key) (*d4).dst = newD4Writer(conn, (*d4).conf.key)
@ -412,7 +464,7 @@ func generateUUIDv4() []byte {
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
infof(fmt.Sprintf("UUIDv4: %s\n", uuid)) logger.Println(fmt.Sprintf("UUIDv4: %s\n", uuid))
return uuid.Bytes() return uuid.Bytes()
} }
@ -467,8 +519,8 @@ func (d4w *d4Writer) initHeader(d4 *d4S) bool {
// hmac is set to zero during hmac operations, so leave it alone // hmac is set to zero during hmac operations, so leave it alone
// init size of payload at 0 // init size of payload at 0
binary.LittleEndian.PutUint32(d4w.fb[58:62], uint32(0)) binary.LittleEndian.PutUint32(d4w.fb[58:62], uint32(0))
infof(fmt.Sprintf("Initialized a %d bytes header:\n", HDR_SIZE)) debugf(fmt.Sprintf("Initialized a %d bytes header:\n", HDR_SIZE))
infof(fmt.Sprintf("%b\n", d4w.fb[:HDR_SIZE])) debugf(fmt.Sprintf("%b\n", d4w.fb[:HDR_SIZE]))
return true return true
} }
@ -483,3 +535,13 @@ func (d4w *d4Writer) restoreHeader() bool {
d4w.fb[1] = 254 d4w.fb[1] = 254
return true return true
} }
func newPool(addr string, maxconn int) *redis.Pool {
return &redis.Pool{
MaxActive: maxconn,
MaxIdle: 3,
IdleTimeout: 240 * time.Second,
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
}
}

5
go.mod
View File

@ -2,4 +2,7 @@ module github.com/D4-project/d4-goclient
go 1.13 go 1.13
require github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e require (
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38
github.com/gomodule/redigo v2.0.0+incompatible
)

8
go.sum
View File

@ -1,4 +1,8 @@
github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e h1:yYTt4RS3K8+L7PUnw7Y8sT4+YuE3Rny42rx8kMOgDmc= github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ=
github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc= github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38 h1:jMryCP4eEDOuNBtqDBAIJ0MNuypVaGnCgDq6c8XXLEs=
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=