diff --git a/d4-golang-utils_test.go b/config/d4-golang-utils_test.go similarity index 90% rename from d4-golang-utils_test.go rename to config/d4-golang-utils_test.go index 69bb960..d275093 100644 --- a/d4-golang-utils_test.go +++ b/config/d4-golang-utils_test.go @@ -1,9 +1,7 @@ -package main +package config import ( "testing" - - config "github.com/D4-project/d4-golang-utils/config" ) var testCases = []struct { @@ -29,7 +27,7 @@ var testCases = []struct { func TestIsNet(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - b, _ := config.IsNet(tc.str) + b, _ := IsNet(tc.str) if b != tc.expected { t.Fail() } diff --git a/go.mod b/go.mod index 01a4ed6..a358993 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/D4-project/d4-golang-utils go 1.12 -require github.com/gofrs/uuid v3.2.0+incompatible +require ( + github.com/gofrs/uuid v3.2.0+incompatible + github.com/gomodule/redigo v2.0.0+incompatible +) diff --git a/go.sum b/go.sum index f27a074..445b960 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= +github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= +github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= diff --git a/inputreader/redisreader.go b/inputreader/redisreader.go new file mode 100644 index 0000000..f0345c2 --- /dev/null +++ b/inputreader/redisreader.go @@ -0,0 +1,65 @@ +package inputreader + +import ( + "bytes" + "io" + "time" + + "github.com/gomodule/redigo/redis" +) + +// RedisLPOPReader is a abstraction of LPOP list +// and behaves like a reader +type RedisLPOPReader struct { + // D4 redis connection + r *redis.Conn + // D4 redis database + d int + // D4 Queue storing + q string + // Time in minute before retrying + retryPeriod time.Duration + // Current buffer + buf []byte +} + +// NewLPOPReader creates a new RedisLPOPReader +func NewLPOPReader(rc *redis.Conn, db int, queue string, rt int) (*RedisLPOPReader, error) { + rr := *rc + + if _, err := rr.Do("SELECT", db); err != nil { + rr.Close() + return nil, err + } + + r := &RedisLPOPReader{ + r: rc, + d: db, + q: queue, + retryPeriod: time.Duration(rt) * time.Minute, + } + + return r, nil +} + +// Read LPOP the redis queue and use a bytes reader to copy +// the resulting data in p +func (rl *RedisLPOPReader) Read(p []byte) (n int, err error) { + rr := *rl.r + + buf, err := redis.Bytes(rr.Do("LPOP", rl.q)) + // If redis return empty: EOF (user should not stop) + if err == redis.ErrNil { + return 0, io.EOF + } else if err != nil { + return 0, err + } + rreader := bytes.NewReader(buf) + n, err = rreader.Read(p) + return n, err +} + +// Teardown is called on error to close the redis connection +func (rl *RedisLPOPReader) Teardown() { + (*rl.r).Close() +}