You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

702 lines
19 KiB

  1. package main
  2. import (
  3. "bytes"
  4. "crypto/hmac"
  5. "crypto/sha256"
  6. "crypto/tls"
  7. "crypto/x509"
  8. "encoding/binary"
  9. "encoding/json"
  10. "flag"
  11. "fmt"
  12. "golang.org/x/net/proxy"
  13. "io"
  14. "io/ioutil"
  15. "log"
  16. "net"
  17. "os"
  18. "os/signal"
  19. "strconv"
  20. "strings"
  21. "syscall"
  22. "time"
  23. config "github.com/D4-project/d4-golang-utils/config"
  24. uuid "github.com/D4-project/d4-golang-utils/crypto/hash"
  25. "github.com/D4-project/d4-golang-utils/inputreader"
  26. "github.com/gomodule/redigo/redis"
  27. )
  28. const (
  29. // VERSION_SIZE
  30. VERSION_SIZE = 1
  31. // TYPE_SIZE
  32. TYPE_SIZE = 1
  33. // UUID_SIZE
  34. UUID_SIZE = 16
  35. // TIMESTAMP_SIZE
  36. TIMESTAMP_SIZE = 8
  37. // HMAC_SIZE
  38. HMAC_SIZE = 32
  39. // SSIZE payload size size
  40. SSIZE = 4
  41. // HDR_SIZE total header size
  42. HDR_SIZE = VERSION_SIZE + TYPE_SIZE + UUID_SIZE + HMAC_SIZE + TIMESTAMP_SIZE + SSIZE
  43. // MH_FILE_LIMIT defines in bytes the max size of the json meta header file
  44. MH_FILE_LIMIT = 100000
  45. )
  46. type (
  47. // A d4 writer implements the io.Writer Interface by implementing Write() and Close()
  48. // it accepts an io.Writer as sink
  49. d4Writer struct {
  50. w io.Writer
  51. key []byte
  52. fb []byte
  53. pb []byte
  54. }
  55. d4S struct {
  56. src io.Reader
  57. dst d4Writer
  58. confdir string
  59. cka time.Duration
  60. ct time.Duration
  61. ce bool
  62. retry time.Duration
  63. rate time.Duration
  64. cc bool
  65. tor bool
  66. daily bool
  67. json bool
  68. ca x509.CertPool
  69. d4error uint8
  70. errnoCopy uint8
  71. debug bool
  72. conf d4params
  73. mhb *bytes.Buffer
  74. mh []byte
  75. redisInputPool *redis.Pool
  76. redisCon redis.Conn
  77. }
  78. d4params struct {
  79. uuid []byte
  80. snaplen uint32
  81. key []byte
  82. version uint8
  83. source string
  84. destination string
  85. ttype uint8
  86. redisHost string
  87. redisPort string
  88. redisQueue string
  89. redisDB int
  90. folderstr string
  91. }
  92. )
  93. var (
  94. // Verbose mode and logging
  95. buf bytes.Buffer
  96. logger = log.New(&buf, "INFO: ", log.Lshortfile)
  97. debugger = log.New(&buf, "DEBUG: ", log.Lmicroseconds)
  98. debugf = func(debug string) {
  99. debugger.Println("", debug)
  100. }
  101. tmpct, _ = time.ParseDuration("5mn")
  102. tmpcka, _ = time.ParseDuration("30s")
  103. tmpretry, _ = time.ParseDuration("30s")
  104. tmprate, _ = time.ParseDuration("200ms")
  105. confdir = flag.String("c", "", "configuration directory")
  106. debug = flag.Bool("v", false, "Set to True, true, TRUE, 1, or t to enable verbose output on stdout - Don't use in production")
  107. ce = flag.Bool("ce", true, "Set to True, true, TRUE, 1, or t to enable TLS on network destination")
  108. ct = flag.Duration("ct", tmpct, "Set timeout in human format")
  109. cka = flag.Duration("cka", tmpcka, "Keep Alive time human format, 0 to disable")
  110. retry = flag.Duration("rt", tmpretry, "Time in human format before retry after connection failure, set to 0 to exit on failure")
  111. rate = flag.Duration("rl", tmprate, "Rate limiter: time in human format before retry after EOF")
  112. cc = flag.Bool("cc", false, "Check TLS certificate against rootCA.crt")
  113. torflag = flag.Bool("tor", false, "Use a SOCKS5 tor proxy on 9050")
  114. dailyflag = flag.Bool("daily", false, "Sets up filewatcher to watch a new %Y%M%D folder at midnight")
  115. jsonflag = flag.Bool("json", false, "The files watched are json files")
  116. )
  117. func main() {
  118. var d4 d4S
  119. d4p := &d4
  120. // Setting up log file
  121. f, err := os.OpenFile("d4-goclient.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  122. if err != nil {
  123. log.Fatalf("error opening file: %v", err)
  124. }
  125. defer f.Close()
  126. logger.SetOutput(f)
  127. logger.SetFlags(log.LstdFlags | log.Lshortfile)
  128. logger.Println("Init")
  129. flag.Usage = func() {
  130. fmt.Printf("d4 - d4 client\n")
  131. fmt.Printf("Read data from the configured <source> and send it to <destination>\n")
  132. fmt.Printf("\n")
  133. fmt.Printf("Usage: d4 -c config_directory\n")
  134. fmt.Printf("\n")
  135. fmt.Printf("Configuration\n\n")
  136. fmt.Printf("The configuration settings are stored in files in the configuration directory\n")
  137. fmt.Printf("specified with the -c command line switch.\n\n")
  138. fmt.Printf("Files in the configuration directory\n")
  139. fmt.Printf("\n")
  140. fmt.Printf("key - is the private HMAC-SHA-256-128 key.\n")
  141. fmt.Printf(" The HMAC is computed on the header with a HMAC value set to 0\n")
  142. fmt.Printf(" which is updated later.\n")
  143. fmt.Printf("snaplen - the length of bytes that is read from the <source>\n")
  144. fmt.Printf("version - the version of the d4 client\n")
  145. fmt.Printf("type - the type of data that is send. pcap, netflow, ...\n")
  146. fmt.Printf("source - the source where the data is read from\n")
  147. fmt.Printf("destination - the destination where the data is written to\n")
  148. fmt.Printf("redis_d4 - location of redis d4 server\n")
  149. fmt.Printf("redis_queue - analyzer:type:queueuuid to pop\n")
  150. fmt.Printf("\n")
  151. flag.PrintDefaults()
  152. }
  153. flag.Parse()
  154. if flag.NFlag() == 0 || *confdir == "" {
  155. flag.Usage()
  156. os.Exit(1)
  157. } else {
  158. *confdir = strings.TrimSuffix(*confdir, "/")
  159. *confdir = strings.TrimSuffix(*confdir, "\\")
  160. }
  161. d4.confdir = *confdir
  162. d4.ce = *ce
  163. d4.ct = *ct
  164. d4.cc = *cc
  165. d4.json = *jsonflag
  166. d4.cka = *cka
  167. d4.retry = *retry
  168. d4.rate = *rate
  169. d4.tor = *torflag
  170. d4.daily = *dailyflag
  171. s := make(chan os.Signal, 1)
  172. signal.Notify(s, os.Interrupt, os.Kill)
  173. errchan := make(chan error)
  174. eofchan := make(chan string)
  175. metachan := make(chan string)
  176. // Launching the Rate limiters
  177. rateLimiter := time.Tick(d4.rate)
  178. retryLimiter := time.Tick(d4.retry)
  179. //Setup
  180. if !d4loadConfig(d4p) {
  181. panic("Could not load Config.")
  182. }
  183. if !setReaderWriters(d4p, false) {
  184. panic("Could not Init Inputs Outputs.")
  185. }
  186. if !d4.dst.initHeader(d4p) {
  187. panic("Could not Init Headers.")
  188. }
  189. // force is a flag that forces the creation of a new connection
  190. force := false
  191. // On the first run, we send d4 meta header for type 2/254
  192. if d4.conf.ttype == 254 || d4.conf.ttype == 2 {
  193. go sendMeta(d4p, errchan, metachan)
  194. H:
  195. for {
  196. select {
  197. case <-errchan:
  198. select {
  199. case <-retryLimiter:
  200. go sendMeta(d4p, errchan, metachan)
  201. case <-s:
  202. logger.Println("Exiting")
  203. exit(d4p, 0)
  204. }
  205. case <-metachan:
  206. break H
  207. }
  208. }
  209. }
  210. // Launch copy routine
  211. go d4Copy(d4p, errchan, eofchan)
  212. // Handle signals
  213. for {
  214. select {
  215. // Case where the input ran out of data to consume1
  216. case <-eofchan:
  217. // We wait for ratelimiter before polling again
  218. EOF:
  219. for {
  220. select {
  221. case <-rateLimiter:
  222. // copy routine
  223. go d4Copy(d4p, errchan, eofchan)
  224. break EOF
  225. // Exit signal
  226. case <-s:
  227. logger.Println("Exiting")
  228. exit(d4p, 0)
  229. }
  230. }
  231. // ERROR, we check first whether it is network related
  232. case err := <-errchan:
  233. // On connection errors, we force setReaderWriter to reset the connection
  234. force = false
  235. switch t := err.(type) {
  236. case *net.OpError:
  237. force = true
  238. if t.Op == "dial" {
  239. logger.Println("Unknown Host")
  240. } else if t.Op == "read" {
  241. logger.Println("Connection Refused")
  242. } else if t.Op == "write" {
  243. logger.Println("Write error")
  244. }
  245. case syscall.Errno:
  246. if t == syscall.ECONNREFUSED {
  247. force = true
  248. logger.Println("Connection Refused")
  249. }
  250. }
  251. // We wait for retryLimiter before writing again
  252. RETRY:
  253. for {
  254. select {
  255. case <-retryLimiter:
  256. if !setReaderWriters(d4p, force) {
  257. // Can't connect, we break to retry
  258. // force is still true
  259. break
  260. }
  261. if !d4.dst.initHeader(d4p) {
  262. panic("Could not Init Headers.")
  263. }
  264. if (d4.conf.ttype == 254 || d4.conf.ttype == 2) && force {
  265. // setReaderWriter is happy, we should have a working
  266. // connection from now on.
  267. force = false
  268. // Sending meta header for the first time on this new connection
  269. go sendMeta(d4p, errchan, metachan)
  270. }
  271. break RETRY
  272. // Exit signal
  273. case <-s:
  274. logger.Println("Exiting")
  275. exit(d4p, 0)
  276. }
  277. }
  278. // metaheader sent, launch the copy routine
  279. case <-metachan:
  280. go d4Copy(d4p, errchan, eofchan)
  281. // Exit signal
  282. case <-s:
  283. logger.Println("Exiting")
  284. exit(d4p, 0)
  285. }
  286. }
  287. }
  288. func exit(d4 *d4S, exitcode int) {
  289. // Output debug info in the log before closing if debug is enabled
  290. if *debug == true {
  291. (*d4).debug = true
  292. fmt.Print(&buf)
  293. }
  294. os.Exit(exitcode)
  295. }
  296. func d4Copy(d4 *d4S, errchan chan error, eofchan chan string) {
  297. nread, err := io.CopyBuffer(&d4.dst, d4.src, d4.dst.pb)
  298. // Always retry
  299. if err != nil {
  300. logger.Printf("D4copy: %s", err)
  301. errchan <- err
  302. return
  303. }
  304. eofchan <- fmt.Sprintf("EOF: Nread: %d", nread)
  305. }
  306. func sendMeta(d4 *d4S, errchan chan error, metachan chan string) {
  307. // Fill metaheader buffer with metaheader data
  308. d4.mhb = bytes.NewBuffer(d4.mh)
  309. d4.dst.hijackHeader()
  310. // Ugly hack to skip bytes.Buffer WriteTo check that bypasses my fixed lenght buffer
  311. nread, err := io.CopyBuffer(&d4.dst, struct{ io.Reader }{d4.mhb}, d4.dst.pb)
  312. if err != nil {
  313. logger.Printf("Cannot sent meta-header: %s", err)
  314. errchan <- err
  315. return
  316. }
  317. logger.Println(fmt.Sprintf("Meta-Header sent: %d bytes", nread))
  318. d4.dst.restoreHeader()
  319. metachan <- "Header Sent"
  320. return
  321. }
  322. func readConfFile(d4 *d4S, fileName string) []byte {
  323. return config.ReadConfigFile((*d4).confdir, fileName)
  324. }
  325. func d4loadConfig(d4 *d4S) bool {
  326. // populate the map
  327. (*d4).conf = d4params{}
  328. (*d4).conf.source = string(readConfFile(d4, "source"))
  329. if len((*d4).conf.source) < 1 {
  330. log.Fatal("Unsupported source")
  331. }
  332. if (*d4).conf.source == "folder" {
  333. fstr := string(readConfFile(d4, "folder"))
  334. if ffd, err := os.Stat(fstr); os.IsNotExist(err) {
  335. log.Fatal("Folder does not exist")
  336. } else {
  337. if !ffd.IsDir() {
  338. log.Fatal("Folder is not a directory")
  339. }
  340. }
  341. (*d4).conf.folderstr = fstr
  342. }
  343. if (*d4).conf.source == "d4server" {
  344. // Parse Input Redis Config
  345. tmp := string(readConfFile(d4, "redis_d4"))
  346. ss := strings.Split(string(tmp), "/")
  347. if len(ss) <= 1 {
  348. log.Fatal("Missing Database in Redis input config: should be host:port/database_name")
  349. }
  350. (*d4).conf.redisDB, _ = strconv.Atoi(ss[1])
  351. var ret bool
  352. ret, ss[0] = config.IsNet(ss[0])
  353. if ret {
  354. sss := strings.Split(string(ss[0]), ":")
  355. (*d4).conf.redisHost = sss[0]
  356. (*d4).conf.redisPort = sss[1]
  357. } else {
  358. log.Fatal("Redis config error.")
  359. }
  360. (*d4).conf.redisQueue = string(config.ReadConfigFile(*confdir, "redis_queue"))
  361. }
  362. (*d4).conf.destination = string(readConfFile(d4, "destination"))
  363. if len((*d4).conf.destination) < 1 {
  364. log.Fatal("Unsupported Destination")
  365. }
  366. tmpu, err := uuid.FromString(string(readConfFile(d4, "uuid")))
  367. if err != nil {
  368. // generate new uuid
  369. (*d4).conf.uuid = generateUUIDv4()
  370. // And push it into the conf file
  371. f, err := os.OpenFile((*d4).confdir+"/uuid", os.O_WRONLY, 0666)
  372. defer f.Close()
  373. if err != nil {
  374. log.Fatal(err)
  375. }
  376. // store as canonical representation
  377. f.WriteString(fmt.Sprintf("%s", uuid.FromBytesOrNil((*d4).conf.uuid)) + "\n")
  378. } else {
  379. (*d4).conf.uuid = tmpu.Bytes()
  380. }
  381. // parse snaplen to uint32
  382. tmp, err := strconv.ParseUint(string(readConfFile(d4, "snaplen")), 10, 32)
  383. if err != nil || tmp < 1 {
  384. (*d4).conf.snaplen = uint32(4096)
  385. } else {
  386. (*d4).conf.snaplen = uint32(tmp)
  387. }
  388. (*d4).conf.key = readConfFile(d4, "key")
  389. // parse version to uint8
  390. tmp, _ = strconv.ParseUint(string(readConfFile(d4, "version")), 10, 8)
  391. if err != nil || tmp < 1 {
  392. (*d4).conf.version = uint8(1)
  393. } else {
  394. (*d4).conf.version = uint8(tmp)
  395. }
  396. // parse type to uint8
  397. tmp, _ = strconv.ParseUint(string(readConfFile(d4, "type")), 10, 8)
  398. if err != nil || tmp < 1 {
  399. log.Fatal("Unsupported type")
  400. } else {
  401. (*d4).conf.ttype = uint8(tmp)
  402. }
  403. // parse meta header file
  404. data := make([]byte, MH_FILE_LIMIT)
  405. if tmp == 254 || tmp == 2 {
  406. file, err := os.Open((*d4).confdir + "/metaheader.json")
  407. defer file.Close()
  408. if err != nil {
  409. panic("Failed to open Meta-Header File.")
  410. } else {
  411. if count, err := file.Read(data); err != nil {
  412. panic("Failed to open Meta-Header File.")
  413. } else {
  414. if json.Valid(data[:count]) {
  415. if checkType(data[:count]) {
  416. if off, err := file.Seek(0, 0); err != nil || off != 0 {
  417. panic(fmt.Sprintf("Cannot read Meta-Header file: %s", err))
  418. } else {
  419. // create metaheader buffer
  420. d4.mhb = bytes.NewBuffer(d4.mh)
  421. if err := json.Compact((*d4).mhb, data[:count]); err != nil {
  422. logger.Println("Failed to compact meta header file")
  423. }
  424. // Store the metaheader in d4 struct for subsequent retries
  425. (*d4).mh = data[:count]
  426. }
  427. } else {
  428. panic("A Meta-Header File should at least contain a 'type' field.")
  429. }
  430. } else {
  431. panic("Failed to validate open Meta-Header File.")
  432. }
  433. }
  434. }
  435. }
  436. // Add the custom CA cert in D4 certpool
  437. if (*d4).cc {
  438. certb, _ := ioutil.ReadFile((*d4).confdir + "rootCA.crt")
  439. (*d4).ca = *x509.NewCertPool()
  440. ok := (*d4).ca.AppendCertsFromPEM(certb)
  441. if !ok {
  442. panic("Failed to parse provided root certificate.")
  443. }
  444. }
  445. return true
  446. }
  447. func checkType(b []byte) bool {
  448. var f interface{}
  449. if err := json.Unmarshal(b, &f); err != nil {
  450. return false
  451. }
  452. m := f.(map[string]interface{})
  453. for k, v := range m {
  454. if k == "type" {
  455. switch v.(type) {
  456. case string:
  457. if v != nil {
  458. return true
  459. }
  460. }
  461. }
  462. }
  463. return false
  464. }
  465. func newD4Writer(writer io.Writer, key []byte) d4Writer {
  466. return d4Writer{w: writer, key: key}
  467. }
  468. // TODO QUICK IMPLEM, REVISE
  469. func setReaderWriters(d4 *d4S, force bool) bool {
  470. //TODO implement other destination file, fifo unix_socket ...
  471. switch (*d4).conf.source {
  472. case "stdin":
  473. (*d4).src = os.Stdin
  474. case "pcap":
  475. f, _ := os.Open("capture.pcap")
  476. (*d4).src = f
  477. case "d4server":
  478. // Create a new redis connection pool
  479. (*d4).redisInputPool = newPool((*d4).conf.redisHost+":"+(*d4).conf.redisPort, 16)
  480. var err error
  481. (*d4).redisCon, err = (*d4).redisInputPool.Dial()
  482. if err != nil {
  483. logger.Println("Could not connect to d4 Redis")
  484. return false
  485. }
  486. (*d4).src, err = inputreader.NewLPOPReader(&(*d4).redisCon, (*d4).conf.redisDB, (*d4).conf.redisQueue)
  487. if err != nil {
  488. log.Printf("Could not create d4 Redis Descriptor %q \n", err)
  489. return false
  490. }
  491. case "folder":
  492. var err error
  493. (*d4).src, err = inputreader.NewFileWatcherReader((*d4).conf.folderstr, (*d4).json, (*d4).daily, logger)
  494. if err != nil {
  495. log.Printf("Could not create File Watcher %q \n", err)
  496. return false
  497. }
  498. }
  499. isn, dstnet := config.IsNet((*d4).conf.destination)
  500. if isn {
  501. // We test whether a connection already exist
  502. // (case where the reader run out of data)
  503. // force forces to reset the connections after
  504. // failure to reuse it
  505. if _, ok := (*d4).dst.w.(net.Conn); !ok || force {
  506. if (*d4).tor {
  507. dialer := net.Dialer{
  508. Timeout: (*d4).ct,
  509. KeepAlive: (*d4).cka,
  510. FallbackDelay: 0,
  511. }
  512. dial, err := proxy.SOCKS5("tcp", "127.0.0.1:9050", nil, &dialer)
  513. if err != nil {
  514. log.Fatal(err)
  515. }
  516. tlsc := tls.Config{
  517. InsecureSkipVerify: true,
  518. }
  519. if (*d4).cc {
  520. tlsc = tls.Config{
  521. InsecureSkipVerify: false,
  522. RootCAs: &(*d4).ca,
  523. }
  524. }
  525. conn, errc := dial.Dial("tcp", dstnet)
  526. if errc != nil {
  527. logger.Println(errc)
  528. return false
  529. }
  530. if (*d4).ce == true {
  531. conn = tls.Client(conn, &tlsc) // use tls
  532. }
  533. (*d4).dst = newD4Writer(conn, (*d4).conf.key)
  534. } else {
  535. dial := net.Dialer{
  536. Timeout: (*d4).ct,
  537. KeepAlive: (*d4).cka,
  538. FallbackDelay: 0,
  539. }
  540. tlsc := tls.Config{
  541. InsecureSkipVerify: true,
  542. }
  543. if (*d4).cc {
  544. tlsc = tls.Config{
  545. InsecureSkipVerify: false,
  546. RootCAs: &(*d4).ca,
  547. }
  548. }
  549. if (*d4).ce == true {
  550. conn, errc := tls.DialWithDialer(&dial, "tcp", dstnet, &tlsc)
  551. if errc != nil {
  552. logger.Println(errc)
  553. return false
  554. }
  555. (*d4).dst = newD4Writer(conn, (*d4).conf.key)
  556. } else {
  557. conn, errc := dial.Dial("tcp", dstnet)
  558. if errc != nil {
  559. return false
  560. }
  561. (*d4).dst = newD4Writer(conn, (*d4).conf.key)
  562. }
  563. }
  564. }
  565. } else {
  566. switch (*d4).conf.destination {
  567. case "stdout":
  568. (*d4).dst = newD4Writer(os.Stdout, (*d4).conf.key)
  569. case "file":
  570. f, _ := os.Create("test.txt")
  571. (*d4).dst = newD4Writer(f, (*d4).conf.key)
  572. default:
  573. panic(fmt.Sprintf("No suitable destination found, given :%q", (*d4).conf.destination))
  574. }
  575. }
  576. // Create the copy buffer
  577. (*d4).dst.fb = make([]byte, HDR_SIZE+(*d4).conf.snaplen)
  578. (*d4).dst.pb = make([]byte, (*d4).conf.snaplen)
  579. return true
  580. }
  581. func generateUUIDv4() []byte {
  582. uuid, err := uuid.NewV4()
  583. if err != nil {
  584. log.Fatal(err)
  585. }
  586. logger.Println(fmt.Sprintf("UUIDv4: %s", uuid))
  587. return uuid.Bytes()
  588. }
  589. func (d4w *d4Writer) Write(bs []byte) (int, error) {
  590. // bs is pb
  591. // zero out moving parts of the frame
  592. copy(d4w.fb[18:62], make([]byte, 44))
  593. copy(d4w.fb[62:], make([]byte, 62+len(bs)))
  594. // update headers
  595. d4w.updateHeader(len(bs))
  596. // Copy payload after the header
  597. copy(d4w.fb[62:62+len(bs)], bs)
  598. // Now that the packet is complete, compute hmac
  599. d4w.updateHMAC(len(bs))
  600. // Eventually write binary in the sink
  601. err := binary.Write(d4w.w, binary.LittleEndian, d4w.fb[:62+len(bs)])
  602. return len(bs), err
  603. }
  604. // TODO write go idiomatic err return values
  605. func (d4w *d4Writer) updateHeader(lenbs int) bool {
  606. timeUnix := time.Now().Unix()
  607. binary.LittleEndian.PutUint64(d4w.fb[18:26], uint64(timeUnix))
  608. binary.LittleEndian.PutUint32(d4w.fb[58:62], uint32(lenbs))
  609. return true
  610. }
  611. func (d4w *d4Writer) updateHMAC(ps int) bool {
  612. h := hmac.New(sha256.New, d4w.key)
  613. h.Write(d4w.fb[0:1])
  614. h.Write(d4w.fb[1:2])
  615. h.Write(d4w.fb[2:18])
  616. h.Write(d4w.fb[18:26])
  617. h.Write(make([]byte, 32))
  618. h.Write(d4w.fb[58:62])
  619. h.Write(d4w.fb[62 : 62+ps])
  620. copy(d4w.fb[26:58], h.Sum(nil))
  621. return true
  622. }
  623. func (d4w *d4Writer) initHeader(d4 *d4S) bool {
  624. // zero out the header
  625. copy(d4w.fb[:HDR_SIZE], make([]byte, HDR_SIZE))
  626. // put version and type into the header
  627. d4w.fb[0] = (*d4).conf.version
  628. d4w.fb[1] = (*d4).conf.ttype
  629. // put uuid into the header
  630. copy(d4w.fb[2:18], (*d4).conf.uuid)
  631. // timestamp
  632. timeUnix := time.Now().UnixNano()
  633. binary.LittleEndian.PutUint64(d4w.fb[18:26], uint64(timeUnix))
  634. // hmac is set to zero during hmac operations, so leave it alone
  635. // init size of payload at 0
  636. binary.LittleEndian.PutUint32(d4w.fb[58:62], uint32(0))
  637. debugf(fmt.Sprintf("Initialized a %d bytes header:\n", HDR_SIZE))
  638. debugf(fmt.Sprintf("%b\n", d4w.fb[:HDR_SIZE]))
  639. return true
  640. }
  641. // We use type 2 to send the meta header
  642. func (d4w *d4Writer) hijackHeader() bool {
  643. d4w.fb[1] = 2
  644. return true
  645. }
  646. // Switch back the header to 254
  647. func (d4w *d4Writer) restoreHeader() bool {
  648. d4w.fb[1] = 254
  649. return true
  650. }
  651. func newPool(addr string, maxconn int) *redis.Pool {
  652. return &redis.Pool{
  653. MaxActive: maxconn,
  654. MaxIdle: 3,
  655. IdleTimeout: 240 * time.Second,
  656. // Dial or DialContext must be set. When both are set, DialContext takes precedence over Dial.
  657. Dial: func() (redis.Conn, error) { return redis.Dial("tcp", addr) },
  658. }
  659. }