24 августа 2022

🧀🐁 Быстрый старт в распределенные вычисления: 7 фундаментальных концепций

Разработчик ПО (системы PDM/PLM) с 1993 года, компания "ИНТЕРМЕХ" (www.intermech.ru). В 2020-м успешно закончил курсы "Основы Data Science" (минская IT Academy) Референт-переводчик технической литературы с английского языка.
Семь концепций распределенных вычислений с примерами кода на Python: кластеры, планировщики, очевидная параллельность и так далее.
🧀🐁 Быстрый старт в распределенные вычисления: 7 фундаментальных концепций
Данная статья является переводом. Ссылка на оригинал.

Добро пожаловать во вселенную распределенности

Но есть и хорошие новости: вам не нужно знать все о распределенных вычислениях, чтобы начать.

Это похоже на отдых в другой стране, языка которой вы не знаете. Пожалуй, учиться разговаривать на таком уровне, чтобы поддерживать разговор о тонкостях местной политики еще до посадки в самолет, будет чересчур. Однако стоит выучить столько, чтобы передвигаться и просить о помощи, когда потребуется.

Эта статья описывает семь фундаментальных концепций, которые вам понадобятся, чтобы начать распределенные вычисления. Раннее усвоение этих базовых концепций сэкономит вам часы будущих исследований и дорогостоящих ошибок. Мы продемонстрируем эти концепции с помощью библиотеки Dask. Давайте перейдем к ним!

1. Отложенное (Lazy) вычисление значений

Отложенное вычисление значения (lazy evaluation) – это стратегия программирования, откладывающая вычисление значения выражения или переменной до тех пор, пока это значение не понадобится. Это антипод прямого, или немедленного вычисления, при котором значения вычисляются сразу, когда вызывается расчет. Отложенные вычисления позволяют повысить уровень оптимизации вычислений и сократить нагрузку на память, избегая ненужных повторений расчетов.

Давайте продемонстрируем это с помощью мысленного эксперимента. Познакомьтесь с Меркуцио, нашим Мышкетером Распределенных Вычислений:

<span>Изображение из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a><span> (лицензия CC-BY 4.0)</span>
Изображение из twemoji (лицензия CC-BY 4.0)

Меркуцио примет участие в эксперименте. По условиям эксперимента, Меркуцио должен найти кратчайший путь в лабиринте к конечной цели: вкусному кусочку сыра Чеддер. Путь в лабиринте будет отмечаться хлебными крошками, чтобы Меркуцио знал, куда идти. Цель эксперимента – добраться до сыра как можно быстрее, поэтому Меркуцио будет терять по 1 очку за каждую съеденную крошку.

В сценарии 1 Меркуцио начинает идти (в компьютерных терминах – «вычислять») сразу же, когда вы положите первую крошку, и последует за вами от одной крошки до другой. При этом он, конечно, в конце концов доберется до своего Сыра Назначения. Но при этом он получит 5 отрицательных очков, по одной за каждую крошку, которую он прошел (и съел). Это верное решение задачи, но не оптимальное.

<span>Изображение автора, эмоджи от </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Изображение автора, эмоджи от twemoji

А вот если бы Меркуцио задержал свой бег, пока вы не положите сыр, а потом оценил ситуацию, он сумел бы увидеть всю задачу целиком и нашел бы самый быстрый способ решить ее.

<span>Изображение автора, эмоджи от </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Изображение автора, эмоджи от twemoji

Пример кода

Давайте продемонстрируем это с помощью кода, использующего Pandas (прямые вычисления) и Dask (отложенные вычисления). В этом коде мы создадим DataFrame, вызовем его и зададим вычисление по группировке (groupby) по заданному столбцу. Заметьте, что Pandas возвращает результат немедленно, тогда как Dask делает это лишь тогда, когда вы явно скомандуете ему начать вычисления.

        import pandas as pd
# Создаем dataframe
df = pd.DataFrame({
   "Name": ["Mercutio", "Tybalt", "Lady Montague"],
   "Age": [3, 2, 4],
   "Fur": ["Grey", "Grey", "White"]}
 )
# вызовем наш DataFrame
df
    
<span>Pandas немедленно возвращает содержимое Dataframe (изображение автора)</span>
Pandas немедленно возвращает содержимое Dataframe (изображение автора)
        # Зададим вычисление по groupby
df.groupby('Fur').Age.mean()
    
<span>Pandas немедленно возвращает результат группировки (изображение автора)</span>
Pandas немедленно возвращает результат группировки (изображение автора)

Мы видим, что Pandas немедленно выполняет каждую инструкцию, которую мы определяем. Как результат определения DataFrame, так и результат операции над groupby возвращаются немедленно. Именно такое поведение ожидается от Pandas, и оно вас устраивает, пока вы работаете над сравнительно небольшими наборами данных, полностью помещающимися в оперативную память вашего компьютера.

