🕸️ Пять типичных ошибок при проектировании интеграции с помощью Kafka

Плотно поработав с Apache Kafka, мы собрали список типичных ошибок, которые появляются во время проектирования и разработки продюсеров и консьюмеров. Эти ошибки не зависят от используемой платформы, могут встретиться где угодно и могут быть совершены любым участником, как со стороны процесса приёма, так и передачи.

Всем привет! На связи Антон Воробьёв — архитектор в Альфа-Банке. Мы достаточно плотно работаем с Apache Kafka и за это время собрали список типичных ошибок, которые появляются во время проектирования и разработки продюсеров и консьюмеров. Эти ошибки не зависят от используемой платформы, могут встретиться где угодно и могут быть совершены любым участником, как со стороны процесса приёма, так и передачи.

Если вы уже давно работаете с Apache Kafka, то много нового для себя не узнаете. Но если вы не так давно начали изучать Apache Kafka и столкнулись с неправильным использованием ключа партицирования, сайзингом топика, с ошибками в параметрах топика, с нарушением идемпотентности и т.п., то статья сэкономит вам время и силы.

Начнём с простого.

№1. Неправильное использование ключа партицирования

Ошибка влечёт за собой проблемы, связанные с консистентностью данных и производительностью.

Упрощенно, ключ партицирования в Apache Kafka можно представить как атрибут сообщения, который определяет, в какую партицию топика оно будет записано. Ключ будет частью гарантии того, что связанные сообщения будут обработаны в правильном порядке. Ключ позволяет равномерно распределять данные по партиции и обеспечивать атомарность операции при работе с транзакциями.

Экономия на правильной настройке ключей выливается в дальнейшие доработки. На моей памяти был показательный случай, когда одна команда решила упростить свою систему и зафиксировать ключ партицирования на константу — решили не разбираться, потому что топик и партиция в единственном экземпляре.

Неожиданно произошел рост нагрузки, на что добавили партиций и, как следствие, консьюмеров. Но возникла проблема — из-за неверного, а точнее, фиксированного ключа партицирования, все сообщения попадали только в одну партицию. А так как к партиции может быть привязан только один консьюмер, то система не масштабировалась и осталась фактически однопоточной.

Чтобы избежать подобных ошибок:

  1. Стоит разобраться в сути ключа партиции и использовать его осознанно.
  2. Будьте аккуратнее с пустыми или null-ключами: когда вы его не используете, клиент Kafka отправляет сообщения в партиции рандомно.
  3. Используйте стабильные и уникальные идентификаторы ключа, например, Customer ID.

№2. Неверный сайзинг топика

Сайзинг топика — это процесс, где вы определяете оптимальное количество партиций, политики хранения и другие параметры для топика. Сайзинг топика необходим, чтобы избежать неожиданных сюрпризов на бою.

Самое главное, зачем нужен сайзинг, так это для того, чтобы ваши данные хранились в необходимом объёме и необходимое количество времени.

Kafka можно представить как бесконечный пергамент, на который пишутся сообщения, а хвост постоянно подрезается. С подрезанным хвостом Kafka поступает абсолютно беспощадно — если вы превышаете лимиты по сроку или объёму хранения, то автоматически теряете хвост данных.

Для примера расскажу про случай в проде. Как-то на январских праздниках с 7:00 до 7:02 утра одна система отправила 60 000 сообщений для списания комиссии. Но ответа не последовало, проводки не прошли, а сообщения пропали бесследно.

В чем причина? Оказалось, что из-за регламентных работ задержали запуск консьюмера и он запустился в 7:03 вместо 7:00. Политика по объёму была настроена на 10 Мб и когда объём был превышен, брокер удалил старые сообщения, захватив часть новых, пока консьюмер не работал.

Если бы консьюмер запустился в 7:00, как и планировалось, то заблокировал бы этот сегмент данных и не дал бы Kafka его удалить. Но консьюмер не работал и Kafka ликвидировала сегмент в течение нескольких секунд.

