From 1fa57347aae05ce5c69e49447297a0be8346d62f Mon Sep 17 00:00:00 2001 From: golangci Date: Sun, 6 May 2018 12:08:57 +0300 Subject: [PATCH] concurrency implemented --- .gitignore | 2 ++ cmd/golangci-lint/main.go | 2 -- internal/commands/root.go | 3 ++ internal/commands/run.go | 11 +++--- pkg/config/config.go | 1 + pkg/runner.go | 73 +++++++++++++++++++++++++++++++++------ 6 files changed, 76 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index e69de29b..c3fc4df0 100644 --- a/.gitignore +++ b/.gitignore @@ -0,0 +1,2 @@ +/*.txt +/*.pprof diff --git a/cmd/golangci-lint/main.go b/cmd/golangci-lint/main.go index 1de123dc..0f26afcb 100644 --- a/cmd/golangci-lint/main.go +++ b/cmd/golangci-lint/main.go @@ -2,7 +2,6 @@ package main import ( "log" - "runtime" "github.com/golangci/golangci-lint/internal/commands" "github.com/golangci/golangci-shared/pkg/analytics" @@ -12,7 +11,6 @@ import ( func main() { log.SetFlags(0) // don't print time analytics.SetLogLevel(logrus.WarnLevel) - runtime.GOMAXPROCS(runtime.NumCPU()) e := commands.NewExecutor() if err := e.Execute(); err != nil { diff --git a/internal/commands/root.go b/internal/commands/root.go index ab3cddc8..e979461b 100644 --- a/internal/commands/root.go +++ b/internal/commands/root.go @@ -1,6 +1,8 @@ package commands import ( + "runtime" + "github.com/spf13/cobra" ) @@ -15,5 +17,6 @@ func (e *Executor) initRoot() { } rootCmd.PersistentFlags().BoolVarP(&e.cfg.Common.IsVerbose, "verbose", "v", false, "verbose output") rootCmd.PersistentFlags().StringVar(&e.cfg.Common.CPUProfilePath, "cpu-profile-path", "", "Path to CPU profile output file") + rootCmd.PersistentFlags().IntVarP(&e.cfg.Common.Concurrency, "concurrency", "j", runtime.NumCPU(), "Concurrency") e.rootCmd = rootCmd } diff --git a/internal/commands/run.go b/internal/commands/run.go index 9f410311..d0e3aec0 100644 --- a/internal/commands/run.go +++ b/internal/commands/run.go @@ -6,6 +6,7 @@ import ( "fmt" "log" "os" + "runtime" "runtime/pprof" "strings" @@ -48,6 +49,8 @@ func (e *Executor) initRun() { func (e Executor) executeRun(cmd *cobra.Command, args []string) { f := func() (error, int) { + runtime.GOMAXPROCS(e.cfg.Common.Concurrency) + if e.cfg.Common.CPUProfilePath != "" { f, err := os.Create(e.cfg.Common.CPUProfilePath) if err != nil { @@ -82,7 +85,7 @@ func (e Executor) executeRun(cmd *cobra.Command, args []string) { }, } - issues, err := runner.Run(ctx, golinters.GetSupportedLinters(), exec, &e.cfg.Run) + issues, err := runner.Run(ctx, golinters.GetSupportedLinters(), exec, e.cfg) if err != nil { return err, 1 } @@ -112,7 +115,7 @@ func outputIssues(format string, issues []result.Issue) error { if format == config.OutFormatColoredLineNumber { outStr = color.GreenString(outStr) } - log.Print(outStr) + fmt.Fprint(os.Stdout, outStr) } for _, i := range issues { @@ -120,7 +123,7 @@ func outputIssues(format string, issues []result.Issue) error { if format == config.OutFormatColoredLineNumber { text = color.RedString(text) } - log.Printf("%s:%d: %s", i.File, i.LineNumber, text) + fmt.Fprintf(os.Stdout, "%s:%d: %s\n", i.File, i.LineNumber, text) } return nil } @@ -130,7 +133,7 @@ func outputIssues(format string, issues []result.Issue) error { if err != nil { return err } - log.Print(string(outputJSON)) + fmt.Fprint(os.Stdout, string(outputJSON)) return nil } diff --git a/pkg/config/config.go b/pkg/config/config.go index e2968356..78e65cc1 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -15,6 +15,7 @@ var OutFormats = []string{OutFormatColoredLineNumber, OutFormatLineNumber, OutFo type Common struct { IsVerbose bool CPUProfilePath string + Concurrency int } type Run struct { diff --git a/pkg/runner.go b/pkg/runner.go index 47568db4..c14cc4c3 100644 --- a/pkg/runner.go +++ b/pkg/runner.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "os" + "sync" "github.com/golangci/golangci-lint/pkg/config" "github.com/golangci/golangci-lint/pkg/result" @@ -13,35 +14,87 @@ import ( ) type Runner interface { - Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Run) ([]result.Issue, error) + Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Config) ([]result.Issue, error) } type SimpleRunner struct { Processors []processors.Processor } -func (r SimpleRunner) Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Run) ([]result.Issue, error) { - results := []result.Result{} +type lintRes struct { + linter Linter + err error + res *result.Result +} + +func runLinters(ctx context.Context, wg *sync.WaitGroup, tasksCh chan Linter, lintResultsCh chan lintRes, exec executors.Executor, cfg *config.Config) { + for i := 0; i < cfg.Common.Concurrency; i++ { + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + // XXX: if check it in a select with reading from tasksCh + // it's possible to not enter to this case until tasksCh is empty. + return + default: + } + + select { + case <-ctx.Done(): + return + case linter, ok := <-tasksCh: + if !ok { + return + } + res, lerr := linter.Run(ctx, exec, &cfg.Run) + lintResultsCh <- lintRes{ + linter: linter, + err: lerr, + res: res, + } + } + } + }() + } +} + +func (r SimpleRunner) Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Config) ([]result.Issue, error) { savedStdout, savedStderr := os.Stdout, os.Stderr devNull, err := os.Open(os.DevNull) if err != nil { return nil, fmt.Errorf("can't open null device %q: %s", os.DevNull, err) } + os.Stdout, os.Stderr = devNull, devNull + + lintResultsCh := make(chan lintRes, len(linters)) + tasksCh := make(chan Linter, cfg.Common.Concurrency) + var wg sync.WaitGroup + wg.Add(cfg.Common.Concurrency) + runLinters(ctx, &wg, tasksCh, lintResultsCh, exec, cfg) + for _, linter := range linters { - os.Stdout, os.Stderr = devNull, devNull - res, err := linter.Run(ctx, exec, cfg) - os.Stdout, os.Stderr = savedStdout, savedStderr - if err != nil { - analytics.Log(ctx).Warnf("Can't run linter %s: %s", linter.Name(), err) + tasksCh <- linter + } + + close(tasksCh) + wg.Wait() + close(lintResultsCh) + + os.Stdout, os.Stderr = savedStdout, savedStderr + results := []result.Result{} + for res := range lintResultsCh { + if res.err != nil { + analytics.Log(ctx).Warnf("Can't run linter %s: %s", res.linter.Name(), res.err) continue } - if len(res.Issues) == 0 { + if res.res == nil || len(res.res.Issues) == 0 { continue } - results = append(results, *res) + results = append(results, *res.res) } results, err = r.processResults(results)