Pause queues (#15928)
* Start adding mechanism to return unhandled data Signed-off-by: Andrew Thornton <art27@cantab.net> * Create pushback interface Signed-off-by: Andrew Thornton <art27@cantab.net> * Add Pausable interface to WorkerPool and Manager Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and PushBack for the bytefifos Signed-off-by: Andrew Thornton <art27@cantab.net> * Implement Pausable and Pushback for ChannelQueues and ChannelUniqueQueues Signed-off-by: Andrew Thornton <art27@cantab.net> * Wire in UI for pausing Signed-off-by: Andrew Thornton <art27@cantab.net> * add testcases and fix a few issues Signed-off-by: Andrew Thornton <art27@cantab.net> * fix build Signed-off-by: Andrew Thornton <art27@cantab.net> * prevent "race" in the test Signed-off-by: Andrew Thornton <art27@cantab.net> * fix jsoniter mismerge Signed-off-by: Andrew Thornton <art27@cantab.net> * fix conflicts Signed-off-by: Andrew Thornton <art27@cantab.net> * fix format Signed-off-by: Andrew Thornton <art27@cantab.net> * Add warnings for no worker configurations and prevent data-loss with redis/levelqueue Signed-off-by: Andrew Thornton <art27@cantab.net> * Use StopTimer Signed-off-by: Andrew Thornton <art27@cantab.net> Co-authored-by: Lauris BH <lauris@nix.lv> Co-authored-by: 6543 <6543@obermui.de> Co-authored-by: techknowlogick <techknowlogick@gitea.io> Co-authored-by: wxiaoguang <wxiaoguang@gmail.com>
This commit is contained in:
parent
27ee01e1e8
commit
a82fd98d53
34 changed files with 1389 additions and 122 deletions
|
@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
|||
}
|
||||
config := configInterface.(PersistableChannelQueueConfiguration)
|
||||
|
||||
channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
|
||||
queue := &PersistableChannelQueue{
|
||||
closed: make(chan struct{}),
|
||||
}
|
||||
|
||||
wrappedHandle := func(data ...Data) (failed []Data) {
|
||||
for _, unhandled := range handle(data...) {
|
||||
if fail := queue.PushBack(unhandled); fail != nil {
|
||||
failed = append(failed, fail)
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{
|
||||
WorkerPoolConfiguration: WorkerPoolConfiguration{
|
||||
QueueLength: config.QueueLength,
|
||||
BatchLength: config.BatchLength,
|
||||
|
@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
|||
DataDir: config.DataDir,
|
||||
}
|
||||
|
||||
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
|
||||
levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
|
||||
if err == nil {
|
||||
queue := &PersistableChannelQueue{
|
||||
channelQueue: channelQueue.(*ChannelQueue),
|
||||
delayedStarter: delayedStarter{
|
||||
internal: levelQueue.(*LevelQueue),
|
||||
name: config.Name,
|
||||
},
|
||||
closed: make(chan struct{}),
|
||||
queue.channelQueue = channelQueue.(*ChannelQueue)
|
||||
queue.delayedStarter = delayedStarter{
|
||||
internal: levelQueue.(*LevelQueue),
|
||||
name: config.Name,
|
||||
}
|
||||
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
|
@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
|
|||
return nil, ErrInvalidConfiguration{cfg: cfg}
|
||||
}
|
||||
|
||||
queue := &PersistableChannelQueue{
|
||||
channelQueue: channelQueue.(*ChannelQueue),
|
||||
delayedStarter: delayedStarter{
|
||||
cfg: levelCfg,
|
||||
underlying: LevelQueueType,
|
||||
timeout: config.Timeout,
|
||||
maxAttempts: config.MaxAttempts,
|
||||
name: config.Name,
|
||||
},
|
||||
closed: make(chan struct{}),
|
||||
queue.channelQueue = channelQueue.(*ChannelQueue)
|
||||
queue.delayedStarter = delayedStarter{
|
||||
cfg: levelCfg,
|
||||
underlying: LevelQueueType,
|
||||
timeout: config.Timeout,
|
||||
maxAttempts: config.MaxAttempts,
|
||||
name: config.Name,
|
||||
}
|
||||
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
|
||||
return queue, nil
|
||||
|
@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error {
|
|||
}
|
||||
}
|
||||
|
||||
// PushBack will push the indexer data to queue
|
||||
func (q *PersistableChannelQueue) PushBack(data Data) error {
|
||||
select {
|
||||
case <-q.closed:
|
||||
if pbr, ok := q.internal.(PushBackable); ok {
|
||||
return pbr.PushBack(data)
|
||||
}
|
||||
return q.internal.Push(data)
|
||||
default:
|
||||
return q.channelQueue.Push(data)
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts to run the queue
|
||||
func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
|
||||
log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
|
||||
|
@ -226,6 +246,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
|
|||
return q.internal.IsEmpty()
|
||||
}
|
||||
|
||||
// IsPaused returns if the pool is paused
|
||||
func (q *PersistableChannelQueue) IsPaused() bool {
|
||||
return q.channelQueue.IsPaused()
|
||||
}
|
||||
|
||||
// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
|
||||
func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) {
|
||||
return q.channelQueue.IsPausedIsResumed()
|
||||
}
|
||||
|
||||
// Pause pauses the WorkerPool
|
||||
func (q *PersistableChannelQueue) Pause() {
|
||||
q.channelQueue.Pause()
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.internal == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pausable, ok := q.internal.(Pausable)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pausable.Pause()
|
||||
}
|
||||
|
||||
// Resume resumes the WorkerPool
|
||||
func (q *PersistableChannelQueue) Resume() {
|
||||
q.channelQueue.Resume()
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
if q.internal == nil {
|
||||
return
|
||||
}
|
||||
|
||||
pausable, ok := q.internal.(Pausable)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
pausable.Resume()
|
||||
}
|
||||
|
||||
// Shutdown processing this queue
|
||||
func (q *PersistableChannelQueue) Shutdown() {
|
||||
log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue