concurrency implemented

This commit is contained in:
golangci 2018-05-06 12:08:57 +03:00
parent 16a24dc92b
commit 1fa57347aa
6 changed files with 76 additions and 16 deletions

2
.gitignore vendored
View File

@ -0,0 +1,2 @@
/*.txt
/*.pprof

View File

@ -2,7 +2,6 @@ package main
import (
"log"
"runtime"
"github.com/golangci/golangci-lint/internal/commands"
"github.com/golangci/golangci-shared/pkg/analytics"
@ -12,7 +11,6 @@ import (
func main() {
log.SetFlags(0) // don't print time
analytics.SetLogLevel(logrus.WarnLevel)
runtime.GOMAXPROCS(runtime.NumCPU())
e := commands.NewExecutor()
if err := e.Execute(); err != nil {

View File

@ -1,6 +1,8 @@
package commands
import (
"runtime"
"github.com/spf13/cobra"
)
@ -15,5 +17,6 @@ func (e *Executor) initRoot() {
}
rootCmd.PersistentFlags().BoolVarP(&e.cfg.Common.IsVerbose, "verbose", "v", false, "verbose output")
rootCmd.PersistentFlags().StringVar(&e.cfg.Common.CPUProfilePath, "cpu-profile-path", "", "Path to CPU profile output file")
rootCmd.PersistentFlags().IntVarP(&e.cfg.Common.Concurrency, "concurrency", "j", runtime.NumCPU(), "Concurrency")
e.rootCmd = rootCmd
}

View File

@ -6,6 +6,7 @@ import (
"fmt"
"log"
"os"
"runtime"
"runtime/pprof"
"strings"
@ -48,6 +49,8 @@ func (e *Executor) initRun() {
func (e Executor) executeRun(cmd *cobra.Command, args []string) {
f := func() (error, int) {
runtime.GOMAXPROCS(e.cfg.Common.Concurrency)
if e.cfg.Common.CPUProfilePath != "" {
f, err := os.Create(e.cfg.Common.CPUProfilePath)
if err != nil {
@ -82,7 +85,7 @@ func (e Executor) executeRun(cmd *cobra.Command, args []string) {
},
}
issues, err := runner.Run(ctx, golinters.GetSupportedLinters(), exec, &e.cfg.Run)
issues, err := runner.Run(ctx, golinters.GetSupportedLinters(), exec, e.cfg)
if err != nil {
return err, 1
}
@ -112,7 +115,7 @@ func outputIssues(format string, issues []result.Issue) error {
if format == config.OutFormatColoredLineNumber {
outStr = color.GreenString(outStr)
}
log.Print(outStr)
fmt.Fprint(os.Stdout, outStr)
}
for _, i := range issues {
@ -120,7 +123,7 @@ func outputIssues(format string, issues []result.Issue) error {
if format == config.OutFormatColoredLineNumber {
text = color.RedString(text)
}
log.Printf("%s:%d: %s", i.File, i.LineNumber, text)
fmt.Fprintf(os.Stdout, "%s:%d: %s\n", i.File, i.LineNumber, text)
}
return nil
}
@ -130,7 +133,7 @@ func outputIssues(format string, issues []result.Issue) error {
if err != nil {
return err
}
log.Print(string(outputJSON))
fmt.Fprint(os.Stdout, string(outputJSON))
return nil
}

View File

@ -15,6 +15,7 @@ var OutFormats = []string{OutFormatColoredLineNumber, OutFormatLineNumber, OutFo
type Common struct {
IsVerbose bool
CPUProfilePath string
Concurrency int
}
type Run struct {

View File

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"sync"
"github.com/golangci/golangci-lint/pkg/config"
"github.com/golangci/golangci-lint/pkg/result"
@ -13,35 +14,87 @@ import (
)
type Runner interface {
Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Run) ([]result.Issue, error)
Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Config) ([]result.Issue, error)
}
type SimpleRunner struct {
Processors []processors.Processor
}
func (r SimpleRunner) Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Run) ([]result.Issue, error) {
results := []result.Result{}
type lintRes struct {
linter Linter
err error
res *result.Result
}
func runLinters(ctx context.Context, wg *sync.WaitGroup, tasksCh chan Linter, lintResultsCh chan lintRes, exec executors.Executor, cfg *config.Config) {
for i := 0; i < cfg.Common.Concurrency; i++ {
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
// XXX: if check it in a select with reading from tasksCh
// it's possible to not enter to this case until tasksCh is empty.
return
default:
}
select {
case <-ctx.Done():
return
case linter, ok := <-tasksCh:
if !ok {
return
}
res, lerr := linter.Run(ctx, exec, &cfg.Run)
lintResultsCh <- lintRes{
linter: linter,
err: lerr,
res: res,
}
}
}
}()
}
}
func (r SimpleRunner) Run(ctx context.Context, linters []Linter, exec executors.Executor, cfg *config.Config) ([]result.Issue, error) {
savedStdout, savedStderr := os.Stdout, os.Stderr
devNull, err := os.Open(os.DevNull)
if err != nil {
return nil, fmt.Errorf("can't open null device %q: %s", os.DevNull, err)
}
for _, linter := range linters {
os.Stdout, os.Stderr = devNull, devNull
res, err := linter.Run(ctx, exec, cfg)
lintResultsCh := make(chan lintRes, len(linters))
tasksCh := make(chan Linter, cfg.Common.Concurrency)
var wg sync.WaitGroup
wg.Add(cfg.Common.Concurrency)
runLinters(ctx, &wg, tasksCh, lintResultsCh, exec, cfg)
for _, linter := range linters {
tasksCh <- linter
}
close(tasksCh)
wg.Wait()
close(lintResultsCh)
os.Stdout, os.Stderr = savedStdout, savedStderr
if err != nil {
analytics.Log(ctx).Warnf("Can't run linter %s: %s", linter.Name(), err)
results := []result.Result{}
for res := range lintResultsCh {
if res.err != nil {
analytics.Log(ctx).Warnf("Can't run linter %s: %s", res.linter.Name(), res.err)
continue
}
if len(res.Issues) == 0 {
if res.res == nil || len(res.res.Issues) == 0 {
continue
}
results = append(results, *res)
results = append(results, *res.res)
}
results, err = r.processResults(results)