diff --git a/.forgejo/workflows/testing.yml b/.forgejo/workflows/testing.yml index cd955c01a..79c695e27 100644 --- a/.forgejo/workflows/testing.yml +++ b/.forgejo/workflows/testing.yml @@ -53,6 +53,8 @@ jobs: MINIO_DOMAIN: minio MINIO_ROOT_USER: 123456 MINIO_ROOT_PASSWORD: 12345678 + redis: + image: redis:7.2.4 steps: - uses: https://code.forgejo.org/actions/checkout@v3 - uses: https://code.forgejo.org/actions/setup-go@v4 @@ -82,6 +84,7 @@ jobs: env: RACE_ENABLED: 'true' TAGS: bindata + TEST_REDIS_SERVER: redis:6379 test-mysql: if: ${{ !startsWith(vars.ROLE, 'forgejo-') }} runs-on: docker diff --git a/modules/queue/base_redis.go b/modules/queue/base_redis.go index a1e234943..abab0f5c5 100644 --- a/modules/queue/base_redis.go +++ b/modules/queue/base_redis.go @@ -19,6 +19,7 @@ type baseRedis struct { client redis.UniversalClient isUnique bool cfg *BaseConfig + prefix string mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together } @@ -27,6 +28,22 @@ var _ baseQueue = (*baseRedis)(nil) func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) { client := nosql.GetManager().GetRedisClient(cfg.ConnStr) + prefix := "" + uri := nosql.ToRedisURI(cfg.ConnStr) + + for key, value := range uri.Query() { + switch key { + case "prefix": + if len(value) > 0 { + prefix = value[0] + + // As we are not checking any other values, if we found this one, we can + // exit from the loop. + // If a new key check is required, remove this break. + break + } + } + } var err error for i := 0; i < 10; i++ { @@ -41,7 +58,7 @@ func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) { return nil, err } - return &baseRedis{cfg: cfg, client: client, isUnique: unique}, nil + return &baseRedis{cfg: cfg, client: client, isUnique: unique, prefix: prefix}, nil } func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) { @@ -52,12 +69,16 @@ func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) { return newBaseRedisGeneric(cfg, true) } +func (q *baseRedis) prefixedName(name string) string { + return q.prefix + name +} + func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) { q.mu.Lock() defer q.mu.Unlock() - cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result() + cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result() if err != nil { return false, err } @@ -66,7 +87,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { } if q.isUnique { - added, err := q.client.SAdd(ctx, q.cfg.SetFullName, data).Result() + added, err := q.client.SAdd(ctx, q.prefixedName(q.cfg.SetFullName), data).Result() if err != nil { return false, err } @@ -74,7 +95,7 @@ func (q *baseRedis) PushItem(ctx context.Context, data []byte) error { return false, ErrAlreadyInQueue } } - return false, q.client.RPush(ctx, q.cfg.QueueFullName, data).Err() + return false, q.client.RPush(ctx, q.prefixedName(q.cfg.QueueFullName), data).Err() }) } @@ -83,7 +104,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) { q.mu.Lock() defer q.mu.Unlock() - data, err = q.client.LPop(ctx, q.cfg.QueueFullName).Bytes() + data, err = q.client.LPop(ctx, q.prefixedName(q.cfg.QueueFullName)).Bytes() if err == redis.Nil { return true, nil, nil } @@ -92,7 +113,7 @@ func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) { } if q.isUnique { // the data has been popped, even if there is any error we can't do anything - _ = q.client.SRem(ctx, q.cfg.SetFullName, data).Err() + _ = q.client.SRem(ctx, q.prefixedName(q.cfg.SetFullName), data).Err() } return false, data, err }) @@ -104,13 +125,13 @@ func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) { if !q.isUnique { return false, nil } - return q.client.SIsMember(ctx, q.cfg.SetFullName, data).Result() + return q.client.SIsMember(ctx, q.prefixedName(q.cfg.SetFullName), data).Result() } func (q *baseRedis) Len(ctx context.Context) (int, error) { q.mu.Lock() defer q.mu.Unlock() - cnt, err := q.client.LLen(ctx, q.cfg.QueueFullName).Result() + cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result() return int(cnt), err } @@ -124,10 +145,10 @@ func (q *baseRedis) RemoveAll(ctx context.Context) error { q.mu.Lock() defer q.mu.Unlock() - c1 := q.client.Del(ctx, q.cfg.QueueFullName) + c1 := q.client.Del(ctx, q.prefixedName(q.cfg.QueueFullName)) // the "set" must be cleared after the "list" because there is no transaction. // it's better to have duplicate items than losing items. - c2 := q.client.Del(ctx, q.cfg.SetFullName) + c2 := q.client.Del(ctx, q.prefixedName(q.cfg.SetFullName)) if c1.Err() != nil { return c1.Err() } diff --git a/modules/queue/base_redis_test.go b/modules/queue/base_redis_test.go index be8bfbfe3..ed01a990b 100644 --- a/modules/queue/base_redis_test.go +++ b/modules/queue/base_redis_test.go @@ -16,6 +16,17 @@ import ( "github.com/stretchr/testify/assert" ) +const defaultTestRedisServer = "127.0.0.1:6379" + +func testRedisHost() string { + value := os.Getenv("TEST_REDIS_SERVER") + if value != "" { + return value + } + + return defaultTestRedisServer +} + func waitRedisReady(conn string, dur time.Duration) (ready bool) { ctxTimed, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() @@ -47,25 +58,67 @@ func redisServerCmd(t *testing.T) *exec.Cmd { } func TestBaseRedis(t *testing.T) { + redisAddress := "redis://" + testRedisHost() + "/0" + queueSettings := setting.QueueSettings{ + Length: 10, + ConnStr: redisAddress, + } + var redisServer *exec.Cmd + if !waitRedisReady(redisAddress, 0) { + redisServer = redisServerCmd(t) + + if redisServer == nil { + t.Skip("redis-server not found in Forgejo test yet") + return + } + + assert.NoError(t, redisServer.Start()) + if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") { + return + } + } + defer func() { if redisServer != nil { _ = redisServer.Process.Signal(os.Interrupt) _ = redisServer.Wait() } }() - if !waitRedisReady("redis://127.0.0.1:6379/0", 0) { + + testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false) + testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true) +} + +func TestBaseRedisWithPrefix(t *testing.T) { + redisAddress := "redis://" + testRedisHost() + "/0?prefix=forgejo:queue:" + queueSettings := setting.QueueSettings{ + Length: 10, + ConnStr: redisAddress, + } + + var redisServer *exec.Cmd + if !waitRedisReady(redisAddress, 0) { redisServer = redisServerCmd(t) - if true { + + if redisServer == nil { t.Skip("redis-server not found in Forgejo test yet") return } + assert.NoError(t, redisServer.Start()) - if !assert.True(t, waitRedisReady("redis://127.0.0.1:6379/0", 5*time.Second), "start redis-server") { + if !assert.True(t, waitRedisReady(redisAddress, 5*time.Second), "start redis-server") { return } } - testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", setting.QueueSettings{Length: 10}), false) - testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", setting.QueueSettings{Length: 10}), true) + defer func() { + if redisServer != nil { + _ = redisServer.Process.Signal(os.Interrupt) + _ = redisServer.Wait() + } + }() + + testQueueBasic(t, newBaseRedisSimple, toBaseConfig("baseRedis", queueSettings), false) + testQueueBasic(t, newBaseRedisUnique, toBaseConfig("baseRedisUnique", queueSettings), true) } diff --git a/release-notes/8.0.0/feat/3836.md b/release-notes/8.0.0/feat/3836.md new file mode 100644 index 000000000..1052c6d22 --- /dev/null +++ b/release-notes/8.0.0/feat/3836.md @@ -0,0 +1 @@ +Parse prefix parameter from redis URI for queues and use that as prefix to keys