🏃 Конкурентность в Golang и WorkerPool [Часть 2]

В первой статье мы строили Worker Pool для оптимизации производительности. Во второй части мы создадим надежное решение для работы со структурами конкурентности.

Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.

Горутины и каналы – это крутые языковые структуры, которые делают Golang мощным параллельным языком. Он отлично справляется с ограничениями использования ресурсов, что было продемонстрировано на простом примере в предыдущей публикации.

Для достижения цели можно использовать один общий канал. Давайте посмотрим, как все это реализовать.

Архитектура

Создаем пакет workerpool, который может обрабатывать задачи с воркерами на основе конкурентности. Рассмотрим структуру каталогов:

workerpool
├── pool.go
├── task.go
└── worker.go

Каталог workerpool находится в корне проекта. Теперь необходимо разобраться в терминах. Task – это самостоятельный элемент, который необходимо обработать. Worker – функция, обрабатывающая выполнение задачи. Pool фактически занимается созданием и управлением воркерами.

Реализация

Сначала напишем Task:

// workerpool/task.go

package workerpool

import (
	"fmt"
)

type Task struct {
	Err  error
	Data interface{}
	f    func(interface{}) error
}

func NewTask(f func(interface{}) error, data interface{}) *Task {
	return &Task{f: f, Data: data}
}

func process(workerID int, task *Task) {
	fmt.Printf("Worker %d processes task %v\n", workerID, task.Data)
	task.Err = task.f(task.Data)
}

Task содержит все необходимое для обработки задачи. Мы передаем ей Data и функцию f, которая должна быть выполнена, с помощью функции process. Функция f принимает Data в качестве параметра для обработки, а также храним возвращаемую ошибку. Давайте посмотрим, как Worker обрабатывает эти задачи:

// workerpool/worker.go

package workerpool

import (
	"fmt"
	"sync"
)

// Worker контролирует всю работу
type Worker struct {
	ID       int
	taskChan chan *Task
}

// NewWorker возвращает новый экземпляр worker-а
func NewWorker(channel chan *Task, ID int) *Worker {
	return &Worker{
		ID:       ID,
		taskChan: channel,
	}
}

// запуск worker
func (wr *Worker) Start(wg *sync.WaitGroup) {
	fmt.Printf("Starting worker %d\n", wr.ID)

	wg.Add(1)
	go func() {
		defer wg.Done()
		for task := range wr.taskChan {
			process(wr.ID, task)
		}
	}()
}

В коде Worker принимает идентификатор воркера и канал, в который должны быть записаны задачи. В методе Start входящие задачи распределяются по taskChan для обработки внутри goroutine.

Worker Pool

Мы реализовали Task и Worker для обработки задач, но здесь есть недостающая часть – порождение воркеров и отправка им заданий. Всем этим должен заведовать Worker Pool.

// workerpoo/pool.go

package workerpool

import (
	"fmt"
	"sync"
	"time"
)

// Pool воркера
type Pool struct {
	Tasks   []*Task

	concurrency   int
	collector     chan *Task
	wg            sync.WaitGroup
}

// NewPool инициализирует новый пул с заданными задачами и

func NewPool(tasks []*Task, concurrency int) *Pool {
	return &Pool{
		Tasks:       tasks,
		concurrency: concurrency,
		collector:   make(chan *Task, 1000),
	}
}

// Run запускает всю работу в Pool и блокирует ее до тех пор, 
// пока она не будет закончена.
func (p *Pool) Run() {
	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		worker.Start(&p.wg)
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}
	close(p.collector)

	p.wg.Wait()
}

Рабочий пул содержит все задачи, которые ему необходимо обработать, и принимает число в качестве входных данных, чтобы породить нужное количество горутин для одновременного выполнения задач. Он имеет буферизованный коллектор каналов, общий для всех рабочих.

Когда мы запускаем этот Worker Pool, он порождает необходимое количество воркеров, которые используют общий коллектор каналов. Далее мы разбираем задачи, записываем их в канал и синхронизируем все с WaitGroup. Теперь, давайте проверим наше решение:

// main.go

package main

import (
	"fmt"
	"time"

	"github.com/Joker666/goworkerpool/workerpool"
)

func main() {
	var allTask []*workerpool.Task
	for i := 1; i <= 100; i++ {
		task := workerpool.NewTask(func(data interface{}) error {
			taskID := data.(int)
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Task %d processed\n", taskID)
			return nil
		}, i)
		allTask = append(allTask, task)
	}

	pool := workerpool.NewPool(allTask, 5)
	pool.Run()
}

Создали 100 тасков и используем 5 процессов для их обработки. Взглянем на результат:

Worker 3 processes task 98
Task 92 processed
Worker 2 processes task 99
Task 98 processed
Worker 5 processes task 100
Task 99 processed
Task 100 processed
Took ===============> 2.0056295s

