Decouple HookTask from Repository (#17940)
At the moment a repository reference is needed for webhooks. With the upcoming package PR we need to send webhooks without a repository reference. For example a package is uploaded to an organization. In theory this enables the usage of webhooks for future user actions. This PR removes the repository id from `HookTask` and changes how the hooks are processed (see `services/webhook/deliver.go`). In a follow up PR I want to remove the usage of the `UniqueQueue´ and replace it with a normal queue because there is no reason to be unique. Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
parent
e828564445
commit
1887c95254
12 changed files with 219 additions and 282 deletions
|
@ -1,6 +1,5 @@
|
|||
-
|
||||
id: 1
|
||||
repo_id: 1
|
||||
hook_id: 1
|
||||
uuid: uuid1
|
||||
is_delivered: true
|
||||
|
|
|
@ -123,6 +123,11 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if _, err := db.GetEngine(ctx).In("hook_id", builder.Select("id").From("webhook").Where(builder.Eq{"webhook.repo_id": repo.ID})).
|
||||
Delete(&webhook.HookTask{}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := db.DeleteBeans(ctx,
|
||||
&access_model.Access{RepoID: repo.ID},
|
||||
&activities_model.Action{RepoID: repo.ID},
|
||||
|
@ -130,7 +135,6 @@ func DeleteRepository(doer *user_model.User, uid, repoID int64) error {
|
|||
&issues_model.Comment{RefRepoID: repoID},
|
||||
&git_model.CommitStatus{RepoID: repoID},
|
||||
&git_model.DeletedBranch{RepoID: repoID},
|
||||
&webhook.HookTask{RepoID: repoID},
|
||||
&git_model.LFSLock{RepoID: repoID},
|
||||
&repo_model.LanguageStat{RepoID: repoID},
|
||||
&issues_model.Milestone{RepoID: repoID},
|
||||
|
|
|
@ -104,7 +104,6 @@ type HookResponse struct {
|
|||
// HookTask represents a hook task.
|
||||
type HookTask struct {
|
||||
ID int64 `xorm:"pk autoincr"`
|
||||
RepoID int64 `xorm:"INDEX"`
|
||||
HookID int64
|
||||
UUID string
|
||||
api.Payloader `xorm:"-"`
|
||||
|
@ -178,14 +177,29 @@ func HookTasks(hookID int64, page int) ([]*HookTask, error) {
|
|||
|
||||
// CreateHookTask creates a new hook task,
|
||||
// it handles conversion from Payload to PayloadContent.
|
||||
func CreateHookTask(t *HookTask) error {
|
||||
func CreateHookTask(ctx context.Context, t *HookTask) (*HookTask, error) {
|
||||
data, err := t.Payloader.JSONPayload()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
t.UUID = gouuid.New().String()
|
||||
t.PayloadContent = string(data)
|
||||
return db.Insert(db.DefaultContext, t)
|
||||
return t, db.Insert(ctx, t)
|
||||
}
|
||||
|
||||
func GetHookTaskByID(ctx context.Context, id int64) (*HookTask, error) {
|
||||
t := &HookTask{}
|
||||
|
||||
has, err := db.GetEngine(ctx).ID(id).Get(t)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !has {
|
||||
return nil, ErrHookTaskNotExist{
|
||||
TaskID: id,
|
||||
}
|
||||
}
|
||||
return t, nil
|
||||
}
|
||||
|
||||
// UpdateHookTask updates information of hook task.
|
||||
|
@ -195,53 +209,36 @@ func UpdateHookTask(t *HookTask) error {
|
|||
}
|
||||
|
||||
// ReplayHookTask copies a hook task to get re-delivered
|
||||
func ReplayHookTask(hookID int64, uuid string) (*HookTask, error) {
|
||||
var newTask *HookTask
|
||||
|
||||
err := db.WithTx(func(ctx context.Context) error {
|
||||
task := &HookTask{
|
||||
func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, error) {
|
||||
task := &HookTask{
|
||||
HookID: hookID,
|
||||
UUID: uuid,
|
||||
}
|
||||
has, err := db.GetByBean(ctx, task)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if !has {
|
||||
return nil, ErrHookTaskNotExist{
|
||||
HookID: hookID,
|
||||
UUID: uuid,
|
||||
}
|
||||
has, err := db.GetByBean(ctx, task)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if !has {
|
||||
return ErrHookTaskNotExist{
|
||||
HookID: hookID,
|
||||
UUID: uuid,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
newTask = &HookTask{
|
||||
UUID: gouuid.New().String(),
|
||||
RepoID: task.RepoID,
|
||||
HookID: task.HookID,
|
||||
PayloadContent: task.PayloadContent,
|
||||
EventType: task.EventType,
|
||||
}
|
||||
return db.Insert(ctx, newTask)
|
||||
})
|
||||
|
||||
return newTask, err
|
||||
newTask := &HookTask{
|
||||
UUID: gouuid.New().String(),
|
||||
HookID: task.HookID,
|
||||
PayloadContent: task.PayloadContent,
|
||||
EventType: task.EventType,
|
||||
}
|
||||
return newTask, db.Insert(ctx, newTask)
|
||||
}
|
||||
|
||||
// FindUndeliveredHookTasks represents find the undelivered hook tasks
|
||||
func FindUndeliveredHookTasks() ([]*HookTask, error) {
|
||||
func FindUndeliveredHookTasks(ctx context.Context) ([]*HookTask, error) {
|
||||
tasks := make([]*HookTask, 0, 10)
|
||||
if err := db.GetEngine(db.DefaultContext).Where("is_delivered=?", false).Find(&tasks); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// FindRepoUndeliveredHookTasks represents find the undelivered hook tasks of one repository
|
||||
func FindRepoUndeliveredHookTasks(repoID int64) ([]*HookTask, error) {
|
||||
tasks := make([]*HookTask, 0, 5)
|
||||
if err := db.GetEngine(db.DefaultContext).Where("repo_id=? AND is_delivered=?", repoID, false).Find(&tasks); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return tasks, nil
|
||||
return tasks, db.GetEngine(ctx).
|
||||
Where("is_delivered=?", false).
|
||||
Find(&tasks)
|
||||
}
|
||||
|
||||
// CleanupHookTaskTable deletes rows from hook_task as needed.
|
||||
|
@ -250,7 +247,7 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType,
|
|||
|
||||
if cleanupType == OlderThan {
|
||||
deleteOlderThan := time.Now().Add(-olderThan).UnixNano()
|
||||
deletes, err := db.GetEngine(db.DefaultContext).
|
||||
deletes, err := db.GetEngine(ctx).
|
||||
Where("is_delivered = ? and delivered < ?", true, deleteOlderThan).
|
||||
Delete(new(HookTask))
|
||||
if err != nil {
|
||||
|
@ -259,7 +256,8 @@ func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType,
|
|||
log.Trace("Deleted %d rows from hook_task", deletes)
|
||||
} else if cleanupType == PerWebhook {
|
||||
hookIDs := make([]int64, 0, 10)
|
||||
err := db.GetEngine(db.DefaultContext).Table("webhook").
|
||||
err := db.GetEngine(ctx).
|
||||
Table("webhook").
|
||||
Where("id > 0").
|
||||
Cols("id").
|
||||
Find(&hookIDs)
|
||||
|
|
|
@ -19,13 +19,6 @@ import (
|
|||
"xorm.io/builder"
|
||||
)
|
||||
|
||||
// __ __ ___. .__ __
|
||||
// / \ / \ ____\_ |__ | |__ ____ ____ | | __
|
||||
// \ \/\/ // __ \| __ \| | \ / _ \ / _ \| |/ /
|
||||
// \ /\ ___/| \_\ \ Y ( <_> | <_> ) <
|
||||
// \__/\ / \___ >___ /___| /\____/ \____/|__|_ \
|
||||
// \/ \/ \/ \/ \/
|
||||
|
||||
// ErrWebhookNotExist represents a "WebhookNotExist" kind of error.
|
||||
type ErrWebhookNotExist struct {
|
||||
ID int64
|
||||
|
@ -47,6 +40,7 @@ func (err ErrWebhookNotExist) Unwrap() error {
|
|||
|
||||
// ErrHookTaskNotExist represents a "HookTaskNotExist" kind of error.
|
||||
type ErrHookTaskNotExist struct {
|
||||
TaskID int64
|
||||
HookID int64
|
||||
UUID string
|
||||
}
|
||||
|
@ -58,7 +52,7 @@ func IsErrHookTaskNotExist(err error) bool {
|
|||
}
|
||||
|
||||
func (err ErrHookTaskNotExist) Error() string {
|
||||
return fmt.Sprintf("hook task does not exist [hook: %d, uuid: %s]", err.HookID, err.UUID)
|
||||
return fmt.Sprintf("hook task does not exist [task: %d, hook: %d, uuid: %s]", err.TaskID, err.HookID, err.UUID)
|
||||
}
|
||||
|
||||
func (err ErrHookTaskNotExist) Unwrap() error {
|
||||
|
|
|
@ -208,12 +208,12 @@ func TestHookTasks(t *testing.T) {
|
|||
func TestCreateHookTask(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 3,
|
||||
HookID: 3,
|
||||
Payloader: &api.PushPayload{},
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
}
|
||||
|
||||
|
@ -232,14 +232,14 @@ func TestUpdateHookTask(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 3,
|
||||
HookID: 3,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: true,
|
||||
Delivered: time.Now().UnixNano(),
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0))
|
||||
|
@ -249,13 +249,13 @@ func TestCleanupHookTaskTable_PerWebhook_DeletesDelivered(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 2,
|
||||
HookID: 4,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: false,
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 0))
|
||||
|
@ -265,14 +265,14 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesUndelivered(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 2,
|
||||
HookID: 4,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: true,
|
||||
Delivered: time.Now().UnixNano(),
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), PerWebhook, 168*time.Hour, 1))
|
||||
|
@ -282,14 +282,14 @@ func TestCleanupHookTaskTable_PerWebhook_LeavesMostRecentTask(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 3,
|
||||
HookID: 3,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: true,
|
||||
Delivered: time.Now().AddDate(0, 0, -8).UnixNano(),
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
|
||||
|
@ -299,13 +299,13 @@ func TestCleanupHookTaskTable_OlderThan_DeletesDelivered(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 2,
|
||||
HookID: 4,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: false,
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
|
||||
|
@ -315,14 +315,14 @@ func TestCleanupHookTaskTable_OlderThan_LeavesUndelivered(t *testing.T) {
|
|||
func TestCleanupHookTaskTable_OlderThan_LeavesTaskEarlierThanAgeToDelete(t *testing.T) {
|
||||
assert.NoError(t, unittest.PrepareTestDatabase())
|
||||
hookTask := &HookTask{
|
||||
RepoID: 2,
|
||||
HookID: 4,
|
||||
Payloader: &api.PushPayload{},
|
||||
IsDelivered: true,
|
||||
Delivered: time.Now().AddDate(0, 0, -6).UnixNano(),
|
||||
}
|
||||
unittest.AssertNotExistsBean(t, hookTask)
|
||||
assert.NoError(t, CreateHookTask(hookTask))
|
||||
_, err := CreateHookTask(db.DefaultContext, hookTask)
|
||||
assert.NoError(t, err)
|
||||
unittest.AssertExistsAndLoadBean(t, hookTask)
|
||||
|
||||
assert.NoError(t, CleanupHookTaskTable(context.Background(), OlderThan, 168*time.Hour, 0))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue