Изучаете Java и Kotlin? Тогда вам будет полезно узнать о различных типах синхронизации, блокировки и обеспечения безопасности потоков.
☕ Подтянуть свои знания по Java вы можете на нашем телеграм-канале «Библиотека Java для собеса»
Что такое синхронизация?
В многопоточном мире нам необходим доступ к объектам между потоками, при отсутствии синхронизации могут возникнуть проблемы. Вот базовый пример, который объясняет, почему это нужно:
import kotlinx.coroutines.* fun main() = runBlocking { var sharedCounter = 0 val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for(i in 1..1000){ sharedCounter++ } } } coroutines.forEach { corotuine-> corotuine.join() } }.join() println("The number of shared counter should be 10000000, but actually is $sharedCounter") }
Здесь мы запускаем 1000 корутинов (можете считать их легковесными потоками) в 4 потоках, и каждый из них увеличивает величину sharedCounter 1 000 раз, так, чтобы в итоге оно стало равным 1 000 000. Шансов, что у вас получится это сделать, практически нет.
Прежде чем мы объясним происходящее, представьте, что есть одна комната для размышлений (counter) и очередь из людей (threads), где каждый хочет зайти внутрь, при этом, она рассчитана только на одного человека. В этой комнате есть дверь, которую закрывают, если кто-то уже внутри. Но есть проблема − прямо сейчас эту дверь можно открыть снаружи, потому что в ней нет замка.
В приведенном выше коде для увеличения значения sharedCounter каждый поток пытается сделать следующее:
- Получить текущее значение.
- Сохранить его во временной переменной и увеличьте временную переменную на 1.
- Сохранить значение временной переменной в sharedCounter.
Но что если пока один поток получает текущее значение, подключится другой и тоже попытается получить текущее значение? Они оба получат одно и то же значение. Таким образом, каждый из них увеличивает это значение на 1 и сохраняет его.
Эта проблема может возникать и в других ситуациях: например, когда поток проходит второй шаг, другие потоки увеличивают и сохраняют значение sharedCounter, а при переходе на следующий шаг, первый поток сохраняет неактуальную версию. Тогда итоговое значение будет не таким, каким должно.
Чтобы решить эту проблему, нужно синхронизировать потоки для работы над этим значением. Далее рассказываем, как это сделать.
🧩☕ Интересные задачи по Java для практики можно найти на нашем телеграм-канале «Библиотека задач по Java»
Volatile
В Java и Kotlin есть ключевое слово volatile (в Kotline в виде аннотации @volatile), которые применяются к полям и гарантируют, что считываемое значение поступает из основной памяти, а не из кэша процессора, поэтому все участники процесса будут ожидать окончания параллельной записи, прежде чем считать значение.
import kotlinx.coroutines.* @Volatile var sharedCounter: Int = 0 fun main() = runBlocking { val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for (i in 1..1000) { sharedCounter++ } } } coroutines.forEach { corotuine -> corotuine.join() } }.join() println("The number of shared counter is $sharedCounter") }
Как видно из кода, в нашем сценарии volatiles не помогают, потому что volatile не спасает от проблемы считывания устаревшего значения (между чтением переменной и записью нового значения в одном потоке может произойти действие в другом). Дело в том, что ++ здесь − не атомарная операция.
Volatiles используются для создания потоко-безопасных синглетонов в Java путем двойной проверки экземпляра синглтона. Подробнее об этих методах написано здесь.
Synchronized
Одним из решений является использование synchronized из Java. В Java и Kotlin есть два типа синхронизации − синхронизированные методы и синхронизированные блоки.
Для понимания − синхронизированные методы обозначается, как synchronized в Java и @Synchronized в Kotlin.
Чтобы использовать это решение, мы можем перенести увеличение счётчика в отдельную функцию, помеченную как synchronized.
import kotlinx.coroutines.* var sharedCounter = 0 @Synchronized fun updateCounter(){ sharedCounter++ } fun main() = runBlocking { val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for(i in 1..1000){ updateCounter() } } } coroutines.forEach { corotuine-> corotuine.join() } }.join() println("The number of shared counter is $sharedCounter") }
Как видим, выходное значение верное. Если провести аналогию с комнатами, то @Synchronized будет являться замком на двери, который открывается единственным существующим ключом. Один человек (поток) берёт ключ, заходит, закрывается изнутри. Никто другой не может зайти в этот момент. Только после того, как ключ вернут на место, это станет возможным. Именно так работает synchronized в Java и Kotlin.
Следует отметить ещё один момент, который касается синхронизированных методов: доступ ко всему методу ограничен, даже если происходят промежуточные действия, не требующие синхронизации. Например, при использовании synchronized, updateCounter синхронизируется даже тогда, когда sharedCounter не обновляется.
Синхронизация может происходить и на более низком уровне, например, с помощью синхронизированных блоков. Давайте попробуем увеличивать число только тогда, когда наш номер итерации четный.
import kotlinx.coroutines.* class Incrementor() { var sharedCounter: Int = 0 private set fun updateCounterIfNecessary(shouldIActuallyIncrement: Boolean) { if (shouldIActuallyIncrement) { synchronized(this) { sharedCounter++ } } } } fun main() = runBlocking { val incrementor = Incrementor() val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for (i in 1..1000) { incrementor.updateCounterIfNecessary(it % 2 == 0) } } } coroutines.forEach { corotuine -> corotuine.join() } }.join() println("The number of shared counter is ${incrementor.sharedCounter}") }
Синхронизированные блоки хороши тем, что позволяют использовать любой объект в качестве замка.
В приведенном выше коде, synchronized определяет this (экземпляр Instance) как объект блокировки. Любой поток, который достигает этой точки, блокирует Incrementor, выполняет работу в блоке и снимает блокировку synchronized.
Атомарные примитивы
Атомарные примитивы позволяют синхронизировать действия над переменными базовых типов. Например, в примере ниже класс AtomicInteger предоставляет свою реализацию функции ++.
import kotlinx.coroutines.* import java.util.concurrent.atomic.AtomicInteger class Incrementor() { val sharedCounter: AtomicInteger = AtomicInteger(0) fun updateCounterIfNecessary(shouldIActuallyIncrement: Boolean) { if (shouldIActuallyIncrement) { sharedCounter.incrementAndGet() } } fun getSharedCounter():Int { return sharedCounter.get() } } fun main() = runBlocking { val incrementor = Incrementor() val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for (i in 1..1000) { incrementor.updateCounterIfNecessary(it % 2 == 0) } } } coroutines.forEach { corotuine -> corotuine.join() } }.join() println("The number of shared counter is ${incrementor.getSharedCounter()}") }
Lock
Класс Lock выполняет примерно те же функции, что и synchronized, только более гибко. synchronized даёт возможность синхронизировать блоки кода, тогда как с Lock можно реализовывать более сложную логику.
Примерно так:
import kotlinx.coroutines.* import java.util.concurrent.locks.ReentrantLock class Incrementor() { private val sharedCounterLock = ReentrantLock() var sharedCounter: Int = 0 private set fun updateCounterIfNecessary(shouldIActuallyIncrement: Boolean) { if (shouldIActuallyIncrement) { try { sharedCounterLock.lock() sharedCounter++ } finally { sharedCounterLock.unlock() } } } } fun main() = runBlocking { val incrementor = Incrementor() val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for (i in 1..1000) { incrementor.updateCounterIfNecessary(it % 2 == 0) } } } coroutines.forEach { corotuine -> corotuine.join() } }.join() println("The number of shared counter is ${incrementor.sharedCounter}") }
Благодаря «замкам» можно, например, организовать последовательную блокировку-разблокировку объектов LinkedList или ввести иерархическую блокировку. Взаимные блокировки − большая область для отдельного изучения.
Кроме того, классы Lock предоставляют удобные функции, например Lock.tryLock(long, TimeUnit), которая попытается получить блокировку над объектом, но не будет ожидать вечность.
Средства параллелизма
Существуют также и другие классы для обеспечения параллельности и синхронизации. Рассмотрим на примерах.
1. Semaphore
Semaphore, как и замки, могут быть использованы для синхронизации. Lock (можно рассматривать его как мьютекс) используется, когда нужно что-то сделать линейным и атомарным способом.
Semaphore принимают общее количество разрешений в своем конструкторе, которое может использоваться для ограничения одновременного доступа к ресурсу. Они используются тогда, когда требуются сигналы. Существует некоторый паттерн: producer − consumer. Одни потоки ведут запись и подают об этом сигнал. Другие следят за состоянием сигналов, только чтобы осуществлять чтение данных.
Посмотрите на пример кода ниже: сейчас мы адаптируем его так, чтобы можно было использовать semaphore. Хотя в данном случае можно найти и более удачные способы обеспечения синхронизации.
import kotlinx.coroutines.* import java.util.concurrent.Semaphore class Incrementor() { private val sharedCounterLock = Semaphore(1) var sharedCounter: Int = 0 private set fun updateCounterIfNecessary(shouldIActuallyIncrement: Boolean) { if (shouldIActuallyIncrement) { try { sharedCounterLock.acquire() sharedCounter++ } finally { sharedCounterLock.release() } } } } fun main() = runBlocking { val incrementor = Incrementor() val scope = CoroutineScope(newFixedThreadPoolContext(4, "synchronizationPool")) scope.launch { val coroutines = 1.rangeTo(1000).map { launch { for (i in 1..1000) { incrementor.updateCounterIfNecessary(it % 2 == 0) } } } coroutines.forEach { corotuine -> corotuine.join() } }.join() println("The number of shared counter is ${incrementor.sharedCounter}") }
Ещё, функция semaphore обеспечивает справедливую очередность исполнения потоков, если задать второй параметр в конструкторе − fair. Тогда поток, ожидающий очереди дольше всего, получит доступ первым. То есть, поддерживается FIFO очередь. Однако использование tryAcquire напрямую не сохраняет эту «честность». Если требуется соблюдение правила FIFO, можно писать tryAcquire(0, TimeUnit.SECONDS).
2. CyclicBarrier
CyclicBarrier используется, когда есть фиксированное количество участников − потоков или методов, ожидающих окончания выполнения процесса каждым из участников для продолжения работы.
Представим, что у нас есть несколько исполнителей, генерирующих по одному числу, и мы хотим суммировать все числа после окончания работы.
import java.util.* import java.util.concurrent.CyclicBarrier fun main() { val createdValues = mutableListOf<Int>() /** * create a cyclic barrier that waits for 5 threads to finish their jobs, and after that, * prints the sum of all the values. */ val cyclicBarrier = CyclicBarrier(3) { println("Sum of all values is ${createdValues.sum()}") } val threads = 1.rangeTo(3).map { number -> Thread { Thread.sleep(Random().nextInt(500).toLong()) createdValues.add(number) cyclicBarrier.await() println("I am thread ${Thread.currentThread().name} and I finished my Job!") }.apply { start() } } threads.forEach { thread -> thread.join() } }
В приведенном примере, блок кода, который был передан конструктору CyclicBarrier, запускается после выполнения 3 потоками await(). После выполнения действия CyclicBarrier потоки продолжают работать с тех мест, где остановились.
Еще один момент: createdValues не является потоко-безопасным, и для параллельной работы его следует заменить на synchronizedList, речь о котором пойдёт далее. Если пара потоков попытается одновременно добавить новый элемент в список, то один из них может быть утерян!
3. Параллельные коллекции
Существует несколько способов обработки синхронизированных коллекций в Java.
- Collections.synchronizedList(List<K>) является классом, который может использоваться для выполнения регулярных операций со списками (add, addAll, get, set, ...) методом синхронизации. Существуют также аналогичные реализации Map, Set и другие.
- CopyOnWriteArrayList может использоваться для обеспечения поточно-ориентированных операций модификации в List. Любая операция изменения сначала копирует весь список в локальную переменную с помощью итератора, выполняет действие, а затем заменяет его исходным списком. Эта коллекция примечательна тем, что обход списка не требует синхронизации.
- HashTable обеспечивает синхронизированный доступ к Map. Следует отметить, что синхронизируются только отдельные функции Map. Например, put не в их числе, поэтому безопасность потока на всей карте не гарантируется.
- ConcurrentHashMap<K,V> можно использовать для обеспечения безопасности потоков во всех методах HashMap. Важно отметить, что операции чтения (например, get) не блокируют Map целиком.
Deadlock
Код ниже не работает, а точнее никогда не остановится. Догадаетесь почему?
data class Human(val name:String) { @Synchronized fun sayHi(to: Human){ println("$name saying hi to ${to.name}") Thread.sleep(500) to.sayHiBack(this) } @Synchronized fun sayHiBack(to: Human){ println("$name saying hi back to ${to.name}") } } fun main() { val adam = Human("adam") val eve = Human("eve") val adamThread = Thread { adam.sayHi(eve) }.apply { start() } val eveThread = Thread { eve.sayHi(adam) }.apply { start() } adamThread.join() eveThread.join() }
У нас есть два человека − Адам и Ева. Когда один говорит другому привет, второй вынужден ответить ему тем же. Когда поток Адама запускается, он пытается сказать «привет» Еве, блокируя собственный объект с помощью @Synchronized и ждёт 500 мс. Тем временем начинается поток Евы, по той же схеме. По истечении 500мс, Адам вызывает Eve.sayHiBack, то есть пытается получить блокировку ещё и над объектом eve. Ева также ждёт освобождения доступа к adam.sayHiBack. Таким образом, оба потока просто останавливаются в бесконечном ожидании.
При написании синхронизированного кода нужно подумать обо всех возможных ситуациях в блокировках-разблокировках. Вы можете создать такую же взаимную блокировку, как в примере выше, используя другие средства синхронизации, замки.
Заключение
В этой статье были рассмотрены различные методы синхронизации и обеспечения потокобезопасности, коллекции для параллельной работы и вспомогательные классы Java и Kotlin.
Если этот материал был для вас интересным, посмотрите похожие:
- Советы и трюки для программирования на языке Kotlin
- Подробно разбираем популярные функции Kotlin
- ТОП-25 трюков, советов и лучших практ ик программирования на Java
Источник: Синхронизация, потоковая безопасность, блокирование в Java и Kotlin на Pro Android Dev
Комментарии