Для обработки 100 задач нам потребуется две секунды, а если мы поменяем 5 на 10, то увидим, что для обработки всех задач потребуется всего около одной секунды.

Мы создали надежное решение для Worker Pool, которое может обрабатывать concurrency, хранить ошибки в задаче и отправлять данные для обработки. Этот подход является универсальным и не связан с конкретной реализацией. Мы можем использовать его и для решения более серьезных проблем.

Дальнейшее расширение: обработка задач в фоне

Попробуем расширить наше приложение: worker-ы продолжают ждать новые зaдачи в фоновом режиме, а мы отправляем им новые для обработки. Для этого нужно будет немного изменить Worker:

// workerpool/worker.go

// Worker контролирует всю работу
type Worker struct {
	ID       int
	taskChan chan *Task
	quit     chan bool
}

// NewWorker возвращает новый экземпляр worker-а
func NewWorker(channel chan *Task, ID int) *Worker {
	return &Worker{
		ID:       ID,
		taskChan: channel,
		quit:     make(chan bool),
	}
}

....

// StartBackground запускает worker-а в фоне
func (wr *Worker) StartBackground() {
	fmt.Printf("Starting worker %d\n", wr.ID)

	for {
		select {
		case task := <-wr.taskChan:
			process(wr.ID, task)
		case <-wr.quit:
			return
		}
	}
}

// Остановка quits для воркера
func (wr *Worker) Stop() {
	fmt.Printf("Closing worker %d\n", wr.ID)
	go func() {
		wr.quit <- true
	}()
}

Мы добавляем канал выхода и два новых метода в структуру Worker. StartBackgorund запускает бесконечный цикл for с select для чтения из taskChan и обработки задачи. Если StartBackgorund читает из данного канала, то данные возвращаются из функции. Метод Stop записывает данные в канал quit.

Вооружившись двумя новыми методами, добавим в Pool несколько новых штук:

// workerpool/pool.go

type Pool struct {
	Tasks   []*Task
	Workers []*Worker

	concurrency   int
	collector     chan *Task
	runBackground chan bool
	wg            sync.WaitGroup
}

// AddTask добавляет таски в pool
func (p *Pool) AddTask(task *Task) {
	p.collector <- task
}

// RunBackground запускает pool в фоне
func (p *Pool) RunBackground() {
	go func() {
		for {
			fmt.Print("⌛ Waiting for tasks to come in ...\n")
			time.Sleep(10 * time.Second)
		}
	}()

	for i := 1; i <= p.concurrency; i++ {
		worker := NewWorker(p.collector, i)
		p.Workers = append(p.Workers, worker)
		go worker.StartBackground()
	}

	for i := range p.Tasks {
		p.collector <- p.Tasks[i]
	}

	p.runBackground = make(chan bool)
	<-p.runBackground
}

// Stop останавливает запущенных в фоне worker-ов
func (p *Pool) Stop() {
	for i := range p.Workers {
		p.Workers[i].Stop()
	}
	p.runBackground <- true
}

Структура Pool теперь содержит воркеров и имеет канал runBackground, который помогает ему держаться на плаву. У нас появилось 3 новых метода. AddTask добавляет таску в коллектор канала.

Метод RunBackground работает бесконечно и порождает goroutine, чтобы поддерживать Pool живым вместе с каналом runBackground. Эта техника, позволяет запускать вечное выполнение чтения из пустого канала.

Метод Stop, останавливает воркеров и пишет в runBackground, чтобы завершить его. Посмотрим, как все это работает сейчас.

Если бы у нас был реальный пример из жизни, он работал бы вместе с HTTP-сервером и выполнял бы задачи. Повторим подобное поведение с бесконечным циклом и определенным условием:

// main.go

...

pool := workerpool.NewPool(allTask, 5)
go func() {
	for {
		taskID := rand.Intn(100) + 20

		if taskID%7 == 0 {
			pool.Stop()
		}

		time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
		task := workerpool.NewTask(func(data interface{}) error {
			taskID := data.(int)
			time.Sleep(100 * time.Millisecond)
			fmt.Printf("Task %d processed\n", taskID)
			return nil
		}, taskID)
		pool.AddTask(task)
	}
}()
pool.RunBackground()

После запуска кода, будет видно, что в очередь вставляется случайная задача, пока воркеры работают в фоновом режиме, но в итоге один из них эту задачу возьмет. Он остановится, когда будет выполнено соответствующее условие.

Вывод

Мы изучили, как можно построить надежное решение с Worker Pool из первой части цикла. Кроме того мы расширили возможности реализации пула, работающего в фоновом режиме для выполнения дальнейших входящих задач.

Источники

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ

Библиотека программиста
23 ноября 2018

Go vs Python: изучение основ языка Go в сравнении с Python

Это не соревнование двух языков, а просто еще один способ обучения. Рассмат...
admin
19 сентября 2018

TOП-3 языка программирования, которые нужно выучить до 2019

Это не просто три лучших языка программирования, а в некотором смысле попыт...