После этого консьюмер пытался читать партицию с последней запомненной точки, но не смог этого сделать, потому что offset уже не валиден (его подрезали). Соответственно, он убежал в конец партиции (присутствовали такие настройки) и часть важных сообщений была потеряна.

Вывод: правильно настраивайте параметры хранения топика, иначе можете столкнуться с чем-то подобным.

Чтобы избежать ошибки…

№1. Не привязывайте алгоритмы к числу партиций:

  1. Слишком малое количество партиций ведет к ограничению параллелизма.
  2. Излишнее партицирование тратит ресурсы процессора впустую: следуйте интеграционной документации Kafka по количеству партиций.

№2. Важно рассчитывать параметры политик хранения (retention). Количество партиций должно быть достаточно для минимального горизонтального масштабирования:

  1. retention.bytes должен быть достаточной глубины хранения,
  2. retention.bytes в идеале должен позволять хранить все сообщения за период retention.ms,
  3. отслеживайте распределение сообщений по партициям.

Также стоит учитывать возможный «взрывной» краткосрочный рост нагрузки при инициирующих отправках (первичная или повторная после сбоя). Естественно, в разумных рамках, которые обычно зависят от SLA консьюмеров, потому что если вы в текущий топик отправите очень большое количество сообщений, а SLA консьюмер такой, что читает сообщения очень медленно, то вы просто приостановите текущую онлайн-работу на некоторое время. Следовательно, здесь нужно искать баланс или выделить отдельные топики для массовых переотправок.

♾️ Библиотека девопса
Больше полезных материалов вы найдете на нашем телеграм-канале «Библиотека devops’a»

№3. Ошибки в параметрах топика

С неймингом некоторых параметров происходит интересная ситуация. Дело в том, что названия параметров совершенно не соответствуют их сути. Для примера рассмотрим параметр max.message.bytes. На первый взгляд кажется, что он содержит максимальный размер одного сообщения.

Но это не так. Параметр связан с ProduceRequest — сообщением-контейнером, которое продюсер отправляет на ноду брокера. В сообщении может содержаться много различных батчей (пакетов сообщений), направляемых в различные партиции и топики. А max.message.bytes — это предельный размер одного из этих батчей внутри ProduceRequest.

Но и это не всё! Если max.message.bytes — это настройка со стороны топика, то со стороны клиента у нас есть batch.size. Это параметр, указывающий максимальный размер батча в байтах, который можно формировать данному продюсеру в конкретную партицию топика.

Два данных параметра связаны важным условием: batch.size не должен превышать max.message.bytes. Если превысить, то можно увидеть сообщение Message size too large.

К сожалению, такой случай в проде у нас тоже был один раз из-за некорректных настроек топика.

Теперь тема посложнее.

№4. «Идемпотентность есть гарантия порядка»

Откуда пошло убеждение, что «идемпотентность = порядок»? Как мне кажется, из статей двухминуток на тему «Как нам сохранить порядок сообщений в Kafka?». В целом там пишут правильные вещи, но не делают акцент на скоупе, на области действия.

Объясню: когда у сервиса есть идемпотентность и он делает ретраи, то вы получаете реализацию семантики exactly-once (один раз отправили — один раз доставили):

  1. Представим, что сеть «моргнула», продюсер отправил пачку сообщений, не получил ответа о её доставке и произвёл отправку снова.
  2. Если сообщение ранее получено и обработано брокером, то он просто отбросит вторую пачку как дублирующее сообщение. Если же первая пачка сообщений не была получена, то обработает её.

Вы можете спросить, а причём здесь вообще порядок? Где он здесь?

Дело в том, что включение механизма идемпотентности (enable.idempotence=true) автоматически ведёт к трём условиям.

№1. acks=-1. Означает, что ответ продюсеру от брокера отдаётся только после подтверждения фиксации сообщения как минимум по количеству in-sync-реплик. Например, наш общебанковской кластер Kafka состоит из трёх узлов, и min.insync.replicas = 2. Это значит, что минимум две ноды должны ответить о том, что сообщение принято и обработано, и только после этого можно сказать, что пачка доставлена.

