🏃 Конкурентность в Golang и WorkerPool [Часть 1]
В современных языках программирования конкурентность стала безусловной потребностью. В этой статье речь пойдет об устройстве и использовании concurrency в Go.
Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.
В некоторых языках программирования имеются мощные конструкции, которые могут выгружать работу в разные потоки ОС (например, Java), а другие только имитируют это поведение в одном потоке (например, Ruby).
У Golang есть мощная модель конкурентности – CSP (communicating sequential processes), которая разбивает проблему на более мелкие последовательные процессы, а затем планирует несколько экземпляров этих процессов, называемых Goroutines (горутины). Связь между горутинами осуществляется путем передачи неизменяемых сообщений через Channels.
Рассмотрим, как можно воспользоваться преимуществами конкурентности в Golang и как ограничить его использование с рабочими пулами.
Простой пример
Представим, что у нас есть внешний вызов API, выполняющийся около 100 мс. Если будет 1000 таких вызовов, и мы вызовем их синхронно, то для завершения потребуется около 100 секунд.
//// model/data.go package model type SimpleData struct { ID int } //// basic/basic.go package basic import ( "fmt" "github.com/Joker666/goworkerpool/model" "time" ) func Work(allData []model.SimpleData) { start := time.Now() for i, _ := range allData { Process(allData[i]) } elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func Process(data model.SimpleData) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) fmt.Printf("Finish processing %d\n", data.ID) } //// main.go package main import ( "fmt" "github.com/Joker666/goworkerpool/basic" "github.com/Joker666/goworkerpool/model" "github.com/Joker666/goworkerpool/worker" ) func main() { // Prepare the data var allData []model.SimpleData for i := 0; i < 1000; i++ { data := model.SimpleData{ ID: i } allData = append(allData, data) } fmt.Printf("Start processing all work \n") // Process basic.Work(allData) }
Start processing all work Took ===============> 1m40.226679665s
Здесь у нас простая модель, содержащая структуру данных только с целочисленными значениями. Массив данных мы обрабатываем массив синхронно: очевидно, что такое решение неоптимально решение, поскольку задачи можно выполнить одновременно. Давайте превратим это в асинхронный процесс с Goroutines и Channels.
Асинхронность
//// worker/notPooled.go func NotPooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup dataCh := make(chan model.SimpleData, 100) wg.Add(1) go func() { defer wg.Done() for data := range dataCh { wg.Add(1) go func(data model.SimpleData) { defer wg.Done() basic.Process(data) }(data) } }() for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.NotPooledWork(allData)
Start processing all work Took ===============> 101.191534ms
Здесь мы создаем буферизованный канал 100 и добавляем в него все данные, переданные NoPooledWork. Поскольку канал буферизованный, нельзя ввести больше 100 экземпляров данных до полного извлечения из него, что и происходит внутри горутины. Мы перемещаемся по каналу, извлекаем из него данные, добавляем горутину и обрабатываем. Здесь нет ограничений на количество созданных горутин, как нет и ограничений на обработку задач (следует учитывать необходимые ресурсы) – сколько пришло, столько обработали. Если мы запустим такой код, то выполним 1000 задач примерно за 100 мс.
Проблема
Если у нас нет безграничных ресурсов, их нужно ограниченно распределять в течение определенного периода времени. Минимальный размер объекта Goroutine составляет 2 К, но он может достигать 1 ГБ. Приведенное выше решение выполняет все задачи параллельно, а для миллиона таких задач оно может быстро исчерпать память и ресурсы процессора. Придется либо модернизировать машину, либо найти другой подход.
Существует блестящее решение под названием Thread Pool или Worker Pool. Идея состоит в том, чтобы иметь ограниченный пул worker-ов для обработки задач. Как только "рабочий" закончит с одной из них, он переходит к следующей. Это уменьшает нагрузку на процессор и память, а также оперативнее распределяет задачи с течением времени.
Решение: Worker Pool
Исправим описанную проблему и реализуем рабочий пул:
//// worker/pooled.go func PooledWork(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { basic.Process(data) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } //// main.go // Process worker.PooledWork(allData)
Start processing all work Took ===============> 1.002972449s
Здесь у нас есть ограниченное количество worker-ов (100) и ровно 100 горутин для обработки задач. Каналы можно рассматривать как очереди, а каждый worker – как клиента. Несколько горутин могут прослушивать один и тот же канал, но каждый элемент в нем будет обработан только один раз.
Это хорошее решение, и если мы запустим его, то увидим, что для завершения всех задач требуется 1 секунда. Не совсем 100 мс, но нам это и не нужно. Мы получаем гораздо лучшее решение, которое распределяет нагрузку во времени.
Обработка ошибок
Код уже выглядит как готовый продукт, но это не так, поскольку мы не обрабатываем ошибки. Давайте создадим сценарий, в котором посмотрим, как можно это реализовать:
//// worker/pooledError.go func PooledWorkError(allData []model.SimpleData) { start := time.Now() var wg sync.WaitGroup workerPoolSize := 100 dataCh := make(chan model.SimpleData, workerPoolSize) errors := make(chan error, 1000) for i := 0; i < workerPoolSize; i++ { wg.Add(1) go func() { defer wg.Done() for data := range dataCh { process(data, errors) } }() } for i, _ := range allData { dataCh <- allData[i] } close(dataCh) wg.Add(1) go func() { defer wg.Done() for { select { case err := <-errors: fmt.Println("finished with error:", err.Error()) case <-time.After(time.Second * 1): fmt.Println("Timeout: errors finished") return } } }() defer close(errors) wg.Wait() elapsed := time.Since(start) fmt.Printf("Took ===============> %s\n", elapsed) } func process(data model.SimpleData, errors chan<- error) { fmt.Printf("Start processing %d\n", data.ID) time.Sleep(100 * time.Millisecond) if data.ID % 29 == 0 { errors <- fmt.Errorf("error on job %v", data.ID) } else { fmt.Printf("Finish processing %d\n", data.ID) } } //// main.go // Process worker.PooledWorkError(allData)
Мы модифицировали метод обработки некоторых ошибок и добавили его в канал переданных ошибок. Таким образом для обработки ошибок в параллельной модели нам нужен канал для хранения данных о них. После того, как все задачи завершены, мы его проверяем. Объект error содержит идентификатор задачи, так что при необходимости мы можем обработать их снова.
Это лучшее решение, чем то, которое вообще не учитывало ошибки. Во второй части туториала рассмотрим, как сделать выделенный и надежный пакет рабочих пулов, который сможет обрабатывать параллельные задачи с ограниченным количеством Worker pool.
Заключение
Мы рассмотрели синхронный и асинхронный подходы, обработку ошибок, горутины и функционирование worker-ов. Модель конкурентности Golang достаточно мощна, чтобы просто построить решение Worker pool без особых накладных расходов, поэтому она не включена в стандартную библиотеку. Однако мы всегда можем создать собственное решение, которое соответствует нашим потребностям. Скоро будет следующая статья, а пока следите за обновлениями.
Дополнительные материалы:
- Язык Go: как стать востребованным программистом
- Программирование на Go с нуля: 9 полезных видеоуроков
- ТОП-10 книг по языку программирования Go: от новичка до профессионала
- Чем хорош язык Go и зачем его изучать? Все плюшки Golang
- Более 200 избранных ссылок на материалы о языке Go