eFusion 24 декабря 2020

🏃 Конкурентность в Golang и WorkerPool [Часть 1]

В современных языках программирования конкурентность стала безусловной потребностью. В этой статье речь пойдет об устройстве и использовании concurrency в Go.
🏃 Конкурентность в Golang и WorkerPool [Часть 1]

Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.

В некоторых языках программирования имеются мощные конструкции, которые могут выгружать работу в разные потоки ОС (например, Java), а другие только имитируют это поведение в одном потоке (например, Ruby).

🏃 Конкурентность в Golang и WorkerPool [Часть 1]

У Golang есть мощная модель конкурентности – CSP (communicating sequential processes), которая разбивает проблему на более мелкие последовательные процессы, а затем планирует несколько экземпляров этих процессов, называемых Goroutines (горутины). Связь между горутинами осуществляется путем передачи неизменяемых сообщений через Channels.

Рассмотрим, как можно воспользоваться преимуществами конкурентности в Golang и как ограничить его использование с рабочими пулами.

Простой пример

Представим, что у нас есть внешний вызов API, выполняющийся около 100 мс. Если будет 1000 таких вызовов, и мы вызовем их синхронно, то для завершения потребуется около 100 секунд.

        //// model/data.go

package model

type SimpleData struct {
	ID int
}

//// basic/basic.go

package basic

import (
	"fmt"
	"github.com/Joker666/goworkerpool/model"
	"time"
)

func Work(allData []model.SimpleData) {
	start := time.Now()
	for i, _ := range allData {
		Process(allData[i])
	}
	elapsed := time.Since(start)
	fmt.Printf("Took ===============> %s\n", elapsed)
}

func Process(data model.SimpleData) {
	fmt.Printf("Start processing %d\n", data.ID)
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("Finish processing %d\n", data.ID)
}

//// main.go

package main

import (
	"fmt"
	"github.com/Joker666/goworkerpool/basic"
	"github.com/Joker666/goworkerpool/model"
	"github.com/Joker666/goworkerpool/worker"
)

func main() {
	// Prepare the data
	var allData []model.SimpleData
	for i := 0; i < 1000; i++ {
		data := model.SimpleData{ ID: i }
		allData = append(allData, data)
	}
	fmt.Printf("Start processing all work \n")

	// Process
	basic.Work(allData)
}
    
        Start processing all work 
Took ===============> 1m40.226679665s
    

Здесь у нас простая модель, содержащая структуру данных только с целочисленными значениями. Массив данных мы обрабатываем массив синхронно: очевидно, что такое решение неоптимально решение, поскольку задачи можно выполнить одновременно. Давайте превратим это в асинхронный процесс с Goroutines и Channels.

Асинхронность

🏃 Конкурентность в Golang и WorkerPool [Часть 1]
        //// worker/notPooled.go

func NotPooledWork(allData []model.SimpleData) {
	start := time.Now()
	var wg sync.WaitGroup

	dataCh := make(chan model.SimpleData, 100)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for data := range dataCh {
			wg.Add(1)
			go func(data model.SimpleData) {
				defer wg.Done()
				basic.Process(data)
			}(data)
		}
	}()

	for i, _ := range allData {
		dataCh <- allData[i]
	}

	close(dataCh)
	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.NotPooledWork(allData)
    
        Start processing all work 
Took ===============> 101.191534ms
    

Здесь мы создаем буферизованный канал 100 и добавляем в него все данные, переданные NoPooledWork. Поскольку канал буферизованный, нельзя ввести больше 100 экземпляров данных до полного извлечения из него, что и происходит внутри горутины. Мы перемещаемся по каналу, извлекаем из него данные, добавляем горутину и обрабатываем. Здесь нет ограничений на количество созданных горутин, как нет и ограничений на обработку задач (следует учитывать необходимые ресурсы) – сколько пришло, столько обработали. Если мы запустим такой код, то выполним 1000 задач примерно за 100 мс.

Проблема

Если у нас нет безграничных ресурсов, их нужно ограниченно распределять в течение определенного периода времени. Минимальный размер объекта Goroutine составляет 2 К, но он может достигать 1 ГБ. Приведенное выше решение выполняет все задачи параллельно, а для миллиона таких задач оно может быстро исчерпать память и ресурсы процессора. Придется либо модернизировать машину, либо найти другой подход.

