From 2fd1cdf3ea38312b35636e9a933d03d3fb1f26ee Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 8 Apr 2020 07:56:30 +0200 Subject: [PATCH 1/3] add: [inputreader] moving redisreader in the shared lib --- inputreader/redisreader.go | 65 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 inputreader/redisreader.go diff --git a/inputreader/redisreader.go b/inputreader/redisreader.go new file mode 100644 index 0000000..303a2b6 --- /dev/null +++ b/inputreader/redisreader.go @@ -0,0 +1,65 @@ +package inputreader + +import ( + "bytes" + "io" + "log" + "time" + + "github.com/gomodule/redigo/redis" +) + +// RedisLPOPReader is a abstraction of LPOP list +// and behaves like a reader +type RedisLPOPReader struct { + // D4 redis connection + r *redis.Conn + // D4 redis database + d int + // D4 Queue storing + q string + // Time in minute before retrying + retryPeriod time.Duration + // Current buffer + buf []byte +} + +// NewLPOPReader creates a new RedisLPOPScanner +func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReader { + rr := *rc + + if _, err := rr.Do("SELECT", db); err != nil { + rr.Close() + log.Fatal(err) + } + + return &RedisLPOPReader{ + r: rc, + d: db, + q: queue, + retryPeriod: time.Duration(rt) * time.Minute, + } +} + +// Read LPOP the redis queue and use a bytes reader to copy +// the resulting data in p +func (rl *RedisLPOPReader) Read(p []byte) (n int, err error) { + rr := *rl.r + + buf, err := redis.Bytes(rr.Do("LPOP", rl.q)) + // If redis return empty: EOF (user should not stop) + if err == redis.ErrNil { + return 0, io.EOF + } else if err != nil { + log.Println(err) + return 0, err + } + rreader := bytes.NewReader(buf) + n, err = rreader.Read(p) + return n, err +} + +// Teardown is called on error to close the redis connection +func (rl *RedisLPOPReader) Teardown() { + (*rl.r).Close() +} From 7a4ba5405ed818badc5e20149fcecdecd14d25e5 Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 8 Apr 2020 08:19:12 +0200 Subject: [PATCH 2/3] chg: [test] move config test --- d4-golang-utils_test.go => config/d4-golang-utils_test.go | 6 ++---- go.mod | 5 ++++- go.sum | 2 ++ 3 files changed, 8 insertions(+), 5 deletions(-) rename d4-golang-utils_test.go => config/d4-golang-utils_test.go (90%) diff --git a/d4-golang-utils_test.go b/config/d4-golang-utils_test.go similarity index 90% rename from d4-golang-utils_test.go rename to config/d4-golang-utils_test.go index 69bb960..d275093 100644 --- a/d4-golang-utils_test.go +++ b/config/d4-golang-utils_test.go @@ -1,9 +1,7 @@ -package main +package config import ( "testing" - - config "github.com/D4-project/d4-golang-utils/config" ) var testCases = []struct { @@ -29,7 +27,7 @@ var testCases = []struct { func TestIsNet(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - b, _ := config.IsNet(tc.str) + b, _ := IsNet(tc.str) if b != tc.expected { t.Fail() } diff --git a/go.mod b/go.mod index 01a4ed6..a358993 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/D4-project/d4-golang-utils go 1.12 -require github.com/gofrs/uuid v3.2.0+incompatible +require ( + github.com/gofrs/uuid v3.2.0+incompatible + github.com/gomodule/redigo v2.0.0+incompatible +) diff --git a/go.sum b/go.sum index f27a074..445b960 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= From 8ae509e0eba426021fb1cd9baf84e2bcb5a705aa Mon Sep 17 00:00:00 2001 From: Jean-Louis Huynen Date: Wed, 8 Apr 2020 16:22:16 +0200 Subject: [PATCH 3/3] chg: [input] error handling on creation --- inputreader/redisreader.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/inputreader/redisreader.go b/inputreader/redisreader.go index 303a2b6..f0345c2 100644 --- a/inputreader/redisreader.go +++ b/inputreader/redisreader.go @@ -3,7 +3,6 @@ package inputreader import ( "bytes" "io" - "log" "time" "github.com/gomodule/redigo/redis" @@ -24,21 +23,23 @@ type RedisLPOPReader struct { buf []byte } -// NewLPOPReader creates a new RedisLPOPScanner -func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) *RedisLPOPReader { +// NewLPOPReader creates a new RedisLPOPReader +func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) (*RedisLPOPReader, error) { rr := *rc if _, err := rr.Do("SELECT", db); err != nil { rr.Close() - log.Fatal(err) + return nil, err } - return &RedisLPOPReader{ + r := &RedisLPOPReader{ r: rc, d: db, q: queue, retryPeriod: time.Duration(rt) * time.Minute, } + + return r, nil } // Read LPOP the redis queue and use a bytes reader to copy @@ -51,7 +52,6 @@ func (rl *RedisLPOPReader) Read(p []byte) (n int, err error) { if err == redis.ErrNil { return 0, io.EOF } else if err != nil { - log.Println(err) return 0, err } rreader := bytes.NewReader(buf)