№2. Ретраи стремятся к бесконечности: retries = INT_MAX.

№3. max.in.flight.requests.per.connection не выше 5. Это необходимо, чтобы продюсер не отправлял одновременно более 5 батчей без ответа (доставлены, не доставлены, ошибка доставки). Число 5 взято из-за того, что Kafka использует окно последовательности (sequence window) именно такого размера для отслеживания корректности порядка следования. Размер фиксирован — прибит гвоздями. Это внутренний механизм Kafka, с которым ничего нельзя поделать. Как следствие, получается, что в этом окне у вас не должно быть батчей, идущих не по порядку, иначе, к сожалению, мы увидим ошибку доставки.

Кажется, можно сделать вывод, что всё это нам не позволяет нарушать порядок? Ведь у нас нет дубликатов, а значит, мы ничего не теряем, а консьюмер принёс сообщение один раз и в необходимой последовательности. Никаких проблем нет?

Но они есть:

  1. Дело в том, что гарантии порядка работают и даются только в рамках одной партиции одного экземпляра активного продюсера. Два продюсера не могут гарантировать ни идемпотентность, ни порядок.
  2. По нашим нефункциональным требованиям продюсеры и консьюмеры могут быть в любой момент перезапущены с сопровождением: могут быть перезапущены с текущей точки, могут быть перезапущены с точки в прошлом (с точки системного журнала в прошлом текущего опердня).
  3. При возникновении инфраструктурных проблем сообщение может оказаться не доставленным, и тогда оно также выпадает из этих гарантий. Надо знать, что с ним дальше делать.
  4. Кроме того, producer exactly-once semantic не гарантирует порядок прочтений, то есть должен быть ещё и consumer exactly-once semantic.
  5. Установка max.in.flight.requests.per.connection = 1 без consumer EOS тоже не поможет сохранить идеальный порядок (и идемпотентность тоже).

Теперь немного про вторую сторону вопроса…

№5. Неприятности, связанные с консьюмерами и консьюмер-группами

Что такое консьюмер-группа (consumer-группа)? Это механизм для горизонтального масштабирования потребления данных из топиков.

Технически это специальный топик, который называется _consumer_offsets, с большим количеством партиций (обычно 40+). В топик консьюмеры пишут сообщения для сохранения позиции чтения (своего оффсета).

Ключом сообщения является склейка из имени консьюмер-группы, топика и номера партиции → (group_id, topic, partition). В значении же хранится оффсет, метка времени, и метаданные, которые меняются от версии к версии → (offset, timestamp, metadata).

Есть два неприятных момента, связанных с консьюмер-группами.

№1. Важно понимать, что у топика может быть много консьюмер-групп. Технически возможно читать без использования консьюмер-групп, но тогда придётся самостоятельно реализовывать специфическую логику управления этим курсором.

№2. Если допущены ошибки в этих алгоритмах, можно перегрузить брокер, что вызовет проблемы у других потребителей. Поэтому у нас в банке с Kafka запрещено работать без консьюмер-групп.

А как консьюмеры управляют своими оффсетами? Есть определённые хитрости, но всё сводится к тому, что консьюмеры могут писать свои оффсеты не на каждое сообщение — могут писать, а могут и не писать. Они обычно делают это в течение какого-то интервала времени или через какое-то количество сообщений.

Почему? Потому что коммит — очень медленная операция.

В позапрошлом году мы проводили некоторые технические тесты и выявили, что разрыв между асинхронным коммитом раз в 5 секунд (автокоммит) и синхронным коммитом на каждое сообщение разница составляет порядка 45 раз (прочитали в 45 раз медленнее) по астрономическому времени.

