df0d1a2134
For security reasons, scoping access to a redis server via ACL rules is a good practice. Some parts of the codebase handles prefix like cache[^1] and session[^2], but the queue module doesn't. This patch adds this missing functionality to the queue module. Note about relevant test: I tried to keep the PR as small as possible (and reasonable), and not change how the test runs. Updated the existing test to use the same redis address and basically duplicated the test with the extra flag. It does NOT test if the keys are correct, it ensures only it works as expected. To make assertions about the keys, the whole test has to be updated as the general wrapper doesn't allow the main test to check anything provider (redis) specific property. That's not something I wanted to take on now. [^1]:e4c3c039be/modules/cache/cache_redis.go (L139-L150)
[^2]:e4c3c039be/modules/session/redis.go (L122-L129)
Signed-off-by: Victoria Nadasdi <victoria@efertone.me> Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/3836 Reviewed-by: Earl Warren <earl-warren@noreply.codeberg.org> Co-authored-by: Victoria Nadasdi <victoria@efertone.me> Co-committed-by: Victoria Nadasdi <victoria@efertone.me>
159 lines
4 KiB
Go
159 lines
4 KiB
Go
// Copyright 2023 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package queue
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"code.gitea.io/gitea/modules/graceful"
|
|
"code.gitea.io/gitea/modules/log"
|
|
"code.gitea.io/gitea/modules/nosql"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
type baseRedis struct {
|
|
client redis.UniversalClient
|
|
isUnique bool
|
|
cfg *BaseConfig
|
|
prefix string
|
|
|
|
mu sync.Mutex // the old implementation is not thread-safe, the queue operation and set operation should be protected together
|
|
}
|
|
|
|
var _ baseQueue = (*baseRedis)(nil)
|
|
|
|
func newBaseRedisGeneric(cfg *BaseConfig, unique bool) (baseQueue, error) {
|
|
client := nosql.GetManager().GetRedisClient(cfg.ConnStr)
|
|
prefix := ""
|
|
uri := nosql.ToRedisURI(cfg.ConnStr)
|
|
|
|
for key, value := range uri.Query() {
|
|
switch key {
|
|
case "prefix":
|
|
if len(value) > 0 {
|
|
prefix = value[0]
|
|
|
|
// As we are not checking any other values, if we found this one, we can
|
|
// exit from the loop.
|
|
// If a new key check is required, remove this break.
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
var err error
|
|
for i := 0; i < 10; i++ {
|
|
err = client.Ping(graceful.GetManager().ShutdownContext()).Err()
|
|
if err == nil {
|
|
break
|
|
}
|
|
log.Warn("Redis is not ready, waiting for 1 second to retry: %v", err)
|
|
time.Sleep(time.Second)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &baseRedis{cfg: cfg, client: client, isUnique: unique, prefix: prefix}, nil
|
|
}
|
|
|
|
func newBaseRedisSimple(cfg *BaseConfig) (baseQueue, error) {
|
|
return newBaseRedisGeneric(cfg, false)
|
|
}
|
|
|
|
func newBaseRedisUnique(cfg *BaseConfig) (baseQueue, error) {
|
|
return newBaseRedisGeneric(cfg, true)
|
|
}
|
|
|
|
func (q *baseRedis) prefixedName(name string) string {
|
|
return q.prefix + name
|
|
}
|
|
|
|
func (q *baseRedis) PushItem(ctx context.Context, data []byte) error {
|
|
return backoffErr(ctx, backoffBegin, backoffUpper, time.After(pushBlockTime), func() (retry bool, err error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if int(cnt) >= q.cfg.Length {
|
|
return true, nil
|
|
}
|
|
|
|
if q.isUnique {
|
|
added, err := q.client.SAdd(ctx, q.prefixedName(q.cfg.SetFullName), data).Result()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
if added == 0 {
|
|
return false, ErrAlreadyInQueue
|
|
}
|
|
}
|
|
return false, q.client.RPush(ctx, q.prefixedName(q.cfg.QueueFullName), data).Err()
|
|
})
|
|
}
|
|
|
|
func (q *baseRedis) PopItem(ctx context.Context) ([]byte, error) {
|
|
return backoffRetErr(ctx, backoffBegin, backoffUpper, infiniteTimerC, func() (retry bool, data []byte, err error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
data, err = q.client.LPop(ctx, q.prefixedName(q.cfg.QueueFullName)).Bytes()
|
|
if err == redis.Nil {
|
|
return true, nil, nil
|
|
}
|
|
if err != nil {
|
|
return true, nil, nil
|
|
}
|
|
if q.isUnique {
|
|
// the data has been popped, even if there is any error we can't do anything
|
|
_ = q.client.SRem(ctx, q.prefixedName(q.cfg.SetFullName), data).Err()
|
|
}
|
|
return false, data, err
|
|
})
|
|
}
|
|
|
|
func (q *baseRedis) HasItem(ctx context.Context, data []byte) (bool, error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if !q.isUnique {
|
|
return false, nil
|
|
}
|
|
return q.client.SIsMember(ctx, q.prefixedName(q.cfg.SetFullName), data).Result()
|
|
}
|
|
|
|
func (q *baseRedis) Len(ctx context.Context) (int, error) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
cnt, err := q.client.LLen(ctx, q.prefixedName(q.cfg.QueueFullName)).Result()
|
|
return int(cnt), err
|
|
}
|
|
|
|
func (q *baseRedis) Close() error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
return q.client.Close()
|
|
}
|
|
|
|
func (q *baseRedis) RemoveAll(ctx context.Context) error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
c1 := q.client.Del(ctx, q.prefixedName(q.cfg.QueueFullName))
|
|
// the "set" must be cleared after the "list" because there is no transaction.
|
|
// it's better to have duplicate items than losing items.
|
|
c2 := q.client.Del(ctx, q.prefixedName(q.cfg.SetFullName))
|
|
if c1.Err() != nil {
|
|
return c1.Err()
|
|
}
|
|
if c2.Err() != nil {
|
|
return c2.Err()
|
|
}
|
|
return nil // actually, checking errors doesn't make sense here because the state could be out-of-sync
|
|
}
|