Make Requests Processes and create process hierarchy. Associate OpenRepository with context. (#17125)
This PR registers requests with the process manager and manages hierarchy within the processes. Git repos are then associated with a context, (usually the request's context) - with sub commands using this context as their base context. Signed-off-by: Andrew Thornton <art27@cantab.net>
This commit is contained in:
parent
d894c90b70
commit
01087e9eef
66 changed files with 591 additions and 306 deletions
|
@ -82,11 +82,10 @@ func (t *Task) RunWithUser(doer *user_model.User, config Config) {
|
|||
}
|
||||
}()
|
||||
graceful.GetManager().RunWithShutdownContext(func(baseCtx context.Context) {
|
||||
ctx, cancel := context.WithCancel(baseCtx)
|
||||
defer cancel()
|
||||
pm := process.GetManager()
|
||||
pid := pm.Add(config.FormatMessage(t.Name, "process", doer), cancel)
|
||||
defer pm.Remove(pid)
|
||||
ctx, _, finished := pm.AddContext(baseCtx, config.FormatMessage(t.Name, "process", doer))
|
||||
defer finished()
|
||||
|
||||
if err := t.fun(ctx, doer, config); err != nil {
|
||||
if db.IsErrCancelled(err) {
|
||||
message := err.(db.ErrCancelled).Message
|
||||
|
|
|
@ -1303,8 +1303,9 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff
|
|||
return nil, err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(git.DefaultContext, time.Duration(setting.Git.Timeout.Default)*time.Second)
|
||||
defer cancel()
|
||||
timeout := time.Duration(setting.Git.Timeout.Default) * time.Second
|
||||
ctx, _, finished := process.GetManager().AddContextTimeout(gitRepo.Ctx, timeout, fmt.Sprintf("GetDiffRange [repo_path: %s]", repoPath))
|
||||
defer finished()
|
||||
|
||||
argsLength := 6
|
||||
if len(opts.WhitespaceBehavior) > 0 {
|
||||
|
@ -1369,9 +1370,6 @@ func GetDiff(gitRepo *git.Repository, opts *DiffOptions, files ...string) (*Diff
|
|||
return nil, fmt.Errorf("error during Start: %w", err)
|
||||
}
|
||||
|
||||
pid := process.GetManager().Add(fmt.Sprintf("GetDiffRange [repo_path: %s]", repoPath), cancel)
|
||||
defer process.GetManager().Remove(pid)
|
||||
|
||||
diff, err := ParsePatch(opts.MaxLines, opts.MaxLineCharacters, opts.MaxFiles, stdout, parsePatchSkipToFile)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to ParsePatch: %w", err)
|
||||
|
|
|
@ -7,7 +7,6 @@ package mailer
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -258,11 +257,10 @@ func (s *sendmailSender) Send(from string, to []string, msg io.WriterTo) error {
|
|||
args = append(args, to...)
|
||||
log.Trace("Sending with: %s %v", setting.MailService.SendmailPath, args)
|
||||
|
||||
pm := process.GetManager()
|
||||
desc := fmt.Sprintf("SendMail: %s %v", setting.MailService.SendmailPath, args)
|
||||
|
||||
ctx, cancel := context.WithTimeout(graceful.GetManager().HammerContext(), setting.MailService.SendmailTimeout)
|
||||
defer cancel()
|
||||
ctx, _, finished := process.GetManager().AddContextTimeout(graceful.GetManager().HammerContext(), setting.MailService.SendmailTimeout, desc)
|
||||
defer finished()
|
||||
|
||||
cmd := exec.CommandContext(ctx, setting.MailService.SendmailPath, args...)
|
||||
pipe, err := cmd.StdinPipe()
|
||||
|
@ -272,18 +270,17 @@ func (s *sendmailSender) Send(from string, to []string, msg io.WriterTo) error {
|
|||
}
|
||||
|
||||
if err = cmd.Start(); err != nil {
|
||||
_ = pipe.Close()
|
||||
return err
|
||||
}
|
||||
|
||||
pid := pm.Add(desc, cancel)
|
||||
|
||||
_, err = msg.WriteTo(pipe)
|
||||
|
||||
// we MUST close the pipe or sendmail will hang waiting for more of the message
|
||||
// Also we should wait on our sendmail command even if something fails
|
||||
closeError = pipe.Close()
|
||||
waitError = cmd.Wait()
|
||||
pm.Remove(pid)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
} else if closeError != nil {
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"code.gitea.io/gitea/modules/lfs"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/notification"
|
||||
"code.gitea.io/gitea/modules/process"
|
||||
repo_module "code.gitea.io/gitea/modules/repository"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/timeutil"
|
||||
|
@ -192,7 +193,7 @@ func runSync(ctx context.Context, m *models.Mirror) ([]*mirrorSyncResult, bool)
|
|||
}
|
||||
gitArgs = append(gitArgs, m.GetRemoteName())
|
||||
|
||||
remoteAddr, remoteErr := git.GetRemoteAddress(repoPath, m.GetRemoteName())
|
||||
remoteAddr, remoteErr := git.GetRemoteAddress(ctx, repoPath, m.GetRemoteName())
|
||||
if remoteErr != nil {
|
||||
log.Error("GetRemoteAddress Error %v", remoteErr)
|
||||
}
|
||||
|
@ -287,7 +288,7 @@ func runSync(ctx context.Context, m *models.Mirror) ([]*mirrorSyncResult, bool)
|
|||
// sanitize the output, since it may contain the remote address, which may
|
||||
// contain a password
|
||||
|
||||
remoteAddr, remoteErr := git.GetRemoteAddress(wikiPath, m.GetRemoteName())
|
||||
remoteAddr, remoteErr := git.GetRemoteAddress(ctx, wikiPath, m.GetRemoteName())
|
||||
if remoteErr != nil {
|
||||
log.Error("GetRemoteAddress Error %v", remoteErr)
|
||||
}
|
||||
|
@ -367,6 +368,9 @@ func SyncPullMirror(ctx context.Context, repoID int64) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing Mirror %s/%s", m.Repo.OwnerName, m.Repo.Name))
|
||||
defer finished()
|
||||
|
||||
log.Trace("SyncMirrors [repo: %-v]: Running Sync", m.Repo)
|
||||
results, ok := runSync(ctx, m)
|
||||
if !ok {
|
||||
|
@ -385,7 +389,7 @@ func SyncPullMirror(ctx context.Context, repoID int64) bool {
|
|||
log.Trace("SyncMirrors [repo: %-v]: no branches updated", m.Repo)
|
||||
} else {
|
||||
log.Trace("SyncMirrors [repo: %-v]: %d branches updated", m.Repo, len(results))
|
||||
gitRepo, err = git.OpenRepository(m.Repo.RepoPath())
|
||||
gitRepo, err = git.OpenRepositoryCtx(ctx, m.Repo.RepoPath())
|
||||
if err != nil {
|
||||
log.Error("OpenRepository [%d]: %v", m.RepoID, err)
|
||||
return false
|
||||
|
|
|
@ -7,6 +7,7 @@ package mirror
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"regexp"
|
||||
"time"
|
||||
|
@ -15,6 +16,7 @@ import (
|
|||
"code.gitea.io/gitea/modules/git"
|
||||
"code.gitea.io/gitea/modules/lfs"
|
||||
"code.gitea.io/gitea/modules/log"
|
||||
"code.gitea.io/gitea/modules/process"
|
||||
"code.gitea.io/gitea/modules/repository"
|
||||
"code.gitea.io/gitea/modules/setting"
|
||||
"code.gitea.io/gitea/modules/timeutil"
|
||||
|
@ -92,6 +94,9 @@ func SyncPushMirror(ctx context.Context, mirrorID int64) bool {
|
|||
|
||||
m.LastError = ""
|
||||
|
||||
ctx, _, finished := process.GetManager().AddContext(ctx, fmt.Sprintf("Syncing PushMirror %s/%s to %s", m.Repo.OwnerName, m.Repo.Name, m.RemoteName))
|
||||
defer finished()
|
||||
|
||||
log.Trace("SyncPushMirror [mirror: %d][repo: %-v]: Running Sync", m.ID, m.Repo)
|
||||
err = runPushSync(ctx, m)
|
||||
if err != nil {
|
||||
|
@ -116,7 +121,7 @@ func runPushSync(ctx context.Context, m *models.PushMirror) error {
|
|||
timeout := time.Duration(setting.Git.Timeout.Mirror) * time.Second
|
||||
|
||||
performPush := func(path string) error {
|
||||
remoteAddr, err := git.GetRemoteAddress(path, m.RemoteName)
|
||||
remoteAddr, err := git.GetRemoteAddress(ctx, path, m.RemoteName)
|
||||
if err != nil {
|
||||
log.Error("GetRemoteAddress(%s) Error %v", path, err)
|
||||
return errors.New("Unexpected error")
|
||||
|
@ -125,7 +130,7 @@ func runPushSync(ctx context.Context, m *models.PushMirror) error {
|
|||
if setting.LFS.StartServer {
|
||||
log.Trace("SyncMirrors [repo: %-v]: syncing LFS objects...", m.Repo)
|
||||
|
||||
gitRepo, err := git.OpenRepository(path)
|
||||
gitRepo, err := git.OpenRepositoryCtx(ctx, path)
|
||||
if err != nil {
|
||||
log.Error("OpenRepository: %v", err)
|
||||
return errors.New("Unexpected error")
|
||||
|
@ -141,7 +146,7 @@ func runPushSync(ctx context.Context, m *models.PushMirror) error {
|
|||
|
||||
log.Trace("Pushing %s mirror[%d] remote %s", path, m.ID, m.RemoteName)
|
||||
|
||||
if err := git.Push(path, git.PushOptions{
|
||||
if err := git.Push(ctx, path, git.PushOptions{
|
||||
Remote: m.RemoteName,
|
||||
Force: true,
|
||||
Mirror: true,
|
||||
|
@ -162,7 +167,7 @@ func runPushSync(ctx context.Context, m *models.PushMirror) error {
|
|||
|
||||
if m.Repo.HasWiki() {
|
||||
wikiPath := m.Repo.WikiPath()
|
||||
_, err := git.GetRemoteAddress(wikiPath, m.RemoteName)
|
||||
_, err := git.GetRemoteAddress(ctx, wikiPath, m.RemoteName)
|
||||
if err == nil {
|
||||
err := performPush(wikiPath)
|
||||
if err != nil {
|
||||
|
|
|
@ -112,7 +112,7 @@ func GetPullRequestCommitStatusState(pr *models.PullRequest) (structs.CommitStat
|
|||
if pr.Flow == models.PullRequestFlowGithub && !headGitRepo.IsBranchExist(pr.HeadBranch) {
|
||||
return "", errors.New("Head branch does not exist, can not merge")
|
||||
}
|
||||
if pr.Flow == models.PullRequestFlowAGit && !git.IsReferenceExist(headGitRepo.Path, pr.GetGitRefName()) {
|
||||
if pr.Flow == models.PullRequestFlowAGit && !git.IsReferenceExist(headGitRepo.Ctx, headGitRepo.Path, pr.GetGitRefName()) {
|
||||
return "", errors.New("Head branch does not exist, can not merge")
|
||||
}
|
||||
|
||||
|
|
|
@ -313,7 +313,7 @@ func AddTestPullRequestTask(doer *user_model.User, repoID int64, branch string,
|
|||
for _, pr := range prs {
|
||||
divergence, err := GetDiverging(pr)
|
||||
if err != nil {
|
||||
if models.IsErrBranchDoesNotExist(err) && !git.IsBranchExist(pr.HeadRepo.RepoPath(), pr.HeadBranch) {
|
||||
if models.IsErrBranchDoesNotExist(err) && !git.IsBranchExist(ctx, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
|
||||
log.Warn("Cannot test PR %s/%d: head_branch %s no longer exists", pr.BaseRepo.Name, pr.IssueID, pr.HeadBranch)
|
||||
} else {
|
||||
log.Error("GetDiverging: %v", err)
|
||||
|
@ -431,7 +431,7 @@ func pushToBaseRepoHelper(pr *models.PullRequest, prefixHeadBranch string) (err
|
|||
|
||||
gitRefName := pr.GetGitRefName()
|
||||
|
||||
if err := git.Push(headRepoPath, git.PushOptions{
|
||||
if err := git.Push(git.DefaultContext, headRepoPath, git.PushOptions{
|
||||
Remote: baseRepoPath,
|
||||
Branch: prefixHeadBranch + pr.HeadBranch + ":" + gitRefName,
|
||||
Force: true,
|
||||
|
|
|
@ -152,7 +152,7 @@ func createTemporaryRepo(pr *models.PullRequest) (string, error) {
|
|||
if err := models.RemoveTemporaryPath(tmpBasePath); err != nil {
|
||||
log.Error("CreateTempRepo: RemoveTemporaryPath: %s", err)
|
||||
}
|
||||
if !git.IsBranchExist(pr.HeadRepo.RepoPath(), pr.HeadBranch) {
|
||||
if !git.IsBranchExist(git.DefaultContext, pr.HeadRepo.RepoPath(), pr.HeadBranch) {
|
||||
return "", models.ErrBranchDoesNotExist{
|
||||
BranchName: pr.HeadBranch,
|
||||
}
|
||||
|
|
|
@ -24,13 +24,13 @@ func CreateNewBranch(doer *user_model.User, repo *models.Repository, oldBranchNa
|
|||
return err
|
||||
}
|
||||
|
||||
if !git.IsBranchExist(repo.RepoPath(), oldBranchName) {
|
||||
if !git.IsBranchExist(git.DefaultContext, repo.RepoPath(), oldBranchName) {
|
||||
return models.ErrBranchDoesNotExist{
|
||||
BranchName: oldBranchName,
|
||||
}
|
||||
}
|
||||
|
||||
if err := git.Push(repo.RepoPath(), git.PushOptions{
|
||||
if err := git.Push(git.DefaultContext, repo.RepoPath(), git.PushOptions{
|
||||
Remote: repo.RepoPath(),
|
||||
Branch: fmt.Sprintf("%s:%s%s", oldBranchName, git.BranchPrefix, branchName),
|
||||
Env: models.PushingEnvironment(doer, repo),
|
||||
|
@ -106,7 +106,7 @@ func CreateNewBranchFromCommit(doer *user_model.User, repo *models.Repository, c
|
|||
return err
|
||||
}
|
||||
|
||||
if err := git.Push(repo.RepoPath(), git.PushOptions{
|
||||
if err := git.Push(git.DefaultContext, repo.RepoPath(), git.PushOptions{
|
||||
Remote: repo.RepoPath(),
|
||||
Branch: fmt.Sprintf("%s:%s%s", commit, git.BranchPrefix, branchName),
|
||||
Env: models.PushingEnvironment(doer, repo),
|
||||
|
|
|
@ -264,7 +264,7 @@ func (t *TemporaryUploadRepository) CommitTreeWithDate(author, committer *user_m
|
|||
func (t *TemporaryUploadRepository) Push(doer *user_model.User, commitHash string, branch string) error {
|
||||
// Because calls hooks we need to pass in the environment
|
||||
env := models.PushingEnvironment(doer, t.repo)
|
||||
if err := git.Push(t.basePath, git.PushOptions{
|
||||
if err := git.Push(t.gitRepo.Ctx, t.basePath, git.PushOptions{
|
||||
Remote: t.repo.RepoPath(),
|
||||
Branch: strings.TrimSpace(commitHash) + ":refs/heads/" + strings.TrimSpace(branch),
|
||||
Env: env,
|
||||
|
|
|
@ -5,7 +5,6 @@
|
|||
package task
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
@ -99,11 +98,9 @@ func runMigrateTask(t *models.Task) (err error) {
|
|||
|
||||
opts.MigrateToRepoID = t.RepoID
|
||||
|
||||
ctx, cancel := context.WithCancel(graceful.GetManager().ShutdownContext())
|
||||
defer cancel()
|
||||
pm := process.GetManager()
|
||||
pid := pm.Add(fmt.Sprintf("MigrateTask: %s/%s", t.Owner.Name, opts.RepoName), cancel)
|
||||
defer pm.Remove(pid)
|
||||
ctx, _, finished := pm.AddContext(graceful.GetManager().ShutdownContext(), fmt.Sprintf("MigrateTask: %s/%s", t.Owner.Name, opts.RepoName))
|
||||
defer finished()
|
||||
|
||||
t.StartTime = timeutil.TimeStampNow()
|
||||
t.Status = structs.TaskStatusRunning
|
||||
|
|
|
@ -128,7 +128,7 @@ func updateWikiPage(doer *user_model.User, repo *models.Repository, oldWikiName,
|
|||
return fmt.Errorf("InitWiki: %v", err)
|
||||
}
|
||||
|
||||
hasMasterBranch := git.IsBranchExist(repo.WikiPath(), "master")
|
||||
hasMasterBranch := git.IsBranchExist(git.DefaultContext, repo.WikiPath(), "master")
|
||||
|
||||
basePath, err := models.CreateTemporaryPath("update-wiki")
|
||||
if err != nil {
|
||||
|
@ -243,7 +243,7 @@ func updateWikiPage(doer *user_model.User, repo *models.Repository, oldWikiName,
|
|||
return err
|
||||
}
|
||||
|
||||
if err := git.Push(basePath, git.PushOptions{
|
||||
if err := git.Push(gitRepo.Ctx, basePath, git.PushOptions{
|
||||
Remote: "origin",
|
||||
Branch: fmt.Sprintf("%s:%s%s", commitHash.String(), git.BranchPrefix, "master"),
|
||||
Env: models.FullPushingEnvironment(
|
||||
|
@ -357,7 +357,7 @@ func DeleteWikiPage(doer *user_model.User, repo *models.Repository, wikiName str
|
|||
return err
|
||||
}
|
||||
|
||||
if err := git.Push(basePath, git.PushOptions{
|
||||
if err := git.Push(gitRepo.Ctx, basePath, git.PushOptions{
|
||||
Remote: "origin",
|
||||
Branch: fmt.Sprintf("%s:%s%s", commitHash.String(), git.BranchPrefix, "master"),
|
||||
Env: models.PushingEnvironment(doer, repo),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue