Как работать с обратными вызовами и потоками в Kotlin
Базовые инструменты асинхронного программирования в Kotlin. Разбираем особенности обратных вызовов и потоков на примерах кода.
Асинхронность в разработке описывает возникновение событий, которые не зависят от главного потока приложения, и способы взаимодействия с ними без блокировки на ожидание.
В языках программирования без встроенной поддержки асинхронности её реализуют с помощью двух шаблонов: обратных вызовов и конструкций future
или promise
. Обратные вызовы – базовые примитивы, а future
в асинхронном программировании опираются на них.
Например, тип Java 5 Future
не поддерживает ожидание завершения без блокировки. Вы используете только метод get
и ждёте. Однако в Java 8 добавили расширенный тип CompletableFuture
с методом whenComplete
, который устанавливает обратный вызов для ожидания выполнения без блокировки, что подходит для асинхронного программирования.
Kotlin корутины поддерживают асинхронность, дают приостановку без блокировки. А также интегрируются с асинхронными библиотеками в JVM через обратные вызовы.
Однократный обратный вызов
Рассмотрим гипотетический интерфейс Operation
с методом для асинхронного выполнения операции. Он принимает в качестве параметра обратный вызов, чтобы сообщить о завершении либо с результирующим значением, либо с ошибкой:
interface Operation<T> { fun performAsync(callback: (T?, Throwable?) -> Unit) }
Определим приостанавливающую функцию-расширение в Kotlin для выполнения операции без блокировки с использованием suspendCoroutine
из стандартной библиотеки:
suspend fun <T> Operation<T>.perform(): T = suspendCoroutine { continuation -> performAsync { value, exception -> when { exception != null -> // ошибка операции continuation.resumeWithException(exception) else -> // успешно, есть значение continuation.resume(value as T) } } }
Обратите внимание, что этот perform
– холодный источник значений. Он бездействует до запуска и после возвращения, так как ожидает завершения задачи посредством обратного вызова.
Отменяемая операция
В инженерной практике поощряется предоставление некоторых средств отмены в асинхронном API. Например, в Operation
добавьте метод cancel
для этой цели:
interface Operation<T> { fun performAsync(callback: (T?, Throwable?) -> Unit) fun cancel() // отменяет текущую операцию }
Теперь определим perform
как отменяемую функцию приостановки с использованием suspendCancellableCoroutine
из библиотеки kotlinx.coroutines
:
suspend fun <T> Operation<T>.perform(): T = suspendCancellableCoroutine { continuation -> performAsync { /* ... как раньше ... */ } continuation.invokeOnCancellation { cancel() } }
Многократный обратный вызов
Но что, если Operation
передаёт асинхронный поток значений и использует указанный обратный вызов больше одного раза? Нужен сигнал о её завершении. Для нашего примера предположим, что это происходит путём запуска обратного вызова со значением null
.
Не используйте такие Operation
с функциями наподобие suspendCoroutine
, чтобы не получить IllegalStateException
при попытке возобновить работу во второй раз. Приостановка и продолжение выполнения в Kotlin – однократные.
Поток Kotlin спешит на помощь. Его разработали для представления холодного асинхронного потока с несколькими значениями. Используйте функцию callbackFlow
для преобразования многократного обратного вызова в поток:
fun <T : Any> Operation<T>.perform(): Flow<T> = callbackFlow { performAsync { value, exception -> when { exception != null -> // ошибка операции close(exception) value == null -> // операция успешна close() else -> // есть значение offer(value as T) } } awaitClose { cancel() } }
Взгляните на ряд отличий. Так, perform
больше не функция приостановки. Сам по себе ничего не ждёт. И возвращает холодный Flow
. Код внутри блока callbackFlow {...}
не запускается до сбора потока вызывающей функцией конечной операции.
Как и прежде, performAsync
устанавливает обратный вызов, но теперь вместо Continuation
работаем с горячим SendChannel
, который открыт для получения данных. Таким образом, функция offer
вызывается для каждого значения, а close
– для сообщения об ошибке или успешном завершении. Здесь awaitClose
заменяет invokeOnCancellation
и выполняет функцию приостановки блока внутри callbackFlow
в то время, когда поступают данные.
Обратное давление
Что произойдёт, если performAsync
поставляет значения в функцию обратного вызова быстрее, чем собирающая корутина обрабатывает? В дело вступает обратное давление, которое постоянно возникает при операциях с асинхронными потоками данных. Буфер хранит значения, но при его переполнении offers
возвращает false
, и они исчезают. Смотрите, как избежать потерь или контролировать их.
Замените offer(value)
на sendBlocking(value)
. В этом случае поток, запускающий обратный вызов, блокируется при переполнении буфера, пока в нём не появится место. Это типичный способ сигнализировать о появлении обратного давления в большинстве устаревших потоковых API на основе обратных вызовов. И он гарантирует, что никакое значение не потеряется.
При лимитированном количестве данных скорость поступления будет ниже. Тогда используйте оператор buffer
для настройки неограниченного размера буфера путём добавления вызова .buffer(Channel.UNLIMITED)
после callbackFlow {...}
. В таком случае offer
возвращает true
, поэтому значения никуда не исчезают, и нет блокировки. Тем не менее рискуем исчерпать память данными из буфера.
Объединение
Нередко поток значений представляет собой частичный результат операции или обновление состояния, так что интересует только последняя величина. Поэтому безопасно объединить данные с использованием оператора conflate
в результирующем потоке, который гарантирует, что offer
возвращает true
, а сборщик видит последнее значение, даже когда отбросили (соединили) промежуточные.
Реактивные потоки
Когда исходный источник асинхронных данных выступает реактивным потоком, соответствующим спецификации, то для преобразования типа Publisher
в Kotlin Flow
используйте встроенную функцию-расширение Publisher.asFlow
из модуля kotlinx-coroutines-reactive
. Не изобретайте велосипед.