Introduction to conc, a Concurrency Library for Go

conc is a library designed to improve concurrency handling in Go. It offers a more concise and user-friendly API than the standard library's concurrency primitives, which makes it a good fit for beginners: it simplifies the structure of concurrent code and reduces the potential for errors.

Sourcegraph ran into recurring problems with Go's native concurrency mechanisms in their daily development, which led them to create conc. Compared to the standard concurrency patterns, conc offers a more elegant and streamlined approach that needs less code. The example below propagates a panic from a goroutine to its caller, first with the standard library and then with conc.

// Using the standard library (needs the "runtime/debug" import)

type propagatedPanic struct {
    val   interface{}
    stack []byte
}

func main() {
    done := make(chan *propagatedPanic)
    go func() {
        defer func() {
            if v := recover(); v != nil {
                done <- &propagatedPanic{val: v, stack: debug.Stack()}
            } else {
                done <- nil
            }
        }()
        doSomethingThatMightPanic()
    }()

    if val := <-done; val != nil {
        panic(val)
    }
}

// Using conc

func main() {
    var wg conc.WaitGroup
    wg.Go(doSomethingThatMightPanic)
    // Panics with a nice stack trace
    wg.Wait()
}

To install conc, use the following command:

go get github.com/sourcegraph/conc

Using conc.WaitGroup

The main difference between conc.WaitGroup and the standard library's sync.WaitGroup is that panics in child goroutines are propagated to the caller of the Wait method, eliminating the need to recover panics in the goroutine.

Example:

func main() {
    var count atomic.Int64
    var wg conc.WaitGroup
    for i := 1; i < 100; i++ {
        wg.Go(func() {
            count.Add(1)
        })
    }
    wg.Wait()
    fmt.Println(count.Load())
}

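If you want to recover a panic from a child goroutine instead of re-panicking in the caller, use the WaitAndRecover method. It returns the recovered panic, or nil if no goroutine panicked:

func main() {
    var count atomic.Int64
    var wg conc.WaitGroup
    for i := 0; i < 10; i++ {
        i := i // copy the loop variable; before Go 1.22 all closures share one i
        wg.Go(func() {
            if i == 7 {
                panic("bad thing")
            }
            count.Add(1)
        })
    }
    // The panic is returned here instead of crashing the program
    if r := wg.WaitAndRecover(); r != nil {
        fmt.Println("recovered:", r.Value)
    }
    fmt.Println(count.Load())
}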
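
To make the panic-propagation idea concrete, here is a minimal sketch that uses only the standard library. It is not conc's implementation: the panicGroup type and its methods are hypothetical names chosen for illustration, and unlike conc the sketch keeps only the first panic value and does not capture a stack trace.

```go
package main

import (
	"fmt"
	"sync"
)

// panicGroup is a hypothetical, simplified stand-in for conc.WaitGroup.
// It records the first panic raised by a child goroutine.
type panicGroup struct {
	wg    sync.WaitGroup
	once  sync.Once
	first any // value of the first recovered panic, or nil
}

// Go runs f in a new goroutine and captures a panic instead of crashing.
func (g *panicGroup) Go(f func()) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done() // runs last, after the panic has been stored
		defer func() {
			if v := recover(); v != nil {
				g.once.Do(func() { g.first = v })
			}
		}()
		f()
	}()
}

// WaitAndRecover blocks until all goroutines finish and returns the
// first captured panic value, or nil if none occurred.
func (g *panicGroup) WaitAndRecover() any {
	g.wg.Wait()
	return g.first
}

// Wait blocks like WaitAndRecover but re-panics in the caller.
func (g *panicGroup) Wait() {
	if v := g.WaitAndRecover(); v != nil {
		panic(v)
	}
}

func main() {
	var g panicGroup
	g.Go(func() { panic("bad thing") })
	g.Go(func() {})
	fmt.Println("recovered:", g.WaitAndRecover()) // prints "recovered: bad thing"
}
```

The design point is that the recover call lives inside the wrapper, so task functions stay free of defer/recover boilerplate and the caller decides whether to re-panic (Wait) or inspect the value (WaitAndRecover).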

Goroutine Pool (Pool)

A Pool is a goroutine pool for handling concurrent tasks. Goroutines in the Pool are lazily started, so creating a new Pool is inexpensive. The number of goroutines created will never exceed the number of submitted tasks. The pool is efficient but not cost-free. It should not be used for very short tasks. The overhead for starting and tearing down is about 1μs, and the overhead per task is about 300ns.

Example:

func main() {
    p := pool.New().WithMaxGoroutines(3)
    for i := 0; i < 5; i++ {
        p.Go(func() {
            fmt.Println("conc")
        })
    }
    p.Wait()
}
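
The effect of a goroutine limit can be sketched with the standard library alone. The boundedPool type below is a hypothetical illustration, not conc's code: it uses a buffered channel as a semaphore and starts one goroutine per task, whereas conc reuses worker goroutines. What it shares with the Pool above is that Go blocks while the limit is reached.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// boundedPool is a hypothetical, simplified stand-in for a conc Pool
// configured with a maximum number of goroutines.
type boundedPool struct {
	wg    sync.WaitGroup
	slots chan struct{} // one token per running task
}

func newBoundedPool(max int) *boundedPool {
	return &boundedPool{slots: make(chan struct{}, max)}
}

// Go blocks until a slot is free, then runs f in its own goroutine.
func (p *boundedPool) Go(f func()) {
	p.slots <- struct{}{} // acquire a slot
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		defer func() { <-p.slots }() // release the slot
		f()
	}()
}

// Wait blocks until every submitted task has finished.
func (p *boundedPool) Wait() { p.wg.Wait() }

// peakConcurrency runs n short tasks on a pool of the given size and
// reports the highest number of tasks observed running at once.
func peakConcurrency(max, n int) int64 {
	var running, peak atomic.Int64
	p := newBoundedPool(max)
	for i := 0; i < n; i++ {
		p.Go(func() {
			cur := running.Add(1)
			for { // raise peak to cur if cur is larger
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			time.Sleep(time.Millisecond)
			running.Add(-1)
		})
	}
	p.Wait()
	return peak.Load()
}

func main() {
	fmt.Println(peakConcurrency(3, 20) <= 3) // prints "true"
}
```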

Using WithContext to Create a Pool

Calling WithContext on a Pool yields a pool whose tasks receive a Context and return an error. By default, the pool's Context is not canceled until the parent Context is canceled. If you need the pool's Context to be canceled as soon as a task panics or returns an error, configure WithCancelOnError.

Example:

func main() {
    p := pool.New().
        WithMaxGoroutines(4).
        WithContext(context.Background()).
        WithCancelOnError()
    for i := 0; i < 3; i++ {
        i := i
        p.Go(func(ctx context.Context) error {
            if i == 2 {
                return errors.New("I will cancel all other tasks!")
            }
            <-ctx.Done()
            return nil
        })
    }
    err := p.Wait()
    fmt.Println(err)
}
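
The cancel-on-error behavior can be approximated with context.WithCancel and sync.Once from the standard library. The runAll helper below is a hypothetical sketch, not conc's implementation: it has no goroutine limit, does not recover panics, and returns only the first error.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// runAll is a hypothetical helper: it runs every task with a shared
// context, cancels that context on the first error, and returns that error.
func runAll(parent context.Context, tasks ...func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, task := range tasks {
		task := task // copy the loop variable for Go versions before 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := task(ctx); err != nil {
				once.Do(func() {
					first = err
					cancel() // wake every task blocked on ctx.Done()
				})
			}
		}()
	}
	wg.Wait()
	return first
}

var errBoom = errors.New("I will cancel all other tasks!")

// demo mirrors the example above: two tasks wait for cancellation
// and a third one fails, which releases the other two.
func demo() error {
	waiter := func(ctx context.Context) error {
		<-ctx.Done() // returns only once the context is canceled
		return nil
	}
	failer := func(ctx context.Context) error { return errBoom }
	return runAll(context.Background(), waiter, waiter, failer)
}

func main() {
	fmt.Println(demo()) // prints "I will cancel all other tasks!"
}
```

Without the cancel call inside once.Do, the two waiting tasks would block forever, which is the situation WithCancelOnError is meant to avoid.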

Result Pool

A ResultPool is a task pool that executes tasks and returns generic results. Use Go() to execute tasks in the pool, and Wait() to return the results of the tasks.

Example:

func main() {
    p := pool.NewWithResults[int]()
    for i := 0; i < 10; i++ {
        i := i // copy the loop variable; before Go 1.22 all closures share one i
        p.Go(func() int {
            return i * 2
        })
    }
    res := p.Wait()
    // Result order is non-deterministic, so sort them first
    sort.Ints(res)
    fmt.Println(res)
}
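
Collecting typed results from concurrent tasks can be sketched with generics and a mutex-guarded slice. The collect and doubled helpers are hypothetical names for illustration and say nothing about how conc stores results internally. The sketch also shows why sorting is needed when results arrive in completion order.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// collect is a hypothetical helper that runs each task in its own
// goroutine and gathers the return values in completion order.
func collect[T any](tasks ...func() T) []T {
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		results []T
	)
	for _, task := range tasks {
		task := task // copy the loop variable for Go versions before 1.22
		wg.Add(1)
		go func() {
			defer wg.Done()
			v := task()
			mu.Lock() // appends from different goroutines must not race
			results = append(results, v)
			mu.Unlock()
		}()
	}
	wg.Wait()
	return results
}

// doubled builds n tasks where task i returns i*2, runs them
// concurrently, and sorts the results to make the output stable.
func doubled(n int) []int {
	tasks := make([]func() int, 0, n)
	for i := 0; i < n; i++ {
		i := i
		tasks = append(tasks, func() int { return i * 2 })
	}
	res := collect(tasks...)
	sort.Ints(res)
	return res
}

func main() {
	fmt.Println(doubled(10)) // prints "[0 2 4 6 8 10 12 14 16 18]"
}
```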

Stream Pool

A ResultPool returns its results in no guaranteed order. If you need results handled in order, use Stream. You submit tasks that each return a callback. The tasks run concurrently in the stream's pool, while the callbacks run one at a time in the order the tasks were submitted. After submitting all tasks, you must call Wait() to wait for them to complete and to propagate any panics.

Like Pool, Stream is also not suitable for very short tasks. Starting and tearing down adds a few microseconds of overhead, and the overhead per task is about 500ns.

Example:

func main() {
    times := []int{20, 52, 16, 45, 4, 80}
    s := stream.New()
    for _, millis := range times {
        dur := time.Duration(millis) * time.Millisecond
        s.Go(func() stream.Callback {
            time.Sleep(dur)
            // Callbacks print in the order the tasks were submitted
            return func() {
                fmt.Println(dur)
            }
        })
    }
    s.Wait()
}
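
One way to get ordered callbacks from concurrent tasks is a queue of per-task channels drained by a single goroutine. The orderedStream type below is a hypothetical standard-library sketch of that idea, not conc's implementation: it does not limit concurrency or propagate panics, and the queue size of 64 is an arbitrary assumption.

```go
package main

import (
	"fmt"
	"time"
)

// callback runs after its task finishes, in submission order.
type callback func()

// orderedStream is a hypothetical, simplified stand-in for conc's Stream.
type orderedStream struct {
	queue chan chan callback // one slot per submitted task, in order
	done  chan struct{}
}

func newOrderedStream() *orderedStream {
	s := &orderedStream{
		queue: make(chan chan callback, 64),
		done:  make(chan struct{}),
	}
	// A single goroutine drains the queue, so callbacks never overlap
	// and always run in the order their tasks were submitted.
	go func() {
		defer close(s.done)
		for slot := range s.queue {
			cb := <-slot // wait for this task, even if later ones finished first
			cb()
		}
	}()
	return s
}

// Go starts task concurrently and reserves its place in the callback order.
func (s *orderedStream) Go(task func() callback) {
	slot := make(chan callback, 1)
	s.queue <- slot
	go func() { slot <- task() }()
}

// Wait blocks until every callback has run.
func (s *orderedStream) Wait() {
	close(s.queue)
	<-s.done
}

// runInOrder sleeps for each duration concurrently and records the
// values in the order the callbacks fire.
func runInOrder(millis []int) []int {
	var got []int
	s := newOrderedStream()
	for _, m := range millis {
		m := m // copy the loop variable for Go versions before 1.22
		s.Go(func() callback {
			time.Sleep(time.Duration(m) * time.Millisecond)
			return func() { got = append(got, m) }
		})
	}
	s.Wait()
	return got
}

func main() {
	fmt.Println(runInOrder([]int{20, 52, 16, 45, 4, 80})) // prints "[20 52 16 45 4 80]"
}
```

Because the tasks sleep concurrently, the whole run takes roughly as long as the slowest task, yet the callbacks still fire in submission order.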

Summary

conc wraps common concurrency patterns (waiting on goroutines, bounded goroutine pools, result collection, and ordered streams) in a small API with panic propagation built in. Compared with hand-written code on top of the standard library, it simplifies the structure of concurrent programs and reduces the potential for errors.
