Compare commits

...

23 Commits

Author SHA1 Message Date
Jean-Louis Huynen 9bea72ce04
chg: [filewatcher] daily rotation of watched folder - fixed 2021-03-03 12:09:13 +01:00
Jean-Louis Huynen 1e9e4a28d0
chg: [filewatcher] daily rotation of watched folder 2021-03-02 11:53:02 +01:00
Jean-Louis Huynen 97271a7dce
chg: [filerwatcher] fix silly path bug 2021-02-19 10:19:23 +01:00
Jean-Louis Huynen 7c929bd599
chg: [filerwatcher] less verbose output 2021-02-18 15:11:51 +01:00
Jean-Louis Huynen c598be5e25
add: [filerwatcher] go modules 2021-02-17 16:36:36 +01:00
Jean-Louis Huynen 2fdb6bbdd5
add: [filerwatcher] fix max buffer length bug 2021-02-17 16:15:54 +01:00
Jean-Louis Huynen cdc6b5da52
add: [filerwatcher] wip 2021-02-17 15:05:17 +01:00
Jean-Louis Huynen fbb4abac22
add: [filerwatcher] notify watcher routine 2021-02-16 19:10:36 +01:00
Jean-Louis Huynen 36d7e16bd2
add: [filerwatcher] base64 or json files 2021-02-16 11:30:12 +01:00
Jean-Louis Huynen e6be49a2a1
add: [filerwatcher] base64 encoding 2021-02-16 10:44:00 +01:00
Jean-Louis Huynen d7664a3a9e
add: [filerwatcher] initial work on file watcher 2021-02-15 16:19:20 +01:00
Jean-Louis Huynen d46d1ae085
chg: [config] ReadConfigFileLines create if not exist 2020-10-22 15:00:41 +02:00
Jean-Louis Huynen c84446ba7f add: [config] specify config line by line 2020-10-21 17:01:55 +02:00
Jean-Louis Huynen b333ed9208
chg: [config] bugfix config file path 2020-06-19 11:50:54 +02:00
Jean-Louis Huynen e6e4a49f13
fix: [inputreader] remove retry from reader 2020-05-27 16:52:19 +02:00
Jean-Louis Huynen 42a8154142
chg: [inputreader] adds \n on each line 2020-05-25 16:48:18 +02:00
Jean-Louis Huynen 2364985add
Merge pull request #2 from D4-project/d4forward
D4forward
2020-04-24 11:03:30 +02:00
Jean-Louis Huynen 8ae509e0eb
chg: [input] error handling on creation 2020-04-08 16:22:16 +02:00
Jean-Louis Huynen 7a4ba5405e
chg: [test] move config test 2020-04-08 08:19:12 +02:00
Jean-Louis Huynen 2fd1cdf3ea
add: [inputreader] moving redisreader in the shared lib 2020-04-08 07:56:30 +02:00
Jean-Louis Huynen f30d3c1b52
chg: [isNet] silly bug -- remastered 2020-02-25 15:54:10 +01:00
Jean-Louis Huynen 0ef9aada6f
chg: [isNet] fix silly bug 2020-02-12 14:47:21 +01:00
Jean-Louis Huynen aa03cc3065
chg: [.gitignore] remove vscode files 2020-02-12 14:46:29 +01:00
7 changed files with 348 additions and 13 deletions

1
.gitignore vendored
View File

@ -10,3 +10,4 @@
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
.vscode

View File

