Golang error group

From clockworksoul/today-i-learned (today-i-learned/using-error-groups.md): "A repo to keep track of random things I learned. Most technical, many not."

Using error groups in Go

Source: [1][2][3]

The golang.org/x/sync/errgroup package provides synchronization, error propagation, and Context cancelation for groups of goroutines working on subtasks of a common task.

Most interestingly, it provides the Group type, which represents an error group: a collection of goroutines working on subtasks that are part of the same overall task.

errgroup.Group

The Group type represents a collection of goroutines working on subtasks that are part of the same overall task.

Its two core methods are Go() and Wait(). (Newer versions of the package also add SetLimit() and TryGo().)

Go()

func (g *Group) Go(f func() error)

Go() calls the given function in a new goroutine.

The first call to return a non-nil error cancels the group's context (if the group was created with WithContext()); its error will be returned by Wait().

Wait()

func (g *Group) Wait() error

Wait() blocks until all function calls from the Go() method have returned, then returns the first non-nil error (if any) from them.

Creating an errgroup.Group

An error group can be created in one of two ways: with or without a Context.

With context

A Group can be created using the WithContext() function, which returns a new Group and an associated Context derived from its ctx parameter.

The derived Context is canceled the first time a function passed to Go() returns a non-nil error or the first time Wait() returns, whichever occurs first.

func WithContext(ctx context.Context) (*Group, context.Context)

A zero Group

A zero Group is valid and does not cancel on error. It can be created with new(errgroup.Group),

or simply by declaring var g errgroup.Group.

Using errgroup.Group

You can use a Group as follows:

  1. Create the Group, ideally using a Context.
  2. Pass the Group one or more functions of type func() error to the Go() method to execute concurrently.
  3. Call the Wait() method, which blocks until all the functions have returned and then returns the first non-nil error, if any.

Generation 1: Serial Service Calls

func main() {
    err := SendRequestToService()
    if err != nil {
        log.Fatal(err)
    }
}

Generation 2: Using an error group

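func main() {
    g, ctx := errgroup.WithContext(context.Background())

    g.Go(func() error {
        // ctx is passed along (assuming SendRequestToService accepts a
        // context) so the request can be aborted if the group is canceled.
        return SendRequestToService(ctx)
    })

    err := g.Wait()
    if err != nil {
        // Note that ctx is automatically canceled!
        log.Fatal(err)
    }
}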

The Golang errgroup package provides tools for synchronization, error propagation, and context cancellation for a group of goroutines working on a common task.

Let's look at how to use the errgroup package.

Importing the required package

To use the errgroup package, you need to import it with an import declaration. The code below shows how to import the errgroup package.

import "golang.org/x/sync/errgroup"

WithContext Func

The errgroup package provides the WithContext function. It returns a new Group and an associated Context.

The context is canceled the first time a function passed to Go returns a non-nil error, or the first time Wait returns.

Go Func

The second function provided by the errgroup package is the Go method. It calls the given function in a new goroutine.

The first call to return a non-nil error cancels the context (when the group was created with WithContext).

Wait Func

The third function is the Wait method. It blocks until all function calls from the Go method have returned, then returns the first non-nil error, if any.

Example

Let's see how we can use the errgroup package. We'll start with a set of goroutines that simply print a value.

package main

import "fmt"

func main() {
	for i := 0; i < 10; i++ {
		go fmt.Println("Processing task:", i)
	}
}

Here we have a for loop that starts a bunch of goroutines, each printing the current loop value.

If you run the code above, you will most likely see no output at all. That's because main does not wait for the goroutines: as soon as main returns, Go terminates the whole program, whether or not the goroutines have finished.

We can fix the code above using a wait group, as shown below:

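package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	for i := 0; i < 10; i++ {
		wg.Add(1) // one more goroutine to wait for
		go func(task int) {
			defer wg.Done() // decrement the counter when this goroutine exits
			fmt.Println("Processing task:", task)
		}(i)
	}
	wg.Wait() // block until the counter drops to zero
}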

Here we synchronize the goroutines with a wait group. In a nutshell, a wait group is a counter that lets us block the main function from exiting until all the goroutines have finished executing.

The wait group keeps a counter of goroutines that have not finished yet. Each goroutine decrements the counter when it finishes, and once the counter reaches 0, Wait unblocks and main continues.

Notice the Add method? We use it to increase the wait group's counter before starting each goroutine. When a goroutine finishes, it decrements the counter with the Done method.

Using Errgroup

In the example above, we manage a group of goroutines using Go's sync package. However, there is no mechanism for handling errors. While that isn't necessary for a simple example like the one above, it is important for many applications.

For that we can use the errgroup package, which lets us handle errors in a group of goroutines. Take a look at the example below:

package main

import (
	"fmt"
	"log"
	"math/rand"

	"golang.org/x/sync/errgroup"
)

// Task fails when a random number in [0, 10) happens to equal the task number.
func Task(task int) error {
	if rand.Intn(10) == task {
		return fmt.Errorf("task %v failed", task)
	}
	fmt.Printf("Task %v completed\n", task)
	return nil
}

func main() {
	eg := &errgroup.Group{}
	for i := 0; i < 10; i++ {
		task := i // copy the loop variable for the closure (needed before Go 1.22)
		eg.Go(func() error {
			return Task(task)
		})
	}
	if err := eg.Wait(); err != nil {
		log.Fatal("Error: ", err)
	}
	fmt.Println("Completed successfully!")
}

In this example we introduce a simple function that generates a random integer. If the random value equals the task number, the function returns an error.

We then create an errgroup.Group{}, launch each task with its Go method, and check the first error returned by Wait.

Conclusion

This tutorial showed how to use wait groups and how to handle errors with the errgroup package.

Preface

Concurrency is one of Go's strong points and I love working with the paradigm that the Go team has built.
It is a big topic with lots to talk about. I recommend reading through the Effective Go documentation
about concurrency in Go to learn about goroutines, channels, and how they all work together.

Error handling is also done differently in Go than other languages, thanks to multiple return values.
I recommend reading the Go blog's post on error handling.

ErrGroup

If you don't need to do any further work based on the errors, use an ErrGroup!

An ErrGroup is essentially a wrapped sync.WaitGroup that also catches the errors returned by
the started goroutines.

WaitGroup

Here is an example without errors using a WaitGroup (from godoc):

package main

import (
	"fmt"
	"sync"
)

type httpPkg struct{}

func (httpPkg) Get(url string) {}

var http httpPkg

func main() {
	var wg sync.WaitGroup
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somename.com/",
	}
	for _, url := range urls {
		// Increment the WaitGroup counter.
		wg.Add(1)
		// Launch a goroutine to fetch the URL.
		go func(url string) {
			// Decrement the counter when the goroutine completes.
			defer wg.Done()
			// Fetch the URL.
			http.Get(url)
		}(url)
	}
	// Wait for all HTTP fetches to complete.
	wg.Wait()
	fmt.Println("Successfully fetched all URLs.")
}

To use a WaitGroup, first create the group:

var wg sync.WaitGroup

Next, before starting each goroutine, add one to the group's counter:

wg.Add(1)

Then whenever a goroutine is done, tell the group:

defer wg.Done()

The defer keyword:

It defers the execution of the statement following the keyword until the surrounding function returns.

Read more about it in A Tour of Go.
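To see why defer wg.Done() is safe on every code path, here is a small sketch (process is a hypothetical function): the deferred calls run even on an early return, and they run in last-in, first-out order.

```go
package main

import "fmt"

// process records the order in which its statements and deferred calls run.
func process(steps *[]string, fail bool) error {
	defer func() { *steps = append(*steps, "deferred 1") }() // runs last
	defer func() { *steps = append(*steps, "deferred 2") }() // runs first
	if fail {
		*steps = append(*steps, "early return")
		return fmt.Errorf("failed")
	}
	*steps = append(*steps, "normal return")
	return nil
}

func main() {
	var steps []string
	_ = process(&steps, true)
	fmt.Println(steps) // [early return deferred 2 deferred 1]
}
```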

Finally, wait for the group to complete:

wg.Wait()

In this case, there are no errors that can occur. Let's look at how it changes if we need to catch errors, using an ErrGroup.

ErrGroup

Here is the same example as above but using an ErrGroup (from godoc):

package main

import (
	"fmt"
	"net/http"

	"golang.org/x/sync/errgroup"
)

func main() {
	g := new(errgroup.Group)
	var urls = []string{
		"http://www.golang.org/",
		"http://www.google.com/",
		"http://www.somename.com/",
	}
	for _, url := range urls {
		// Launch a goroutine to fetch the URL.
		url := url // https://golang.org/doc/faq#closures_and_goroutines
		g.Go(func() error {
			// Fetch the URL.
			resp, err := http.Get(url)
			if err == nil {
				resp.Body.Close()
			}
			return err
		})
	}
	// Wait for all HTTP fetches to complete.
	if err := g.Wait(); err == nil {
		fmt.Println("Successfully fetched all URLs.")
	}
}

It looks very similar; here are the differences:

First, create the group:

var wg sync.WaitGroup

// VVV BECOMES VVV

g := new(errgroup.Group)

Next, instead of adding every goroutine to the group, pass the function you want to run as a goroutine to g.Go.
The only requirement is that it has the signature func() error.
Also, since the ErrGroup tracks when its goroutines complete, there is no need to call wg.Done().

go func(arg string) {
	// Decrement the counter when the goroutine completes.
	defer wg.Done()
	// ... work that can return error here
}(arg)

// VVV BECOMES VVV

g.Go(func() error {
	// ... work that can return error here
	return nil // or the error produced by that work
})

Finally, wait for the group to finish and handle the errors as needed:

wg.Wait()

// VVV BECOMES VVV

if err := g.Wait(); err == nil {
	fmt.Println("Successfully fetched all URLs.")
}

ErrGroups provide lots of opportunities for handling errors in goroutines.
That being said, ErrGroup is just another tool in the toolbox that should be used when the use case fits.
Because Wait() only reports the first error, if more complex decisions and work need to be made based on the errors, a channel is probably
a better fit.
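As a rough sketch of that channel-based alternative (check and collectErrors are hypothetical helpers, and errors.Join needs Go 1.20 or later), each goroutine sends its error into a buffered channel, so the caller sees every failure instead of only the first one:

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// check is a hypothetical stand-in for real work such as an HTTP fetch.
func check(url string) error {
	if !strings.HasPrefix(url, "https://") {
		return fmt.Errorf("%s: not an https URL", url)
	}
	return nil
}

// collectErrors runs check concurrently and returns every error it produced.
func collectErrors(urls []string) []error {
	errCh := make(chan error, len(urls)) // buffered: no sender ever blocks
	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			if err := check(url); err != nil {
				errCh <- err
			}
		}(url)
	}
	wg.Wait()
	close(errCh) // safe: every sender has finished

	var errs []error
	for err := range errCh {
		errs = append(errs, err)
	}
	return errs
}

func main() {
	urls := []string{"https://go.dev/", "http://example.com/", "ftp://example.org/"}
	errs := collectErrors(urls)
	fmt.Printf("%d of %d checks failed\n", len(errs), len(urls))
	// Every failure is available for further decisions, e.g. retries.
	fmt.Println(errors.Join(errs...))
}
```

The order of the collected errors depends on goroutine scheduling, so callers should not rely on it.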

What do you think? Let me know @bstncartwright

If you’ve used Go for a while you’re probably aware of some of the basic Go concurrency primitives:

  • The go keyword for spawning goroutines
  • Channels, for communicating between goroutines
  • The context package for propagating cancellation
  • The sync and sync/atomic packages for lower-level primitives such as mutexes and atomic memory access

These language features and packages combine to provide a very rich set of tools for building concurrent applications.
What you might not have discovered yet is a set of higher-level concurrency primitives in the "extended standard library" at golang.org/x/sync.
We’ll be taking a look at these in this article.

Package singleflight

As the package documentation states, this package provides a duplicate function call suppression mechanism.

This package is extremely useful for cases where you are doing something computationally expensive (or just slow, like network access) in response to user activity.
For example, let’s say you have a database with weather information per city and you want to expose this as an API.
In some cases you might have multiple users ask for the weather for the same city at the same time.

When that happens, wouldn’t it be great if you could just query the database, and then share the result to all the waiting requests?
That’s exactly what the singleflight package does!

To use it, create a singleflight.Group somewhere. It needs to be shared across all the requests to work correctly.
Then wrap the slow or expensive operation in a call to group.Do(key, fn). Multiple concurrent requests for the same key
will only call fn once, and the result will be returned to all callers once fn returns.

Here’s how it looks in practice:

package weather

import (
	"fmt"

	"golang.org/x/sync/singleflight"
)

type Info struct {
	TempC, TempF int    // temperature in Celsius and Fahrenheit
	Conditions   string // "sunny", "snowing", etc
}

var group singleflight.Group

func City(city string) (*Info, error) {
	results, err, _ := group.Do(city, func() (interface{}, error) {
		info, err := fetchWeatherFromDB(city) // slow operation
		return info, err
	})
	if err != nil {
		return nil, fmt.Errorf("weather.City %s: %w", city, err)
	}
	return results.(*Info), nil
}

Note that the closure we pass to group.Do must return (interface{}, error) to work with the Go type system.
The third return value from group.Do, which is ignored in the example above, indicates whether the result was
shared between multiple callers or not.

Package errgroup

Another invaluable package is the errgroup package.
It is best described as a sync.WaitGroup but where the tasks return errors that are propagated back to the waiter.

This package is useful when you have multiple operations that you want to wait for, but you also want to determine
if they all completed successfully.
For example, to build on the weather example from above, let's say you want to look up the weather for multiple cities
at once, and fail if any of the lookups fails.

Start by defining an errgroup.Group, and use the group.Go(fn func() error) method for each city.
This method spawns a goroutine to run the task. When you’ve spawned all the tasks you want, use
group.Wait() to wait for them to complete. Note that this method returns an error, unlike sync.WaitGroup's equivalent.
The error is nil if and only if all the tasks returned a nil error.

In practice it looks like this:

func Cities(cities ...string) ([]*Info, error) {
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
	for i, city := range cities {
		i, city := i, city // create locals for closure below
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return res, nil
}

Here we are allocating a slice of results so that each goroutine can write to its own index.
While the above code is safe even without the mu mutex, since each goroutine is writing to its own entry in the slice, we use one anyway in case the code is changed over time.

Bounded concurrency

The code above will look up weather information for all the given cities concurrently.
That’s fine when the number of cities is small, but can cause performance issues if the number of cities is massive.
In those cases it’s useful to introduce bounded concurrency.

Go makes it really easy to create bounded concurrency with the use of semaphores. A semaphore is a concurrency primitive that you might have come across if you studied Computer Science, but if not, don’t worry. You can use semaphores for several purposes, but we’re just going to use them to keep track of how many tasks are running, and to block until there is room for another task to start.

In Go we can accomplish this through a clever use of channels! If we want to allow up to 10 tasks to run at once,
we create a channel with space for 10 items: semaphore := make(chan struct{}, 10). You can picture this as a pipe that can fit 10 balls.

To start a new task, blocking if too many tasks are already running, we simply attempt to send a value on the channel: semaphore <- struct{}{}. This is analogous to trying to push another ball into the pipe. If the pipe is full, it waits until there is room.

When a task completes, mark it as such by taking a value out of the channel: <-semaphore. This is analogous to pulling a ball out at the other end of the pipe, which leaves room for another ball to be pushed in (another task started).

And that’s it! Our modified Cities looks like this:

func Cities(cities ...string) ([]*Info, error) {
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
	sem := make(chan struct{}, 10)    // at most 10 lookups in flight
	for i, city := range cities {
		i, city := i, city // create locals for closure below
		sem <- struct{}{}  // acquire a slot; blocks while 10 tasks are running
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			<-sem // release the slot
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return res, nil
}

Weighted bounded concurrency

And finally, sometimes you want bounded concurrency, but not all tasks are equally expensive.
In that case the amount of resources we’ll consume will vary drastically depending on the distribution
of cheap and expensive tasks and how they happen to start.

A better solution for this use case is to use weighted bounded concurrency.
How this works is simple: instead of reasoning about the number of tasks we want to run concurrently,
we come up with a "cost" for every task and acquire and release that cost from a semaphore.

We can't model this with channels any longer since we need the whole cost acquired and released at once.
Fortunately the "extended standard library" comes to our rescue once more!
The golang.org/x/sync/semaphore package provides a weighted semaphore implementation exactly for this purpose.

In that package, the equivalent of the sem <- struct{}{} operation is called Acquire and the equivalent of <-sem is called Release.
You will note that the semaphore.Acquire method returns an error; that is because it takes a context and gives up early once that context is done.
In the example below we simply stop starting new tasks when that happens and report ctx.Err() at the end.

The weather lookup example is realistically too simple to warrant a weighted semaphore,
but for the sake of illustration let's pretend the cost varies with the length of the city name.
Then we arrive at the following:

func Cities(cities ...string) ([]*Info, error) {
	ctx := context.TODO() // replace with a real context
	var g errgroup.Group
	var mu sync.Mutex
	res := make([]*Info, len(cities)) // res[i] corresponds to cities[i]
	sem := semaphore.NewWeighted(100) // 100 chars processed concurrently
	for i, city := range cities {
		i, city := i, city // create locals for closure below
		cost := int64(len(city))
		// Note: a cost above 100 would block here until ctx is done.
		if err := sem.Acquire(ctx, cost); err != nil {
			break
		}
		g.Go(func() error {
			info, err := City(city)
			mu.Lock()
			res[i] = info
			mu.Unlock()
			sem.Release(cost)
			return err
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	} else if err := ctx.Err(); err != nil {
		return nil, err
	}
	return res, nil
}

Conclusion

The above examples show how easy it is to add concurrency to a Go program, and then fine-tune it based on your needs.

Feedback, corrections and suggestions on how this article can be improved are very welcome! Please reach out to me on Twitter.