Проблема возникает в том случае, если ваш DataFrame содержит больше данных, чем может поместиться в память вашего компьютера. У Pandas нет другого выбора, кроме как попытаться загрузить эти данные в память… и потерпеть неудачу.

Вот почему библиотеки распределенных вычислений, вроде Dask, используют отложенное вычисление:

        import dask.dataframe as dd
# Превращаем df в dataframe Dask
dask_df = dd.from_pandas(df, npartitions=1)
# Вызываем этот dataframe (содержимое не возвращается)
dask_df
    
<span>Dask </span>«<span>отложенно</span>»<span> возвращает только схему dataframe, но не его содержимое (изображение автора)</span>
Dask «отложенно» возвращает только схему dataframe, но не его содержимое (изображение автора)
        # Задаем тот же расчет на основе groupby, что и прежде (не возвращает результатов)
dask_df.groupby('Fur').Age.mean()
    
<span>Dask отложенно возвращает лишь схему groupby, но не его результаты (изображение автора)</span>
Dask отложенно возвращает лишь схему groupby, но не его результаты (изображение автора)

Dask не возвращает результатов ни при вызове DataFrame, ни при определении вычисления groupby. Он возвращает только схему, или описание результата. Лишь тогда, когда мы явно вызываем compute(), Dask фактически производит вычисления и возвращает результаты. Таким образом, он может подождать, чтобы рассчитать оптимальный путь к результату, как это делал Меркуцио в нашем втором сценарии.

        # запустить вычисления
dask_df.groupby('Fur').Age.mean().compute()
    
<span>Изображение автора</span>
Изображение автора

Отложенные вычисления – это именно то, что позволяет библиотекам вроде Dask оптимизировать крупномасштабные вычисления, определяя «очевидно параллельные» (embarrassingly parallel) части расчета.

Больше полезных материалов вы найдете на нашем телеграм-канале «Библиотека программиста»

2. Очевидная параллельность

Термин «очевидно параллельные» (embarassingly parallel) применяется к расчетам или задачам, которые легко разбиваются на более мелкие задачи, каждую из которых можно рассчитывать независимо от других. Это означает, что между такими задачами нет зависимостей, и их можно рассчитывать параллельно или в любом порядке. Иногда такие задачи также называются «идеально параллельными» или «отлично параллелизуемыми».

<span>Изображение взято с </span><a href="https://freesvg.org/straight-lines-of-alternating-black-and-white-squares-illustration" target="_blank" rel="noopener noreferrer nofollow"><span>freesvg.org</span></a><span>, находится в публичном доступе</span>
Изображение взято с freesvg.org, находится в публичном доступе

Давайте вернемся к нашему мысленному эксперименту. К нашему старому другу Меркуцио присоединились еще два товарища, которые также примут участие в миссии поиска сыра с отложенными вычислениями. Мы проведем два разных эксперимента с нашими Тремя Мышкетерами.

Теперь у нас не один лабиринт, а целых три. В Эксперименте 1 кусок сыра чеддер разделен на равные части, и каждая часть положена в собственный лабиринт. В Эксперименте 2 весь кусок кладется в конце Лабиринта 3, а в Лабиринтах 1 и 2 лежат закрытые ящики, в которых лежат ключи к ящикам из следующих лабиринтов. Это значит, что Мышь 1 должна будет достать ключ в Лабиринте 1, передать его Мыши 2, которая передаст уже второй ключ Мыши 3. Цель обоих Экспериментов – съесть весь сыр целиком.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

Эксперимент 1 (выше) – пример очевидно параллельной задачи: каждый Мышкетер может разбираться со своим лабиринтом независимо от других, так что их общая задача (съесть весь сыр) будет выполнена параллельно.

Эксперимент 2 (ниже) – пример задачи, которую нельзя параллелизовать: решение каждой задачи зависит от решения предыдущей.

Пример кода

Широко встречающийся пример очевидно параллельной задачи – это цикл for: каждую итерацию цикла можно выполнять независимо от других. Более сложный пример – метод Монте-Карло: это моделирование, использующее повторение выборок случайной величины для оценки ее распределения. Каждая выборка генерируется независимо от других, и никак на них не влияет.

        # Простой цикл for - это очевидно параллельная задача
for i in range(0,5):
   x = i + 5
   print(x)
    

Еще один пример очевидно параллельной задачи – чтение разделенного Parquet-файла в DataFrame Dask:

        df = dd.read_parquet("test.parquet")
df.visualize()
    
<span>Изображение автора</span>
Изображение автора

Для сравнения, вот вычислительный граф при расчете groupby на том же DataFrame df:

<span><span>Изображение автора</span></span>
Изображение автора