Консьюмеры работают с оффсетами по-разному:

  1. Есть автоматический коммит оффсета раз в 5 секунд (enable.auto.commit=true).
  2. Есть ручной коммит, когда в коде вызывают определённую функцию: это может быть как синхронный вызов, так и асинхронный. Синхронное (commitSync) ручное управление — блокирующий вызов, оффсет сохранится только после успешной обработки брокером. Асинхронное (commitAsync) ручное управление — нет гарантии, что следующий коммит не перезапишет предыдущий по ошибке.
  3. Также существует смешанный режим «синхронно-асинхронно», когда оба типа можно использовать одновременно.

Самое важное, что необходимо помнить — консьюмер может упасть или перезапуститься в любой момент времени и не успеть закоммитить свой оффсет коммитов, а коммит оффсета при определённых проблемах может быть отвергнут координатором: брокер его отвергнет и вы потеряете свою текущую позицию.

Гарантий, что вы 100% никогда не потеряете свой оффсет в такой системе нет.

Поэтому при рассмотрении решения бизнес-задачи перед нами появляется дилемма — когда и как консьюмер будет выполнять коммит оффсета.

  1. До или после обработки сообщений?
  2. Синхронно, асинхронно или смешанно?

Фактически, это выбор между скоростью и надежностью. Мы сами выбираем надежность. Поэтому в АБС Equation, для максимального соответствия семантике поведения IBM MQ коммит делается на каждое сообщение. Мы используем самый медленный вариант, тем самым получаем гарантию at most once, потому что опасаемся повторных обработок.

Был интересный случай в проде, связанный с консьюмингом. Дело было так: система получала батч сообщений в количестве 300 штук, и на каждое полученное сообщение вызывала некий REST-сервис внешней системы. Причём нужно было это делать специально с очень низким RPS — 10–20 сообщений в секунду. Всё шло нормально, но при старте консьюмера заметили, что он всё получает и получает эти 300 сообщений и раз за разом вызывает внешний сервис с одними и теми же данными. Зациклился.

Произошло то, что называется ложная ребалансировка.

Дело в том, что по архитектуре консьюмеров они должны отправлять координатору консьюмер-групп сообщения хартбиты (heartbeat). Если координатор их не получает, то считает консьюмер мёртвым. По документации Kafka эти хартбиты отправляются только во время полинга сообщений — так спроектированы клиентские библиотеки.

Случилось так, что обработка 300 сообщений заняла много времени, что превысило тот самый тайм-аут сессии. И тогда:

  1. координатор консьюмер-групп посчитал консьюмер мёртвым,
  2. когда консьюмер пытался закоммитить и отправил коммит на брокер, тот его отверг, потому что консьюмер считался мёртвым,
  3. Kafka-клиент переподключился,
  4. получил заново стартовый оффсет и начал всё заново,
  5. получил старый номер,
  6. начал отправлять сообщения повторно
  7. повторить.

Этот цикл повторялся много раз.

В Java Kafka Client есть нерекомендуемый способ как отправить heartbeat вне полинга, но в librdkafka и его врапперах для других ЯП (Go/Rust/Python/…) такой возможности нет.

Вместо заключения

Apache Kafka — мощный и гибкий инструмент для построения распределённых систем обмена сообщениями. Но, как и любой сложный инструмент, она требует глубокого понимания внутренних механизмов и внимательного подхода к проектированию.

Ошибки, которые мы рассмотрели, объединяет одно: они проявляются не сразу, а в момент нагрузки, сбоя или масштабирования. И именно поэтому они так опасны — система может работать месяцами, пока однажды не «выстрелит» в самый неподходящий момент.

Главный вывод: проектируйте систему так, чтобы она корректно обрабатывала не только happy path, но и всевозможные сценарии сбоёв. Потому что в распределённых системах сбои — это не исключение, а норма.

Если статья помогла вам избежать хотя бы одной из описанных ошибок — значит, время на её написание потрачено не зря.

Спасибо за внимание!

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ

eFusion
08 января 2020

11 типов современных баз данных: краткие описания, схемы и примеры БД

Любые данные где-то хранятся. Будь это интернет вещей или пароли в *nix. По...
admin
23 февраля 2017

SQL за 20 минут

Предлагаем вашему вниманию статью с кричащим названием "SQL за 20 минут". К...