@ -1,6 +1,7 @@
package config
import (
"bufio"
"bytes"
"fmt"
"io"
@ -23,33 +24,40 @@ func IsNet(host string) (bool, string) {
// E.g., "[fe80::1]:80".
i := strings.LastIndex(host, "]")
if i < 0 {
log.Fatal("Unmatched [ in destination config")
log.Println("Unmatched [ in destination config")
return false, ""
}
if !validPort(host[i+1:]) {
log.Fatal("No valid port specified")
log.Println("No valid port specified")
return false, ""
}
// trim brackets
if net.ParseIP(strings.Trim(host[:i+1], "[]")) != nil {
log.Fatal(fmt.Sprintf("Server IP: %s, Server Port: %s\n", host[:i+1], host[i+1:]))
return true, host
}
} else {
// Ipv4 or DNS name
ss := strings.Split(string(host), ":")
ss := strings.Split(host, ":")
if len(ss) > 1 {
if !validPort(":" + ss[1]) {
log.Fatal("No valid port specified")
log.Println("No valid port specified")
return false, ""
}
// if not nil, its a valid IP adress
if net.ParseIP(ss[0]) != nil {
log.Fatal(fmt.Sprintf("Server IP: %s, Server Port: %s\n", ss[0], ss[1]))
return true, host
} else if validDNS.MatchString(ss[0]) {
log.Fatal(fmt.Sprintf("DNS: %s, Server Port: %s\n", ss[0], ss[1]))
}
// if "localhost", its valid
if strings.Compare("localhost", ss[0]) == 0 {
return true, host
}
// check against the regex
if validDNS.MatchString(ss[0]) {
return true, host
} else {
log.Println(fmt.Sprintf("DNS/IP: %s, Server Port: %s", ss[0], ss[1]))
return false, ""
}
}
}
return false, host
@ -73,10 +81,11 @@ func validPort(port string) bool {
}
// ReadConfigFile takes two argument: folder and fileName.
// Create if not exist
// It reads its content, trims\n and \r, and return []byte
// All errors are Fatal.
func ReadConfigFile(folder string, fileName string) []byte {
f, err := os.OpenFile("./"+folder+"/"+fileName, os.O_RDWR|os.O_CREATE, 0666)
f, err := os.OpenFile(folder+"/"+fileName, os.O_RDONLY|os.O_CREATE, 0666)
defer f.Close()
if err != nil {
log.Fatal(err)
@ -92,7 +101,30 @@ func ReadConfigFile(folder string, fileName string) []byte {
log.Fatal(err)
}
// trim \r and \n if present
r := bytes.TrimSuffix(data[:count], []byte("\n"))
return bytes.TrimSuffix(r, []byte("\r"))
// trim \r and \n if present
r := bytes.TrimSuffix(data[:count], []byte("\n"))
return bytes.TrimSuffix(r, []byte("\r"))
}
// ReadConfigFileLines takes two argument: folder and fileName.
// Create if not exist
// It reads its content line by line,
// and return [][]byte
// All errors are Fatal.
func ReadConfigFileLines(folder string, fileName string) [][]byte {
res := [][]byte{}
f, err := os.OpenFile(folder+"/"+fileName, os.O_RDONLY|os.O_CREATE, 0666)
if err != nil {
log.Fatal(err)
}
defer f.Close()
scanner := bufio.NewScanner(f)
for scanner.Scan() {
res = append(res, []byte(scanner.Text()))
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
return res
}

View File

@ -0,0 +1,36 @@
package config
import (
"testing"
)
var testCases = []struct {
name string
str string
expected bool
}{
{"Well-formed IPv4 with port", "127.0.0.1:4443", true},
{"Well-formed IPv4 without port", "127.0.0.1", false},
{"Malformed IPv4 with port", "127..0.1:4443", false},
{"Malformed IPv4 without port", "127..0.1", false},
{"Well-formed IPv6 with port - 2", "[::1]:4443", true},
{"Well-formed IPv6 without port", "[fe80::1%25en0]", false},
{"Malformed IPv6 with port", "[::::1]:4443", false},
{"Malformed IPv6 without port", "[::::::::1]", false},
{"Malformed IPv6 : missing square brackets", "::::::::1:4443", false},
{"Well-formed DNS name with port", "toto.circl.lu:4443", true},
{"Well-formed DNS name without port", "toto.circl.lu", false},
{"Malformed DNS name with port", ".:4443", false},
{"Localhost with port", "localhost:4443", true},
}
func TestIsNet(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
b, _ := IsNet(tc.str)
if b != tc.expected {
t.Fail()
}
})
}
}

