chg: [input] functional d4 redis input
parent
17aa026e2b
commit
17dfb9c22b
|
@ -1 +1 @@
|
|||
0.0.0.0:4443
|
||||
stdout
|
||||
|
|
|
@ -1 +1 @@
|
|||
stdin
|
||||
d4server
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
1d940331-9fc9-4381-8fc9-3b624db66025
|
|
@ -22,6 +22,9 @@ import (
|
|||
|
||||
config "github.com/D4-project/d4-golang-utils/config"
|
||||
uuid "github.com/D4-project/d4-golang-utils/crypto/hash"
|
||||
"github.com/D4-project/d4-golang-utils/inputreader"
|
||||
_ "github.com/D4-project/d4-golang-utils/inputreader"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -56,21 +59,23 @@ type (
|
|||
}
|
||||
|
||||
d4S struct {
|
||||
src io.Reader
|
||||
dst d4Writer
|
||||
confdir string
|
||||
cka time.Duration
|
||||
ct time.Duration
|
||||
ce bool
|
||||
retry time.Duration
|
||||
cc bool
|
||||
ca x509.CertPool
|
||||
d4error uint8
|
||||
errnoCopy uint8
|
||||
debug bool
|
||||
conf d4params
|
||||
mhb *bytes.Buffer
|
||||
mh []byte
|
||||
src io.Reader
|
||||
dst d4Writer
|
||||
confdir string
|
||||
cka time.Duration
|
||||
ct time.Duration
|
||||
ce bool
|
||||
retry time.Duration
|
||||
cc bool
|
||||
ca x509.CertPool
|
||||
d4error uint8
|
||||
errnoCopy uint8
|
||||
debug bool
|
||||
conf d4params
|
||||
mhb *bytes.Buffer
|
||||
mh []byte
|
||||
redisInputPool *redis.Pool
|
||||
redisCon redis.Conn
|
||||
}
|
||||
|
||||
d4params struct {
|
||||
|
@ -81,6 +86,10 @@ type (
|
|||
source string
|
||||
destination string
|
||||
ttype uint8
|
||||
redisHost string
|
||||
redisPort string
|
||||
redisQueue string
|
||||
redisDB int
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -243,6 +252,25 @@ func d4loadConfig(d4 *d4S) bool {
|
|||
if len((*d4).conf.source) < 1 {
|
||||
log.Fatal("Unsupported source")
|
||||
}
|
||||
if (*d4).conf.source == "d4server" {
|
||||
// Parse Input Redis Config
|
||||
tmp := config.ReadConfigFile(*confdir, "redis_d4")
|
||||
ss := strings.Split(string(tmp), "/")
|
||||
if len(ss) <= 1 {
|
||||
log.Fatal("Missing Database in Redis input config: should be host:port/database_name")
|
||||
}
|
||||
(*d4).conf.redisDB, _ = strconv.Atoi(ss[1])
|
||||
var ret bool
|
||||
ret, ss[0] = config.IsNet(ss[0])
|
||||
if ret {
|
||||
sss := strings.Split(string(ss[0]), ":")
|
||||
(*d4).conf.redisHost = sss[0]
|
||||
(*d4).conf.redisPort = sss[1]
|
||||
} else {
|
||||
log.Fatal("Redis config error.")
|
||||
}
|
||||
(*d4).conf.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
|
||||
}
|
||||
(*d4).conf.destination = string(readConfFile(d4, "destination"))
|
||||
if len((*d4).conf.destination) < 1 {
|
||||
log.Fatal("Unsupported Destination")
|
||||
|
@ -357,6 +385,17 @@ func setReaderWriters(d4 *d4S) bool {
|
|||
case "pcap":
|
||||
f, _ := os.Open("capture.pcap")
|
||||
(*d4).src = f
|
||||
case "d4server":
|
||||
// Create a new redis connection pool
|
||||
(*d4).redisInputPool = newPool((*d4).conf.redisHost+":"+(*d4).conf.redisPort, 16)
|
||||
var err error
|
||||
(*d4).redisCon, err = (*d4).redisInputPool.Dial()
|
||||
if err != nil {
|
||||
log.Fatal("Could not connect to d4 Redis")
|
||||
}
|
||||
// (*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, *retry)
|
||||
// HARDCODING FOR THE MOMENT
|
||||
(*d4).src = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue, 30)
|
||||
}
|
||||
isn, dstnet := config.IsNet((*d4).conf.destination)
|
||||
if isn {
|
||||
|
@ -483,3 +522,13 @@ func (d4w *d4Writer) restoreHeader() bool {
|
|||
d4w.fb[1] = 254
|
||||
return true
|
||||
}
|
||||
|
||||
func newPool(addr string, maxconn int) *redis.Pool {
|
||||
return &redis.Pool{
|
||||
MaxActive: maxconn,
|
||||
MaxIdle: 3,
|
||||
IdleTimeout: 240 * time.Second,
|
||||
// Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
|
||||
Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
|
||||
}
|
||||
}
|
||||
|
|
5
go.mod
5
go.mod
|
@ -2,4 +2,7 @@ module github.com/D4-project/d4-goclient
|
|||
|
||||
go 1.13
|
||||
|
||||
require github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e
|
||||
require (
|
||||
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38
|
||||
github.com/gomodule/redigo v2.0.0+incompatible
|
||||
)
|
||||
|
|
8
go.sum
8
go.sum
|
@ -1,4 +1,8 @@
|
|||
github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e h1:yYTt4RS3K8+L7PUnw7Y8sT4+YuE3Rny42rx8kMOgDmc=
|
||||
github.com/D4-project/d4-golang-utils v0.0.0-20200212134721-0ef9aada6f2e/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
|
||||
github.com/D4-project/d4-golang-utils v0.1.2 h1:aLdvwIR2CFvIn2FnqPjbHxzLeo3ZL7YEyhCXRL6a9kQ=
|
||||
github.com/D4-project/d4-golang-utils v0.1.2/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
|
||||
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38 h1:jMryCP4eEDOuNBtqDBAIJ0MNuypVaGnCgDq6c8XXLEs=
|
||||
github.com/D4-project/d4-golang-utils v0.1.3-0.20200408055630-2fd1cdf3ea38/go.mod h1:2rq8KBQnNNDocwc/49cnpaqoQA/komoSHKom7ynvqJc=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
|
||||
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
|
||||
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
|
||||
|
|
Loading…
Reference in New Issue