Ignore Sync errors on pipes when doing CheckAttributeReader.CheckPath, fix the hang of git cat-file (#17096)

* Ignore Sync errors on pipes when doing `CheckAttributeReader.CheckPath`

* apply env patch

* Drop the Sync and fix a number of issues with the Close function

Signed-off-by: Andrew Thornton <art27@cantab.net>

* add logs for DBIndexer and CheckPath

* Fix some more closing bugs

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Add test case for language_stats

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Update modules/indexer/stats/db.go

Co-authored-by: Lauris BH <lauris@nix.lv>

Co-authored-by: Andrew Thornton <art27@cantab.net>
Co-authored-by: Lauris BH <lauris@nix.lv>
Co-authored-by: 6543 <6543@obermui.de>
This commit is contained in:
wxiaoguang 2021-09-21 03:46:51 +08:00 committed by GitHub
parent 5ac857f4d4
commit b231d0deab
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 960 additions and 65 deletions

View file

@ -12,6 +12,8 @@ import (
"os"
"strconv"
"strings"
"code.gitea.io/gitea/modules/log"
)
// CheckAttributeOpts represents the possible options to CheckAttribute
@ -112,42 +114,48 @@ func (c *CheckAttributeReader) Init(ctx context.Context) error {
if len(c.IndexFile) > 0 && CheckGitVersionAtLeast("1.7.8") == nil {
cmdArgs = append(cmdArgs, "--cached")
c.env = []string{"GIT_INDEX_FILE=" + c.IndexFile}
c.env = append(c.env, "GIT_INDEX_FILE="+c.IndexFile)
}
if len(c.WorkTree) > 0 && CheckGitVersionAtLeast("1.7.8") == nil {
c.env = []string{"GIT_WORK_TREE=" + c.WorkTree}
c.env = append(c.env, "GIT_WORK_TREE="+c.WorkTree)
}
if len(c.Attributes) > 0 {
cmdArgs = append(cmdArgs, c.Attributes...)
cmdArgs = append(cmdArgs, "--")
} else {
c.env = append(c.env, "GIT_FLUSH=1")
if len(c.Attributes) == 0 {
lw := new(nulSeparatedAttributeWriter)
lw.attributes = make(chan attributeTriple)
lw.closed = make(chan struct{})
c.stdOut = lw
c.stdOut.Close()
return fmt.Errorf("no provided Attributes to check")
}
cmdArgs = append(cmdArgs, c.Attributes...)
cmdArgs = append(cmdArgs, "--")
c.ctx, c.cancel = context.WithCancel(ctx)
c.cmd = NewCommandContext(c.ctx, cmdArgs...)
var err error
c.stdinReader, c.stdinWriter, err = os.Pipe()
if err != nil {
c.cancel()
return err
}
if CheckGitVersionAtLeast("1.8.5") == nil {
lw := new(nulSeparatedAttributeWriter)
lw.attributes = make(chan attributeTriple, 5)
lw.closed = make(chan struct{})
c.stdOut = lw
} else {
lw := new(lineSeparatedAttributeWriter)
lw.attributes = make(chan attributeTriple, 5)
lw.closed = make(chan struct{})
c.stdOut = lw
}
return nil
@ -155,13 +163,14 @@ func (c *CheckAttributeReader) Init(ctx context.Context) error {
// Run run cmd
func (c *CheckAttributeReader) Run() error {
defer func() {
_ = c.Close()
}()
stdErr := new(bytes.Buffer)
err := c.cmd.RunInDirTimeoutEnvFullPipelineFunc(c.env, -1, c.Repo.Path, c.stdOut, stdErr, c.stdinReader, func(_ context.Context, _ context.CancelFunc) error {
close(c.running)
return nil
})
defer c.cancel()
_ = c.stdOut.Close()
if err != nil && c.ctx.Err() != nil && err.Error() != "signal: killed" {
return fmt.Errorf("failed to run attr-check. Error: %w\nStderr: %s", err, stdErr.String())
}
@ -170,27 +179,31 @@ func (c *CheckAttributeReader) Run() error {
}
// CheckPath check attr for given path
func (c *CheckAttributeReader) CheckPath(path string) (map[string]string, error) {
func (c *CheckAttributeReader) CheckPath(path string) (rs map[string]string, err error) {
defer func() {
if err != nil {
log.Error("CheckPath returns error: %v", err)
}
}()
select {
case <-c.ctx.Done():
return nil, c.ctx.Err()
case <-c.running:
}
if _, err := c.stdinWriter.Write([]byte(path + "\x00")); err != nil {
defer c.cancel()
if _, err = c.stdinWriter.Write([]byte(path + "\x00")); err != nil {
defer c.Close()
return nil, err
}
if err := c.stdinWriter.Sync(); err != nil {
defer c.cancel()
return nil, err
}
rs := make(map[string]string)
rs = make(map[string]string)
for range c.Attributes {
select {
case attr := <-c.stdOut.ReadAttribute():
case attr, ok := <-c.stdOut.ReadAttribute():
if !ok {
return nil, c.ctx.Err()
}
rs[attr.Attribute] = attr.Value
case <-c.ctx.Done():
return nil, c.ctx.Err()
@ -201,13 +214,16 @@ func (c *CheckAttributeReader) CheckPath(path string) (map[string]string, error)
// Close close pip after use
func (c *CheckAttributeReader) Close() error {
err := c.stdinWriter.Close()
_ = c.stdinReader.Close()
_ = c.stdOut.Close()
c.cancel()
select {
case <-c.running:
default:
close(c.running)
}
defer c.cancel()
return c.stdinWriter.Close()
return err
}
type attributeWriter interface {
@ -224,6 +240,7 @@ type attributeTriple struct {
type nulSeparatedAttributeWriter struct {
tmp []byte
attributes chan attributeTriple
closed chan struct{}
working attributeTriple
pos int
}
@ -267,13 +284,20 @@ func (wr *nulSeparatedAttributeWriter) ReadAttribute() <-chan attributeTriple {
}
func (wr *nulSeparatedAttributeWriter) Close() error {
select {
case <-wr.closed:
return nil
default:
}
close(wr.attributes)
close(wr.closed)
return nil
}
type lineSeparatedAttributeWriter struct {
tmp []byte
attributes chan attributeTriple
closed chan struct{}
}
func (wr *lineSeparatedAttributeWriter) Write(p []byte) (n int, err error) {
@ -356,6 +380,12 @@ func (wr *lineSeparatedAttributeWriter) ReadAttribute() <-chan attributeTriple {
}
func (wr *lineSeparatedAttributeWriter) Close() error {
select {
case <-wr.closed:
return nil
default:
}
close(wr.attributes)
close(wr.closed)
return nil
}