10
go.mod
View File

@ -2,4 +2,12 @@ module github.com/D4-project/d4-golang-utils
go 1.12
require github.com/gofrs/uuid v3.2.0+incompatible
require (
github.com/gofrs/uuid v3.2.0+incompatible
github.com/gomodule/redigo v2.0.0+incompatible
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/redislabs/redisgraph-go v2.0.2+incompatible
github.com/rjeczalik/notify v0.9.2
github.com/robfig/cron/v3 v3.0.1
github.com/stretchr/testify v1.7.0 // indirect
)

25
go.sum
View File

@ -1,2 +1,27 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/mattn/go-runewidth v0.0.9 h1:Lm995f3rfxdpd6TSmuVCHVb/QhupuXlYr8sCI/QdE+0=
github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/redislabs/redisgraph-go v2.0.2+incompatible h1:HW9BqUhvgHeKXnUCrv9HOLPL/QFYxxzR2hsJClTzWk8=
github.com/redislabs/redisgraph-go v2.0.2+incompatible/go.mod h1:GYn4oUFkbkHx49xm2H4G8jZziqKDKdRtDUuTBZTmqBE=
github.com/rjeczalik/notify v0.9.2 h1:MiTWrPj55mNDHEiIX5YUSKefw/+lCQVoAFmD6oQm5w8=
github.com/rjeczalik/notify v0.9.2/go.mod h1:aErll2f0sUX9PXZnVNyeiObbmTlk5jnMoCa4QEjJeqM=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7 h1:bit1t3mgdR35yN0cX0G8orgLtOuyL9Wqxa1mccLB0ig=
golang.org/x/sys v0.0.0-20180926160741-c2ed4eda69e7/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -0,0 +1,172 @@
package inputreader
import (
"bytes"
"encoding/base64"
"fmt"
"github.com/rjeczalik/notify"
"github.com/robfig/cron/v3"
"io"
"log"
"os"
"time"
)
// FileWatcherReader is an abstraction of a folder watcher
// and behaves like a reader
type FileWatcherReader struct {
// Folder to watch
folderstr string
// Notify Channel
eic chan notify.EventInfo
// TearDown channel
exit chan string
// Chan used to restart the watching channel on a new folder
dailySwitch chan bool
dailySwitchExit chan bool
// Current buffer
json bool
// Current file
curfile *os.File
// Current state Watching / Reading
watching bool
// Insert Separator
insertsep bool
// logging
log * log.Logger
}
// NewFileWatcherReader creates a new FileWatcherReader
// json specifies whether we now we handle json files
func NewFileWatcherReader(f string, j bool, daily bool, logger *log.Logger) (*FileWatcherReader, error) {
r := &FileWatcherReader{
folderstr: f,
eic: make(chan notify.EventInfo, 4096),
dailySwitch: make(chan bool),
dailySwitchExit: make(chan bool),
json: j,
watching: true,
insertsep: false,
log: logger,
}
// go routine holding the watcher
go setUpWatcher(r, daily)
// cron task to add daily folder to watch
if daily {
c := cron.New()
c.AddFunc("@midnight", func() {
//c.AddFunc("@every 10s", func() {
// Sending exit signal to setUpWatcher
r.dailySwitch <- true
// Waiting for exit signal from setUpWatcher
<-r.dailySwitchExit
go setUpWatcher(r, daily)
})
c.Start()
}
return r, nil
}
// setUpWatcher holds the watcher
func setUpWatcher(r *FileWatcherReader, daily bool) {
if daily {
// TODO make it customizable
t, _ := time.ParseDuration("1s")
retryWatch(r, t)
} else {
if err := notify.Watch(fmt.Sprintf("%s/...", r.folderstr), r.eic, notify.InCloseWrite); err != nil {
log.Fatal(err)
}
}
defer notify.Stop(r.eic)
<-r.dailySwitch
r.dailySwitchExit <- true
}
// retryWatch tries to set up the watcher until it works every t
func retryWatch(r *FileWatcherReader, t time.Duration) {
dt := time.Now()
//Format YYYYMMDD
// TODO make it customizable
currentFolder := dt.Format("20060102")
r.log.Println(fmt.Sprintf("Watching: %s/%s/...", r.folderstr, currentFolder))
for {
if err := notify.Watch(fmt.Sprintf("%s/%s/...", r.folderstr, currentFolder), r.eic, notify.InCloseWrite); err != nil {
r.log.Println(fmt.Sprintf("Waiting for: %s/%s/... to exist", r.folderstr, currentFolder))
time.Sleep(t)
}else{
return
}
}
}
// Read waits for InCloseWrite file event uses a bytes reader to copy
// the resulting file encoded in b64 in p
func (fw *FileWatcherReader) Read(p []byte) (n int, err error) {
// Watching for new files to read
if fw.watching {
watchloop:
for {
select {
case ei := <-fw.eic:
//r.log.Println("Got event:", ei)
// New File, let's read its content
var err error
fw.curfile, err = os.Open(ei.Path())
if err != nil {
fw.log.Fatal(err)
}
fw.watching = false
break watchloop
case <-fw.exit:
// Exiting
return 0, io.EOF
}
}
}
// Inserting separator
if fw.insertsep {
var buf []byte
buf = append(buf, "\n"...)
rreader := bytes.NewReader(buf)
n, err = rreader.Read(p)
fw.watching = true
fw.insertsep = false
fw.curfile.Close()
return n, err
}
// Reading
// if not json it could be anything so we encode it in b64
if !fw.json {
// base64 stream encoder
b64buffer := new(bytes.Buffer)
b64encoder := base64.NewEncoder(base64.StdEncoding, b64buffer)
// max buffer size is then 3072 = 4096/4*3
buf := make([]byte, 3072)
bytesread, err := fw.curfile.Read(buf)
// buf is the input
b64encoder.Write(buf[:bytesread])
// Close the encoder to flush partially written blocks
b64encoder.Close()
if err == io.EOF {
fw.insertsep = true
}
// Copy from b64buffer to p
n, err = b64buffer.Read(p[:len(b64buffer.Bytes())])
} else {
n, err = fw.curfile.Read(p)
if err == io.EOF {
fw.insertsep = true
}
}
return n, err
}
// Teardown is called on error to stop the Reading loop if needed
func (rl *FileWatcherReader) Teardown() {
rl.exit <- "exit"
}

