Перевод публикуется с сокращениями, автор оригинальной статьи Ahad Hasan.
Горутины и каналы – это крутые языковые структуры, которые делают Golang мощным параллельным языком. Он отлично справляется с ограничениями использования ресурсов, что было продемонстрировано на простом примере в предыдущей публикации.
Для достижения цели можно использовать один общий канал. Давайте посмотрим, как все это реализовать.
Архитектура
Создаем пакет workerpool, который может обрабатывать задачи с воркерами на основе конкурентности. Рассмотрим структуру каталогов:
Каталог workerpool находится в корне проекта. Теперь необходимо разобраться в терминах. Task – это самостоятельный элемент, который необходимо обработать. Worker – функция, обрабатывающая выполнение задачи. Pool фактически занимается созданием и управлением воркерами.
Реализация
Сначала напишем Task:
Task содержит все необходимое для обработки задачи. Мы передаем ей Data и функцию f, которая должна быть выполнена, с помощью функции process. Функция f принимает Data в качестве параметра для обработки, а также храним возвращаемую ошибку. Давайте посмотрим, как Worker обрабатывает эти задачи:
В коде Worker принимает идентификатор воркера и канал, в который должны быть записаны задачи. В методе Start входящие задачи распределяются по taskChan для обработки внутри goroutine.
Worker Pool
Мы реализовали Task и Worker для обработки задач, но здесь есть недостающая часть – порождение воркеров и отправка им заданий. Всем этим должен заведовать Worker Pool.
Рабочий пул содержит все задачи, которые ему необходимо обработать, и принимает число в качестве входных данных, чтобы породить нужное количество горутин для одновременного выполнения задач. Он имеет буферизованный коллектор каналов, общий для всех рабочих.
Когда мы запускаем этот Worker Pool, он порождает необходимое количество воркеров, которые используют общий коллектор каналов. Далее мы разбираем задачи, записываем их в канал и синхронизируем все с WaitGroup. Теперь, давайте проверим наше решение:
Создали 100 тасков и используем 5 процессов для их обработки. Взглянем на результат:
Для обработки 100 задач нам потребуется две секунды, а если мы поменяем 5 на 10, то увидим, что для обработки всех задач потребуется всего около одной секунды.
Мы создали надежное решение для Worker Pool, которое может обрабатывать concurrency, хранить ошибки в задаче и отправлять данные для обработки. Этот подход является универсальным и не связан с конкретной реализацией. Мы можем использовать его и для решения более серьезных проблем.
Дальнейшее расширение: обработка задач в фоне
Попробуем расширить наше приложение: worker-ы продолжают ждать новые зaдачи в фоновом режиме, а мы отправляем им новые для обработки. Для этого нужно будет немного изменить Worker:
Мы добавляем канал выхода и два новых метода в структуру Worker. StartBackgorund запускает бесконечный цикл for с select для чтения из taskChan и обработки задачи. Если StartBackgorund читает из данного канала, то данные возвращаются из функции. Метод Stop записывает данные в канал quit.
Вооружившись двумя новыми методами, добавим в Pool несколько новых штук:
Структура Pool теперь содержит воркеров и имеет канал runBackground, который помогает ему держаться на плаву. У нас появилось 3 новых метода. AddTask добавляет таску в коллектор канала.
Метод RunBackground работает бесконечно и порождает goroutine, чтобы поддерживать Pool живым вместе с каналом runBackground. Эта техника, позволяет запускать вечное выполнение чтения из пустого канала.
Метод Stop, останавливает воркеров и пишет в runBackground, чтобы завершить его. Посмотрим, как все это работает сейчас.
Если бы у нас был реальный пример из жизни, он работал бы вместе с HTTP-сервером и выполнял бы задачи. Повторим подобное поведение с бесконечным циклом и определенным условием:
После запуска кода, будет видно, что в очередь вставляется случайная задача, пока воркеры работают в фоновом режиме, но в итоге один из них эту задачу возьмет. Он остановится, когда будет выполнено соответствующее условие.
Вывод
Мы изучили, как можно построить надежное решение с Worker Pool из первой части цикла. Кроме того мы расширили возможности реализации пула, работающего в фоновом режиме для выполнения дальнейших входящих задач.
Комментарии