Как работать с обратными вызовами и потоками в Kotlin

Базовые инструменты асинхронного программирования в Kotlin. Разбираем особенности обратных вызовов и потоков на примерах кода.

Асинхронность в разработке описывает возникновение событий, которые не зависят от главного потока приложения, и способы взаимодействия с ними без блокировки на ожидание.

В языках программирования без встроенной поддержки асинхронности её реализуют с помощью двух шаблонов: обратных вызовов и конструкций future или promise. Обратные вызовы – базовые примитивы, а future в асинхронном программировании опираются на них.

Например, тип Java 5 Future не поддерживает ожидание завершения без блокировки. Вы используете только метод get и ждёте. Однако в Java 8 добавили расширенный тип CompletableFuture с методом whenComplete, который устанавливает обратный вызов для ожидания выполнения без блокировки, что подходит для асинхронного программирования.

Kotlin корутины поддерживают асинхронность, дают приостановку без блокировки. А также интегрируются с асинхронными библиотеками в JVM через обратные вызовы.

Однократный обратный вызов

Рассмотрим гипотетический интерфейс Operation с методом для асинхронного выполнения операции. Он принимает в качестве параметра обратный вызов, чтобы сообщить о завершении либо с результирующим значением, либо с ошибкой:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
}

Определим приостанавливающую функцию-расширение в Kotlin для выполнения операции без блокировки с использованием suspendCoroutine из стандартной библиотеки:

suspend fun <T> Operation<T>.perform(): T =
    suspendCoroutine { continuation ->
        performAsync { value, exception ->
            when {
                exception != null -> // ошибка операции 
                    continuation.resumeWithException(exception)
                else -> // успешно, есть значение
                    continuation.resume(value as T)
            }
        }
    }

Обратите внимание, что этот performхолодный источник значений. Он бездействует до запуска и после возвращения, так как ожидает завершения задачи посредством обратного вызова.

Отменяемая операция

В инженерной практике поощряется предоставление некоторых средств отмены в асинхронном API. Например, в Operation добавьте метод cancel для этой цели:

interface Operation<T> {
    fun performAsync(callback: (T?, Throwable?) -> Unit)
    fun cancel() // отменяет текущую операцию
}

Теперь определим perform как отменяемую функцию приостановки с использованием suspendCancellableCoroutine из библиотеки kotlinx.coroutines:

suspend fun <T> Operation<T>.perform(): T =
    suspendCancellableCoroutine { continuation ->
        performAsync { /* ... как раньше ... */ }
        continuation.invokeOnCancellation { cancel() }
    }

Многократный обратный вызов

Но что, если Operation передаёт асинхронный поток значений и использует указанный обратный вызов больше одного раза? Нужен сигнал о её завершении. Для нашего примера предположим, что это происходит путём запуска обратного вызова со значением null.

Не используйте такие Operation с функциями наподобие suspendCoroutine, чтобы не получить IllegalStateException при попытке возобновить работу во второй раз. Приостановка и продолжение выполнения в Kotlin – однократные.

Поток Kotlin спешит на помощь. Его разработали для представления холодного асинхронного потока с несколькими значениями. Используйте функцию callbackFlow для преобразования многократного обратного вызова в поток:

fun <T : Any> Operation<T>.perform(): Flow<T> =
    callbackFlow {
        performAsync { value, exception ->
            when {
                exception != null -> // ошибка операции
                    close(exception)
                value == null -> // операция успешна
                    close()
                else -> // есть значение
                    offer(value as T)
            }
        }
        awaitClose { cancel() }
    }

Взгляните на ряд отличий. Так, perform больше не функция приостановки. Сам по себе ничего не ждёт. И возвращает холодный Flow. Код внутри блока callbackFlow {...} не запускается до сбора потока вызывающей функцией конечной операции.

Как и прежде, performAsync устанавливает обратный вызов, но теперь вместо Continuation работаем с горячим SendChannel, который открыт для получения данных. Таким образом, функция offer вызывается для каждого значения, а close – для сообщения об ошибке или успешном завершении. Здесь awaitClose заменяет invokeOnCancellation и выполняет функцию приостановки блока внутри callbackFlow в то время, когда поступают данные.

Обратное давление

Что произойдёт, если performAsync поставляет значения в функцию обратного вызова быстрее, чем собирающая корутина обрабатывает? В дело вступает обратное давление, которое постоянно возникает при операциях с асинхронными потоками данных. Буфер хранит значения, но при его переполнении offers возвращает false, и они исчезают. Смотрите, как избежать потерь или контролировать их.

Замените offer(value) на sendBlocking(value). В этом случае поток, запускающий обратный вызов, блокируется при переполнении буфера, пока в нём не появится место. Это типичный способ сигнализировать о появлении обратного давления в большинстве устаревших потоковых API на основе обратных вызовов. И он гарантирует, что никакое значение не потеряется.

При лимитированном количестве данных скорость поступления будет ниже. Тогда используйте оператор buffer для настройки неограниченного размера буфера путём добавления вызова .buffer(Channel.UNLIMITED) после callbackFlow {...}. В таком случае offer возвращает true, поэтому значения никуда не исчезают, и нет блокировки. Тем не менее рискуем исчерпать память данными из буфера.

Объединение

Нередко поток значений представляет собой частичный результат операции или обновление состояния, так что интересует только последняя величина. Поэтому безопасно объединить данные с использованием оператора conflate в результирующем потоке, который гарантирует, что offer возвращает true, а сборщик видит последнее значение, даже когда отбросили (соединили) промежуточные.

Реактивные потоки

Когда исходный источник асинхронных данных выступает реактивным потоком, соответствующим спецификации, то для преобразования типа Publisher в Kotlin Flow используйте встроенную функцию-расширение Publisher.asFlow из модуля kotlinx-coroutines-reactive. Не изобретайте велосипед.

С какими нюансами обратных вызовов и потоков в Kotlin сталкивались вы?

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