View File

@ -0,0 +1,61 @@
package inputreader
import (
"bytes"
"github.com/gomodule/redigo/redis"
"io"
)
// RedisLPOPReader is a abstraction of LPOP list
// and behaves like a reader
type RedisLPOPReader struct {
// D4 redis connection
r *redis.Conn
// D4 redis database
d int
// D4 Queue storing
q string
// Current buffer
buf []byte
}
// NewLPOPReader creates a new RedisLPOPReader
func NewLPOPReader(rc *redis.Conn, db int, queue string) (*RedisLPOPReader, error) {
rr := *rc
if _, err := rr.Do("SELECT", db); err != nil {
rr.Close()
return nil, err
}
r := &RedisLPOPReader{
r: rc,
d: db,
q: queue,
}
return r, nil
}
// Read LPOP the redis queue and use a bytes reader to copy
// the resulting data in p
func (rl *RedisLPOPReader) Read(p []byte) (n int, err error) {
rr := *rl.r
buf, err := redis.Bytes(rr.Do("LPOP", rl.q))
// If redis return empty: EOF (user should not stop)
if err == redis.ErrNil {
return 0, io.EOF
} else if err != nil {
return 0, err
}
buf = append(buf, "\n"...)
rreader := bytes.NewReader(buf)
n, err = rreader.Read(p)
return n, err
}
// Teardown is called on error to close the redis connection
func (rl *RedisLPOPReader) Teardown() {
(*rl.r).Close()
}