Introduction to conc, a Concurrency Library for Go
This article introduces conc, a library designed to improve concurrency handling in Go. Sourcegraph encountered issues when using Go's native concurrency mechanisms in their daily development, leading to the creation of conc. Compared to the standard concurrency patterns, conc offers a more elegant and streamlined approach, resulting in less code. The example below propagates a panic from a goroutine to its caller, first with the standard library and then with conc.
// Using the standard library
type propagatedPanic struct {
	val   interface{}
	stack []byte
}

func main() {
	done := make(chan *propagatedPanic)
	go func() {
		defer func() {
			if v := recover(); v != nil {
				done <- &propagatedPanic{val: v, stack: debug.Stack()}
			} else {
				done <- nil
			}
		}()
		doSomethingThatMightPanic()
	}()
	if val := <-done; val != nil {
		panic(val)
	}
}

// Using conc
func main() {
	var wg conc.WaitGroup
	wg.Go(doSomethingThatMightPanic)
	// Panics with a nice stack trace
	wg.Wait()
}
To install conc, use the following command:
go get github.com/sourcegraph/conc
Using conc.WaitGroup
The main difference between conc.WaitGroup and the standard library's sync.WaitGroup is that panics in child goroutines are propagated to the caller of the Wait method, eliminating the need to recover panics in each goroutine.
Example:
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/sourcegraph/conc"
)

func main() {
	var count atomic.Int64
	var wg conc.WaitGroup
	for i := 1; i < 100; i++ {
		wg.Go(func() {
			count.Add(1)
		})
	}
	wg.Wait()
	fmt.Println(count.Load())
}
If you want to recover a panic from a child goroutine instead of re-panicking in the caller, use the WaitAndRecover method, which returns the recovered panic:
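package main

import (
	"fmt"
	"sync/atomic"

	"github.com/sourcegraph/conc"
)

func main() {
	var count atomic.Int64
	var wg conc.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // capture the current value (needed before Go 1.22)
		wg.Go(func() {
			if i == 7 {
				panic("bad thing")
			}
			count.Add(1)
		})
	}
	wg.WaitAndRecover()
	fmt.Println(count.Load()) // 9: the task with i == 7 panicked before incrementing
}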
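To make the propagation behavior concrete, here is a minimal standard-library sketch of the same idea. The panicGroup type and its fields are hypothetical names invented for this illustration, and this is not how conc is implemented. Under those assumptions, it shows how a panic recovered in a child goroutine can be handed back to the caller of Wait or WaitAndRecover.

```go
package main

import (
	"fmt"
	"sync"
)

// panicGroup is a hypothetical, simplified stand-in for a panic-aware wait
// group. It is an illustration only, not conc's implementation.
type panicGroup struct {
	wg    sync.WaitGroup
	once  sync.Once
	first any // first recovered panic value, if any
}

// Go runs f in a new goroutine and records the first panic it sees.
func (g *panicGroup) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done() // runs last, after the panic has been recorded
		defer func() {
			if v := recover(); v != nil {
				g.once.Do(func() { g.first = v })
			}
		}()
		f()
	}()
}

// WaitAndRecover blocks until all goroutines finish and returns the first
// recovered panic value, or nil if none panicked.
func (g *panicGroup) WaitAndRecover() any {
	g.wg.Wait()
	return g.first
}

// Wait blocks until all goroutines finish and re-panics in the caller.
func (g *panicGroup) Wait() {
	if v := g.WaitAndRecover(); v != nil {
		panic(v)
	}
}

func main() {
	var g panicGroup
	g.Go(func() {})
	g.Go(func() { panic("bad thing") })
	fmt.Println(g.WaitAndRecover()) // prints "bad thing"
}
```

Unlike this sketch, which keeps only the panic value, the article's first example also captures the stack trace, which is what makes the propagated panic easy to debug.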
Goroutine Pool (Pool)
A Pool is a goroutine pool for handling concurrent tasks. Goroutines in the Pool are lazily started, so creating a new Pool is inexpensive, and the number of goroutines created will never exceed the number of submitted tasks. The pool is efficient but not cost-free, so it should not be used for very short tasks: starting and tearing down a pool takes about 1μs, and each task adds about 300ns of overhead.
Example:
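package main

import (
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	// At most 3 goroutines run tasks at the same time
	p := pool.New().WithMaxGoroutines(3)
	for i := 0; i < 5; i++ {
		p.Go(func() {
			fmt.Println("conc")
		})
	}
	p.Wait()
}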
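To show what a goroutine limit means in practice, the following standard-library sketch bounds concurrency with a buffered channel used as a semaphore. The names runLimited and sleepTasks are hypothetical helpers made up for this illustration. The sketch starts one goroutine per task and is not how conc's Pool is implemented; it only demonstrates the observable effect of a limit such as WithMaxGoroutines(3).

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited runs every task but lets at most limit tasks run at once.
// It returns the highest number of tasks observed running concurrently.
func runLimited(limit int, tasks []func()) int64 {
	var (
		wg      sync.WaitGroup
		running atomic.Int64
		peak    atomic.Int64
	)
	sem := make(chan struct{}, limit)
	for _, task := range tasks {
		sem <- struct{}{} // blocks while limit tasks are in flight
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			defer func() { <-sem }() // free the slot once the task is done
			n := running.Add(1)
			for { // raise peak to n unless it is already at least n
				p := peak.Load()
				if n <= p || peak.CompareAndSwap(p, n) {
					break
				}
			}
			task()
			running.Add(-1)
		}(task)
	}
	wg.Wait()
	return peak.Load()
}

// sleepTasks builds n tasks that each sleep briefly, standing in for real work.
func sleepTasks(n int) []func() {
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(2 * time.Millisecond) }
	}
	return tasks
}

func main() {
	peak := runLimited(3, sleepTasks(10))
	fmt.Println(peak <= 3) // prints true
}
```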
Using WithContext to Create a Pool
You can create a Pool that passes a Context to its tasks by using WithContext. By default, the Pool's Context will not be canceled until the parent Context is canceled. If you need to cancel the Context in the pool in the event of a panic or error, you can configure WithCancelOnError.
Example:
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	p := pool.New().
		WithMaxGoroutines(4).
		WithContext(context.Background()).
		WithCancelOnError()
	for i := 0; i < 3; i++ {
		i := i
		p.Go(func(ctx context.Context) error {
			if i == 2 {
				return errors.New("I will cancel all other tasks!")
			}
			// The other tasks block until the pool's Context is canceled
			<-ctx.Done()
			return nil
		})
	}
	err := p.Wait()
	fmt.Println(err)
}
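The cancel-on-error behavior can be approximated with the standard library alone. The sketch below is an assumption-laden illustration, not conc's implementation: runCancelOnError, demo, and errBoom are hypothetical names, and the function returns only the first error instead of combining errors.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errBoom is a hypothetical sentinel error used only in this sketch.
var errBoom = errors.New("I will cancel all other tasks!")

// runCancelOnError runs all tasks concurrently with a shared child context.
// The first task to return an error cancels that context, and the error is
// returned once every task has finished.
func runCancelOnError(parent context.Context, tasks []func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func(context.Context) error) {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					first = err
					cancel() // wake up tasks blocked on ctx.Done()
				})
			}
		}(task)
	}
	wg.Wait()
	return first
}

// demo mirrors the example above: one task fails, the others wait for
// cancellation and then return nil.
func demo() error {
	waitForCancel := func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}
	fail := func(context.Context) error { return errBoom }
	return runCancelOnError(context.Background(),
		[]func(context.Context) error{waitForCancel, waitForCancel, fail})
}

func main() {
	fmt.Println(demo()) // prints "I will cancel all other tasks!"
}
```

Without the cancel call, the two waiting tasks would block forever on a background Context, which is the situation WithCancelOnError is meant to avoid.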
Result Pool
A ResultPool is a task pool that executes tasks and returns generic results. Use Go() to execute tasks in the pool, and Wait() to return the results of the tasks.
Example:
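package main

import (
	"fmt"
	"sort"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	p := pool.NewWithResults[int]()
	for i := 0; i < 10; i++ {
		i := i // capture the current value (needed before Go 1.22)
		p.Go(func() int {
			return i * 2
		})
	}
	res := p.Wait()
	// Result order is non-deterministic, so sort them first
	sort.Ints(res)
	fmt.Println(res)
}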
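Sorting works here because the results happen to be sortable integers. When that is not the case, one standard-library alternative is to give every task its own slot in a pre-sized slice. The sketch below assumes hypothetical helper names (collectOrdered, doubles) and is not part of conc; it only illustrates how results can be kept in submission order without sorting.

```go
package main

import (
	"fmt"
	"sync"
)

// collectOrdered runs every task in its own goroutine and stores each result
// at the index of the task that produced it.
func collectOrdered[T any](tasks []func() T) []T {
	results := make([]T, len(tasks))
	var wg sync.WaitGroup
	for i, task := range tasks {
		wg.Add(1)
		go func(i int, task func() T) {
			defer wg.Done()
			results[i] = task() // each goroutine writes to a distinct slot
		}(i, task)
	}
	wg.Wait()
	return results
}

// doubles builds n tasks; task i returns i * 2.
func doubles(n int) []func() int {
	tasks := make([]func() int, n)
	for i := range tasks {
		i := i // capture the current value
		tasks[i] = func() int { return i * 2 }
	}
	return tasks
}

func main() {
	fmt.Println(collectOrdered(doubles(5))) // prints [0 2 4 6 8]
}
```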
Stream Pool
With the pools above, results come back in no particular order. If you want ordered results, you can use Stream. To use Stream, you submit a number of tasks, each of which returns a callback. The tasks are executed concurrently in the task pool, and the callbacks are executed in the order the tasks were submitted. After submitting the tasks, you must call the Wait() method to wait for the tasks to complete and to propagate panics.
Like Pool, Stream is not suitable for very short tasks. Starting and tearing down adds a few microseconds of overhead, and the overhead per task is about 500ns.
Example:
package main

import (
	"fmt"
	"time"

	stream2 "github.com/sourcegraph/conc/stream"
)

func main() {
	times := []int{20, 52, 16, 45, 4, 80}
	stream := stream2.New()
	for _, millis := range times {
		dur := time.Duration(millis) * time.Millisecond
		stream.Go(func() stream2.Callback {
			time.Sleep(dur)
			// Callbacks run in the order the tasks were submitted
			return func() {
				fmt.Println(dur)
			}
		})
	}
	stream.Wait()
}
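The ordering guarantee can be illustrated with plain channels. In the sketch below, every task gets its own one-element channel, and the caller drains those channels in submission order, so a fast task's callback still waits for the slower tasks submitted before it. The names callback, runOrdered, and demoOrder are hypothetical, and the sketch deliberately omits the goroutine limit and panic propagation that Stream provides.

```go
package main

import (
	"fmt"
	"time"
)

// callback runs on the caller's goroutine, in submission order.
type callback func()

// runOrdered starts every task concurrently, then runs the returned
// callbacks one by one in the order the tasks were submitted.
func runOrdered(tasks []func() callback) {
	slots := make([]chan callback, len(tasks))
	for i, task := range tasks {
		slots[i] = make(chan callback, 1)
		go func(task func() callback, out chan<- callback) {
			out <- task() // the buffer lets the task finish without waiting
		}(task, slots[i])
	}
	for _, slot := range slots {
		cb := <-slot // wait for this task even if later ones finished first
		cb()
	}
}

// demoOrder sleeps for each duration concurrently and records the order in
// which the callbacks ran.
func demoOrder(millis []int) []int {
	var seen []int
	tasks := make([]func() callback, len(millis))
	for i, ms := range millis {
		ms := ms // capture the current value
		tasks[i] = func() callback {
			time.Sleep(time.Duration(ms) * time.Millisecond)
			return func() { seen = append(seen, ms) }
		}
	}
	runOrdered(tasks)
	return seen
}

func main() {
	fmt.Println(demoOrder([]int{20, 5, 15, 1})) // prints [20 5 15 1]
}
```

Because the callbacks run sequentially on the calling goroutine, they can append to a shared slice without a lock.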
Summary
conc offers a more concise and user-friendly concurrency handling library compared to the standard library, making it an excellent tool for beginners. It simplifies the structure of concurrent code and reduces the potential for errors.