Существует блестящее решение под названием Thread Pool или Worker Pool. Идея состоит в том, чтобы иметь ограниченный пул worker-ов для обработки задач. Как только "рабочий" закончит с одной из них, он переходит к следующей. Это уменьшает нагрузку на процессор и память, а также оперативнее распределяет задачи с течением времени.

Решение: Worker Pool

🏃 Конкурентность в Golang и WorkerPool [Часть 1]

Исправим описанную проблему и реализуем рабочий пул:

        //// worker/pooled.go

func PooledWork(allData []model.SimpleData) {
	start := time.Now()
	var wg sync.WaitGroup
	workerPoolSize := 100

	dataCh := make(chan model.SimpleData, workerPoolSize)

	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for data := range dataCh {
				basic.Process(data)
			}
		}()
	}

	for i, _ := range allData {
		dataCh <- allData[i]
	}

	close(dataCh)
	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Took ===============> %s\n", elapsed)
}

//// main.go

// Process
worker.PooledWork(allData)
    
        Start processing all work 
Took ===============> 1.002972449s
    

Здесь у нас есть ограниченное количество worker-ов (100) и ровно 100 горутин для обработки задач. Каналы можно рассматривать как очереди, а каждый worker – как клиента. Несколько горутин могут прослушивать один и тот же канал, но каждый элемент в нем будет обработан только один раз.

Это хорошее решение, и если мы запустим его, то увидим, что для завершения всех задач требуется 1 секунда. Не совсем 100 мс, но нам это и не нужно. Мы получаем гораздо лучшее решение, которое распределяет нагрузку во времени.

Обработка ошибок

Код уже выглядит как готовый продукт, но это не так, поскольку мы не обрабатываем ошибки. Давайте создадим сценарий, в котором посмотрим, как можно это реализовать:

        //// worker/pooledError.go

func PooledWorkError(allData []model.SimpleData) {
	start := time.Now()
	var wg sync.WaitGroup
	workerPoolSize := 100

	dataCh := make(chan model.SimpleData, workerPoolSize)
	errors := make(chan error, 1000)

	for i := 0; i < workerPoolSize; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			for data := range dataCh {
				process(data, errors)
			}
		}()
	}

	for i, _ := range allData {
		dataCh <- allData[i]
	}

	close(dataCh)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case err := <-errors:
				fmt.Println("finished with error:", err.Error())
			case <-time.After(time.Second * 1):
				fmt.Println("Timeout: errors finished")
				return
			}
		}
	}()

	defer close(errors)
	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("Took ===============> %s\n", elapsed)
}

func process(data model.SimpleData, errors chan<- error) {
	fmt.Printf("Start processing %d\n", data.ID)
	time.Sleep(100 * time.Millisecond)
	if data.ID % 29 == 0 {
		errors <- fmt.Errorf("error on job %v", data.ID)
	} else {
		fmt.Printf("Finish processing %d\n", data.ID)
	}
}

//// main.go

// Process
worker.PooledWorkError(allData)
    

Мы модифицировали метод обработки некоторых ошибок и добавили его в канал переданных ошибок. Таким образом для обработки ошибок в параллельной модели нам нужен канал для хранения данных о них. После того, как все задачи завершены, мы его проверяем. Объект error содержит идентификатор задачи, так что при необходимости мы можем обработать их снова.

Это лучшее решение, чем то, которое вообще не учитывало ошибки. Во второй части туториала рассмотрим, как сделать выделенный и надежный пакет рабочих пулов, который сможет обрабатывать параллельные задачи с ограниченным количеством Worker pool.

Заключение

Мы рассмотрели синхронный и асинхронный подходы, обработку ошибок, горутины и функционирование worker-ов. Модель конкурентности Golang достаточно мощна, чтобы просто построить решение Worker pool без особых накладных расходов, поэтому она не включена в стандартную библиотеку. Однако мы всегда можем создать собственное решение, которое соответствует нашим потребностям. Скоро будет следующая статья, а пока следите за обновлениями.

***

Дополнительные материалы:

Источники

МЕРОПРИЯТИЯ

Комментарии

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