Ясно, что это не очевидно параллельная задача: некоторые шаги графа зависят от результатов предыдущих шагов. Но это не значит, что она вообще не может быть параллелизована; Dask все еще может распараллелить части этого расчета, разделив ваши данные на разбиения (partitions).

3. Разбиения (partitions)

Разбиение – это логическое разделение данных, каждую часть которых можно обрабатывать независимо от других разбиений. Разбиения используются во многих областях сферы распределенных вычислений: файлы Parquet разделены на разбиения, так же как DataFrame в Dask и RDD в Spark. Такие пакеты данных иногда называются «кусками» (chunks).

В очевидно параллельном Эксперименте 1, приведенном выше, мы «разбили» цель нашего эксперимента (большой кусок сыра) на три независимых разбиения (или «куска»). Каждый Мышкетер мог после этого выполнить необходимую работу над своим разбиением, и они вместе достигали своей общей цели – съесть весь сыр.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

DataFrame Dask также разделены на разбиения. Каждое разбиение DataFrame Dask – это независимый DataFrame Pandas, который можно отправить для обработки отдельному обработчику (worker).

<span>Рисунок автора</span>
Рисунок автора

Когда вы пишете DataFrame Dask’а в Parquet, каждое разбиение DataFrame будет записано в отдельное разбиение Parquet.

        df = dask.datasets.timeseries(
 "2000-01-01",
 "2000-01-08",
 freq="1h",
 partition_freq="1d"
)

df.npartitions
>> 7

df
    
<span>Изображение автора</span>
Изображение автора
        # Пишем каждое разбиение dask в отдельное разбиение Parquet
df.to_parquet("test.parquet")
    

И чтобы собрать воедино все концепции, о которых мы говорили, загрузка разбитого файла Parquet в DataFrame Dask будет очевидно параллельной задачей, поскольку каждое разбиение Parquet можно загрузить в отдельное разбиение DataFrame независимо от других разбиений.

        df = dd.read_parquet("test")
df.visualize()
    
<span>Изображение автора</span>
Изображение автора

Графы задач вроде приведенных выше создаются и рассылаются планировщиком (scheduler).

4. Планировщики (schedulers)

Планировщик – это компьютерный процесс, управляющий распределением данных и синхронизацией вычислений, производимых с этими данными в вашей системе распределенных вычислений. Он следит за тем, чтобы данные обрабатывались эффективно и безопасно многими процессами одновременно, а также распределяет нагрузку на основе имеющихся ресурсов.

<span>Изображение взято с giphy.com</span>
Изображение взято с giphy.com

В простой очевидно параллельной задаче вроде распараллеливания цикла for, достаточно просто отследить, кто что должен делать. Но это становится намного труднее делать в задачах, содержащих миллионы или даже миллиарды строк данных. На какой машине лежит каждая часть данных? Как мы будем избегать дубликатов? Если мы допускаем дубликаты, какую копию мы будем использовать? Как мы соберем все разбиения снова в единое целое?

Возвращаясь к Меркуцио и его друзьям, давайте представим, что каждому из наших Трех Мышкетеров поручена своя независимая часть общего блюда: Меркуцио жарит лук, Тибальт натирает сыр, а Леди Монтекки тушит брокколи.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

А теперь представьте, что на нашей кухне работают не 3, а 30 мышей, и их деятельность надо четко синхронизировать и координировать, чтобы использовать имеющиеся кухонные принадлежности наилучшим образом. Если мы оставим эту координацию самим мышам, вскоре наступит хаос: каждая мышь слишком занята своей частью работы, независимой от других, чтобы иметь четкую картину общей ситуации и распределять задачи и ресурсы наиболее эффективно. Мыши будут требовать одни и те же сковородки и ножи, и скорее всего, блюдо не будет приготовлено вовремя к обеду.

Познакомьтесь с Мастер-Шефом.

<span>Эмоджи взят из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Эмоджи взят из twemoji

Мастер-Шеф (также известный как «Планировщик») хранит рецепт блюда, не терпящий отклонений, и будет распределять задачи и выделять ресурсы каждой готовящей мыши по необходимости. Когда индивидуальные компоненты блюда будут готовы, каждая мышь вернет результат своей работы Мастер-Шефу, который объединит их в готовый продукт.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

Существует множество различных видов планировщиков, и может быть полезно изучить доступные опции той платформы распределенных вычислений, которую вы собираетесь использовать. В зависимости от того, работаете вы на локальном или удаленном кластере, планировщики могут быть отдельными процессами на одной машине либо полностью автономными компьютерами.

Планировщики – это центральная часть любого кластера распределенных вычислений.

5. Кластеры

Кластер – это группа компьютеров или компьютерных процессов, работающих вместе как единое целое. Кластеры – это основа архитектуры любой системы распределенных вычислений.

