diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index 431f48390..6f75b8357 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -109,32 +109,6 @@ func (q *ChannelQueue) Flush(timeout time.Duration) error { return q.FlushWithContext(ctx) } -// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty -func (q *ChannelQueue) FlushWithContext(ctx context.Context) error { - log.Trace("ChannelQueue: %d Flush", q.qid) - paused, _ := q.IsPausedIsResumed() - for { - select { - case <-paused: - return nil - case data, ok := <-q.dataChan: - if !ok { - return nil - } - if unhandled := q.handle(data); unhandled != nil { - log.Error("Unhandled Data whilst flushing queue %d", q.qid) - } - atomic.AddInt64(&q.numInQueue, -1) - case <-q.baseCtx.Done(): - return q.baseCtx.Err() - case <-ctx.Done(): - return ctx.Err() - default: - return nil - } - } -} - // Shutdown processing from this queue func (q *ChannelQueue) Shutdown() { q.lock.Lock() diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index f2d3dbdc9..c43bd1db3 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -8,7 +8,6 @@ import ( "fmt" "runtime/pprof" "sync" - "sync/atomic" "time" "code.gitea.io/gitea/modules/container" @@ -167,35 +166,6 @@ func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error { return q.FlushWithContext(ctx) } -// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty -func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error { - log.Trace("ChannelUniqueQueue: %d Flush", q.qid) - paused, _ := q.IsPausedIsResumed() - for { - select { - case <-paused: - return nil - default: - } - select { - case data, ok := <-q.dataChan: - if !ok { - return nil - } - if unhandled := q.handle(data); unhandled != nil { - log.Error("Unhandled Data whilst flushing queue %d", q.qid) - } - atomic.AddInt64(&q.numInQueue, -1) - case <-q.baseCtx.Done(): - return q.baseCtx.Err() - case <-ctx.Done(): - return ctx.Err() - default: - return nil - } - } -} - // Shutdown processing from this queue func (q *ChannelUniqueQueue) Shutdown() { log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 244927880..b32128cb8 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -463,13 +463,43 @@ func (p *WorkerPool) IsEmpty() bool { return atomic.LoadInt64(&p.numInQueue) == 0 } +// contextError returns either ctx.Done(), the base context's error or nil +func (p *WorkerPool) contextError(ctx context.Context) error { + select { + case <-p.baseCtx.Done(): + return p.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + // FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty // NB: The worker will not be registered with the manager. func (p *WorkerPool) FlushWithContext(ctx context.Context) error { log.Trace("WorkerPool: %d Flush", p.qid) + paused, _ := p.IsPausedIsResumed() for { + // Because select will return any case that is satisified at random we precheck here before looking at dataChan. select { - case data := <-p.dataChan: + case <-paused: + // Ensure that even if paused that the cancelled error is still sent + return p.contextError(ctx) + case <-p.baseCtx.Done(): + return p.baseCtx.Err() + case <-ctx.Done(): + return ctx.Err() + default: + } + + select { + case <-paused: + return p.contextError(ctx) + case data, ok := <-p.dataChan: + if !ok { + return nil + } if unhandled := p.handle(data); unhandled != nil { log.Error("Unhandled Data whilst flushing queue %d", p.qid) } @@ -495,6 +525,7 @@ func (p *WorkerPool) doWork(ctx context.Context) { paused, _ := p.IsPausedIsResumed() data := make([]Data, 0, p.batchLength) for { + // Because select will return any case that is satisified at random we precheck here before looking at dataChan. select { case <-paused: log.Trace("Worker for Queue %d Pausing", p.qid) @@ -515,8 +546,19 @@ func (p *WorkerPool) doWork(ctx context.Context) { log.Trace("Worker shutting down") return } + case <-ctx.Done(): + if len(data) > 0 { + log.Trace("Handling: %d data, %v", len(data), data) + if unhandled := p.handle(data...); unhandled != nil { + log.Error("Unhandled Data in queue %d", p.qid) + } + atomic.AddInt64(&p.numInQueue, -1*int64(len(data))) + } + log.Trace("Worker shutting down") + return default: } + select { case <-paused: // go back around