Независимо от конкретной реализации архитектуры кластера, все кластеры имеют общие элементы: клиентов, планировщиков и рабочих.

Клиент – это то место, где вы пишете код, содержащий инструкции для вычисления. В случае Dask – это ваша сессия iPython или Jupyter Notebook (или любая среда, в которой вы пишете и запускаете свой код Python).

Планировщик – это компьютерный процесс, координирующий вашу систему распределенных вычислений. Для локального кластера на вашем ноутбуке это просто отдельный процесс Python. Для большого кластера суперкомпьютеров или удаленного облачного кластера планировщик часто располагается на отдельном компьютере.

Рабочие – это компьютерные процессы, выполняющие фактическую работу, запуская вычисления над разбиениями данных. В локальном кластере на вашем ноутбуке каждый рабочий – это процесс, занимающий одно из ядер вашего процессора. В удаленном кластере каждый рабочий зачастую представляет собой отдельную (возможно, виртуальную) машину.

<span>Изображение взято с </span><a href="http://www.dask.org" target="_blank" rel="noopener noreferrer nofollow"><span>dask.org</span></a>
Изображение взято с dask.org

Кластеры могут существовать либо локально, на одной машине, либо удаленно, распределенными между несколькими различными (возможно, виртуальными) машинами, либо на облачном сервере. Таким образом, кластер может состоять из:

  • Нескольких ядер внутри одной машины;
  • Нескольких машин внутри одного физического пространства (высокопроизводительный суперкомпьютер);
  • Нескольких виртуальных машин, распределенных в физическом пространстве (облачный кластер).

6. Улучшение (Scaling Up) против расширения (Scaling Out)

При работе в мире распределенных вычислений вы будете часто слышать, как люди используют термины «улучшение» (Scaling Up) и «расширение» (Scaling Out). Это обычно относится к различиям между использованием локального и распределенного кластеров. Улучшение означает использование большего количества локальных ресурсов, а расширение – использование большего количества удаленных ресурсов.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

Например, переход от вычислений с помощью pandas (использующей только одно ядро вашего компьютера) к использованию локального кластера Dask – это пример улучшения. А перенос тех же вычислений с pandas на удаленный кластер Dask через Coiled – это пример расширения.

        # РАСШИРЕНИЕ вычислений на облачный кластер

# Запустить облачный кластер через Coiled
import coiled
cluster = coiled.Cluster(n_workers=20)

# подключить кластер к Dask
from dask.distributed import Client
client = Client(cluster)

# запустить вычисления над более чем 40 Гб данных на удаленном облачном кластере
ddf = dd.read_parquet('s3://coiled-datasets/timeseries/20-years/parquet/')
ddf.groupby('name').x.mean().compute()
    

Улучшение может также включать переход с CPU на GPU, то есть «аппаратное ускорение». Dask может использовать cudf вместо pandas, чтобы перенести операции над вашими DataFram’ами на ваши GPU, чтобы ускорить эти операции во много раз. (Спасибо Джейкобу Томлинсону за уточнение этого вопроса).

7. Параллельные и распределенные вычисления

Различие между параллельными и распределенными вычислениями состоит в том, используют ли вычислительные процессы одну и ту же память.

<span>Рисунок автора, эмоджи взяты из </span><a href="https://twemoji.twitter.com/" target="_blank" rel="noopener noreferrer nofollow"><span>twemoji</span></a>
Рисунок автора, эмоджи взяты из twemoji

Параллельные вычисления выполняют задачи, используя множество процессоров, разделяющих одну и ту же память. Эта разделяемая память необходима, поскольку отдельные процессы работают вместе над одной и той же задачей. Параллельные вычислительные системы ограничены количеством процессоров, способных работать с разделяемой памятью.

С другой стороны, распределенные вычисления выполняют задачи на разных компьютерах, не имеющих общей памяти; эти компьютеры общаются друг с другом путем передачи сообщений. Это означает, что общая задача разбивается на несколько отдельных задач, центральный планировщик объединяет результаты всех индивидуальных задач и возвращает общий результат пользователю. Распределенные вычислительные системы теоретически можно расширять до бесконечности.

Поехали!

Семь концепций, представленных и разобранных в этой статье, дали вам необходимую базу для высадки в Великой Вселенной Распределенных Вычислений. Теперь для вас настало время пристегнуть ракетный ранец и продолжать исследования самостоятельно.

Отличный путь для серьезного исследователя возможностей распределенных вычислений – это изучение Dask Tutorial. Это обучающий курс, который вы можете пройти за 1-2 часа в удобном для вас темпе. Спасибо за внимание!

***

Материалы по теме

Источники

МЕРОПРИЯТИЯ

Комментарии

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