Go

🏃 Параллельное программирование в Go

Изучаем основы параллельного программирования в Go, а также пытаемся разобраться на примерах, почему конкурентность в Go – это не совсем параллелизм.

Перевод публикуется с сокращениями, автор оригинальной статьи Stefan Nilsson.

Для начала необходимо разобраться, как писать базовые программы на Go. Освежим знания, чтобы помочь себе быстрее освоиться.

Основы

The Go Playground – интерактивный веб-сервис, который позволяет запускать в песочнице небольшие программы в духе «Hello world!». Попробуйте!

        package main

import "fmt"

func main() {
    fmt.Println("Hello, world!")
}
    

Изучите основы Go

A Tour of Go – еще один интерактивный учебник с кучей примеров. Он берет начало на официальном сайте и обучает вас основам программирования Go в браузере.

Установите инструменты Go

В Getting Started объясняется, как установить инструменты Go. Доступны бинарные пакеты для FreeBSD, Linux, Mac OS X и Windows, а также инструкции по развертыванию и настройке.

Начните проект Go

How to Write Go Code посвящен разработке простых пакетов Go. Он рассказывает про организацию и тестирование кода, а также про использование команд fetch, build и install.

Горутины

Вы можете создать новый поток (горутину) с помощью оператора go. Все горутины в одной программе используют одно и то же адресное пространство.

        go list.Sort() //запускается list.Sort параллельно, без ожидания
    

Программа выводит сообщение «Hello from main goroutine». Она также может напечатать «Hello from another goroutine», в зависимости от того, какая из двух горутин завершится первой.

        func main() {
    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    // В этот момент выполнение программы останавливается и убиваются все
    // активные горутины
}
    

Следующая программа скорее всего выведет «Hello from main goroutine» и «Hello from another goroutine». Они могут появиться в любом порядке. Еще одна особенность заключается в том, что вторая горутина работает очень медленно и не печатает сообщение до завершения программы.

        func main() {
    go fmt.Println("Hello from another goroutine")
    fmt.Println("Hello from main goroutine")

    time.Sleep(time.Second) // дадим другой гороутине время завершиться
}
    

Вот более реалистичный пример, где определяется функция, которая использует concurrency для отсрочки события:

        // Publish печатает текст в stdout по истечении заданного времени.
// Он не блокируется и сразу же возвращается.
func Publish(text string, delay time.Duration) {
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
    }() // Обратите внимание на круглые скобки. Мы должны вызвать 
        // анонимную функцию.
}
    

Вот как вы можете использовать функцию Publish:

        func main() {
    Publish("A goroutine starts a new thread.", 5*time.Second)
    fmt.Println("Let’s hope the news will published before I leave.")

    // Дожидаемся публикации новостей
    time.Sleep(10 * time.Second)

    fmt.Println("Ten seconds later: I’m leaving now.")
}
    

Скорее всего программа напечатает три строки в заданном порядке с пятисекундными перерывами между ними.

        $ go run publish1.go
Let’s hope the news will published before I leave.
BREAKING NEWS: A goroutine starts a new thread.
Ten seconds later: I’m leaving now.
    

Невозможно реализовать ожидание потоков в процессе «сна», но есть метод синхронизации – использование каналов.

Реализация

Горутины имеют небольшой вес и стоят немногим больше, чем выделение места в стеке. Место в куче выделяется и освобождается по мере необходимости.

Внутри горутины действуют как корутины, которые мультиплексируются между несколькими потоками операционной системы. Если одна горутина блокирует поток ОС, например, ожидая ввода, другие горутины в этом потоке будут мигрировать, чтобы продолжать работать.

Каналы обеспечивают синхронизированную связь

Каналы – это механизм, с помощью которого горутины синхронизируют выполнение и обмениваются данными, передавая значения.

Новое значение канала можно задать с помощью встроенной функции make.

        // небуферизованный канал int-ов
ic := make(chan int)

// буферизованный канал на 10 строк
sc := make(chan string, 10)
    

Чтобы отправить значение в канал, используйте бинарный оператор «<-», а для получения – унарный оператор.

        ic <- 3   // отправляем 3 в канал
n := <-sc // получаем строку из канала
    

Оператор задает направление канала на отправку или получение. По умолчанию канал является двунаправленным.

        chan Sushi    // может использоваться для отправки и получения значений типа Sushi
chan<- string // может использоваться только для отправки строк
<-chan int    // может использоваться только для получения int
    

Буферизованные и небуферизованные каналы

  • Если пропускная способность канала равна нулю или отсутствует, канал не буферизуется и отправитель блокируется до тех пор, пока получатель не получит значение.
  • Если канал имеет буфер, отправитель блокируется только до тех пор, пока значение не будет скопировано в буфер. Если буфер заполнен, ждем пока какой-либо получатель не получит значение.
  • Приемники всегда блокируются, пока не появятся данные для приема.
  • Отправка или получение с nil-канала блокируется навсегда.

Закрытие канала

Функция закрытия помечает, что никакие значения больше не будут отправляться по каналу. Обратите внимание, что закрывать канал необходимо только в том случае, если приемник этого ожидает.

  • После вызова close и после получения любых ранее отправленных значений, операции приема вернут нулевое значение без блокировки.
  • Операция приема множества значений дополнительно возвращает состояние канала.
  • Отправка в закрытый канал или его закрытие, а также закрытие nil-канала, вызовут run-time panic.
        ch := make(chan string)
go func() {
    ch <- "Hello!"
    close(ch)
}()

fmt.Println(<-ch) // напечатает «Hello!»
fmt.Println(<-ch) // выведет нулевое значение «» без блокировки
fmt.Println(<-ch) // еще раз напечатает «»
v, ok := <-ch     // v - это «», ok – false

// получать значения от ch до закрытия
for v := range ch {
    fmt.Println(v) // не выполнится
}
    

Пример

В следующем примере функция Publish вернет канал, который используется для броадкастинга сообщения после публикации текста:

        // Publish напечатает текст в stdout по истечении заданного времени.
// Когда текст будет опубликован, закрываем канал, который на «паузе».
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
	ch := make(chan struct{})
	go func() {
		time.Sleep(delay)
		fmt.Println(text)
		close(ch)
	}()
	return ch
}
    

Обратите внимание: мы используем канал пустых структур для указания, что канал будет использоваться только для сигнализации, а не для передачи данных. Выглядит это так:

        wait := Publish("important news", 2 * time.Minute)
// выполним что-нибудь
<-wait // в блоке, пока текст не будет опубликован
    

Select ожидает группы каналов

Оператор select одновременно ожидает нескольких операций отправки или получения.

  • Оператор блокируется до тех пор, пока одна из операций не будет разблокирована.
  • Если выполняется несколько операций, то одна из них будет выбрана случайным образом.
        // блокируется до тех пор, пока данные не появятся в ch1 или ch2
select {
case <-ch1:
    fmt.Println("Received from ch1")
case <-ch2:
    fmt.Println("Received from ch2")
}
    

Операции отправки и приема в nil-канале блокируются навсегда. Это можно использовать для отключения канала в инструкции select:

        ch1 = nil // отключает этот канал
select {
case <-ch1:
    fmt.Println("Received from ch1") // не произойдет
case <-ch2:
    fmt.Println("Received from ch2")
}
    

Вариант по умолчанию

Вариант по умолчанию будет выполнен, если все остальные заблокированы.

        // никогда не заблокируется
select {
case x := <-ch:
    fmt.Println("Received", x)
default:
    fmt.Println("Nothing available")
}
    

Примеры

Бесконечная случайная двоичная последовательность

В качестве примера можно использовать случайный выбор вариантов, которые могут генерировать случайные биты.

        rand := make(chan int)
for {
    select {
    case rand <- 0: // no statement
    case rand <- 1:
    }
}
    

Операция блокировки по таймауту

Функция time.After входит в стандартную библиотеку. Она ожидает истечения указанного времени, а затем отправляет текущее время в возвращаемый канал:

        select {
case news := <-AFP:
    fmt.Println(news)
case <-time.After(time.Minute):
    fmt.Println("Time out: No news in one minute")
}
    

Оператор select блокируется до тех пор, пока по крайней мере один case не сможет выполниться. С нулевыми кейсами этого никогда не произойдет:

        select {}
    

Гонки данных

Гонка данных происходит, когда две горутины одновременно обращаются к одной и той же переменной и хотя бы одно из обращение является записью.

Такая ситуация возникает часто и может усложнить отладку.

Показанная ниже функция приводит к гонке данных, и ее поведение не определено – она может, например, напечатать число 1. Попробуем выяснить, как это происходит:

        func race() {
    wait := make(chan struct{})
    n := 0
    go func() {
        n++ // чтение, увеличение, запись
        close(wait)
    }()
    n++ // конфликтующий доступ
    <-wait
    fmt.Println(n) // Вывод: <unspecified>
}
    

Две горутины g1 и g2, участвуют в гонке, и нет никакого способа узнать, в каком порядке будут выполняться операции. Ниже приведен один из нескольких возможных вариантов:

Как избежать гонки данных?

Единственный способ избежать гонки – синхронизировать доступ ко всем mutable-данным, которые используются потоками совместно. Есть несколько способов добиться этого. В Go обычно используется канал или блокировка (низкоуровневые механизмы доступны в пакетах sync и sync/atomic).

Предпочтительный способ обработки одновременного доступа к данным в Go – использовать канал для передачи данных от одной горутины к следующей.

        func sharingIsCaring() {
    ch := make(chan int)
    go func() {
        n := 0 // Локальная переменная видна только для первой горутины
        n++
        ch <- n // Данные отправляются из первой горутины
    }()
    n := <-ch // ...и благополучно прибывают во вторую
    n++
    fmt.Println(n) // Вывод: 2
}
    

В этом коде канала происходят два события:

  • передаются данные от одной горутины к другой – точка синхронизации;
  • отправляющая горутина будет ждать, пока другая получит данные и наоборот.
Модель памяти Go довольно сложна: переменная в одной горутине может гарантированно наблюдать значения, полученные при записи в ту же переменную в другой горутине, но до тех пор, пока вы делитесь всеми mutable-данными между горутинами по каналам, вы защищены от гонки данных.

Как обнаружить гонку данных?

Гонки данных могут легко появляться, но обнаружить их трудно. К счастью среда выполнения Go может помочь и в этом. Используйте ключ -race для включения встроенного детектора гонки данных.

        $ go test -race [packages]
$ go run -race [packages]
    

Пример

Программа с гонкой данных:

        package main
import "fmt"

func main() {
    i := 0
    go func() {
        i++ // запись
    }()
    fmt.Println(i) // конкурентное чтение
}
    

Запуск этой программы с параметром -race покажет нам, что существует гонка между записью в строке 7 и чтением в строке 9:

        $ go run -race main.go
0
==================
WARNING: DATA RACE
Write by goroutine 6:
  main.main.func1()
      /tmp/main.go:7 +0x44

Previous read by main goroutine:
  main.main()
      /tmp/main.go:9 +0x7e

Goroutine 6 (running) created at:
  main.main()
      /tmp/main.go:8 +0x70
==================
Found 1 data race(s)
exit status 66
    

Подробности

Детектор гонки не выполняет никакого статического анализа. Он проверяет доступ к памяти во время выполнения только для фактически работающего кода.

Он работает на darwin/amd64, freebsd/amd64, linux/amd64 и Windows/amd64.

Накладные расходы варьируются, но обычно происходит увеличение использования памяти в 5-10 раз и увеличение времени выполнения в 2-20 раз.

Как отлаживать deadlock-и

Дэдлоки возникают, когда горутины ждут друг друга и ни одна из них не может завершиться.

Взглянем на пример:

        func main() {
	ch := make(chan int)
	ch <- 1
	fmt.Println(<-ch)
}
    

Программа застрянет на операции отправки, ожидая вечно, пока кто-то прочитает значение. Go способен обнаруживать подобные ситуации во время выполнения. Вот результат нашей программы:

        fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
	.../deadlock.go:7 +0x6c
    

Советы по отладке

Горутина может застрять:

  • когда она ждет канал;
  • либо когда она ждет одну из блокировок в пакете sync.

Общие причины:

  • ни одна горутина не имеет доступа к каналу или блокировке;
  • горутины ждут друг друга.
Сейчас Go обнаруживает только зависание всей программы в целом, а не когда застревает некое подмножество горутин.

С помощью каналов легко понять, что вызвало дедлок. С другой стороны, интенсивно использующие мьютексы программы могут быть заведомо трудными для отладки.

Ожидание горутин

Группа sync.WaitGroup ожидает завершения работы группы горутин:

        var wg sync.WaitGroup
wg.Add(2)
go func() {
    // Do work.
    wg.Done()
}()
go func() {
    // Do work.
    wg.Done()
}()
wg.Wait()
    
  • сначала основная горутина вызывает Add, чтобы установить количество ожидающих горутин;
  • затем запускаются две новые горутины и вызывают Done при завершении.

В то же время Wait используется для блокировки до тех пор, пока эти две горутины не завершатся.

Замечание: группа ожидания не должна копироваться после первого использования.

Трансляция сигнала по каналу

В этом примере функция Publish возвращает канал, который используется для передачи сигнала при публикации сообщения.

        // печать текста по истечении заданного времени
// когда это будет выполнено, канал ожидания будет закрыт
func Publish(text string, delay time.Duration) (wait <-chan struct{}) {
    ch := make(chan struct{})
    go func() {
        time.Sleep(delay)
        fmt.Println("BREAKING NEWS:", text)
        close(ch) // трансляция на все приемники
    }()
    return ch
}
    

Обратите внимание, что мы используем канал пустых структур: struct{}. Это явно указывает на то, что канал предназначен только для сигнализации, а не для передачи данных.

Вот как можно это использовать:

        func main() {
    wait := Publish("Channels let goroutines communicate.", 5*time.Second)
    fmt.Println("Waiting for news...")
    <-wait
    fmt.Println("Time to leave.")
}
    
        Waiting for news...
BREAKING NEWS: Channels let goroutines communicate.
Time to leave.
    

Как убить горутину

Чтобы горутина остановилась, ей необходимо прослушивать сигнал остановки на выделенном выходном канале и проверять его.

        quit := make(chan bool)
go func() {
    for {
        select {
        case <-quit:
            return
        default:
            // …
        }
    }
}()
// …
quit <- true
    

Вот более полный пример, где используется один канал как для передачи данных, так и для сигнализации:

        // генератор возвращает канал, который производит числа 1, 2, 3…
// чтобы остановить основную горутину, необходимо отправить 
// номер этому каналу
func Generator() chan int {
    ch := make(chan int)
    go func() {
        n := 1
        for {
            select {
            case ch <- n:
                n++
            case <-ch:
                return
            }
        }
    }()
    return ch
}

func main() {
    number := Generator()
    fmt.Println(<-number)
    fmt.Println(<-number)
    number <- 0           // остановка основной горутины
    fmt.Println(<-numberм) // ошибка, больше никто не отправляет
   
}
    
        1
2
fatal error: all goroutines are asleep - deadlock!
    

Timer и Ticker

Таймеры и тикеры позволяют выполнять код по расписанию один или несколько раз.

Timeout (Timer)

time.After ожидает в течение заданного промежутка, а затем отправляет текущее время по возвращаемому каналу:

        select {
case news := <-AFP:
	fmt.Println(news)
case <-time.After(time.Hour):
	fmt.Println("No news in an hour.")
}
    

time.Timer не будет обработан сборщиком мусора до тех пор, пока таймер не сработает. Используйте time.NewTimer вместо вызова метода Stop, когда таймер больше не нужен:

        for alive := true; alive; {
	timer := time.NewTimer(time.Hour)
	select {
	case news := <-AFP:
		timer.Stop()
		fmt.Println(news)
	case <-timer.C:
		alive = false
		fmt.Println("No news in an hour. Service aborting.")
	}
}
    

Repeat (Ticker)

time.Tick возвращает канал, который обеспечивает тиканье часов с четными интервалами:

        go func() {
	for now := range time.Tick(time.Minute) {
		fmt.Println(now, statusUpdate())
	}
}()
    

time.Ticker не будет обработан сборщиком мусора до тех пор, пока таймер не сработает. Используйте time.NewTicker вместо вызова метода Stop, когда тикер больше не нужен:

        func Foo() {
    timer = time.AfterFunc(time.Minute, func() {
        log.Println("Foo run for more than a minute.")
    })
    defer timer.Stop()

    // Do heavy work
}
    

Блокировка взаимного исключения (мьютекс)

Иногда удобнее синхронизировать доступ к данным с помощью явной блокировки, а не с помощью каналов. Стандартная библиотека Go предлагает для этой цели блокировку взаимного исключения sync.Mutex.

Используйте с осторожностью

Чтобы этот тип блокировки был безопасным, крайне важно, чтобы все обращения к общим данным выполнялись только тогда, когда горутина находится в блокировке. Одной ошибки в одной горутине достаточно, чтобы ввести гонку данных и сломать программу.

Из-за этого вам следует подумать о разработке кастомной структуры данных с чистым API и убедиться, что вся синхронизация выполняется внутри.

В этом примере мы создаем безопасную и простую в использовании конкурентную структуру данных AtomicInt, в которой хранится integer. Любое количество горутин может безопасно получить доступ к этому числу с помощью методов Add и Value.

        // AtomicInt – это параллельная структура данных, содержащая int
// его значение равно 0
type AtomicInt struct {
    mu sync.Mutex // блокировка может удерживаться одной горутиной за раз
    n  int
}

// добавляет n к AtomicInt
func (a *AtomicInt) Add(n int) {
    a.mu.Lock() //  ждем пока блокировка освободится
    a.n += n
    a.mu.Unlock() // освобождение блокировки
}

// Value возвращает значение a
func (a *AtomicInt) Value() int {
    a.mu.Lock()
    n := a.n
    a.mu.Unlock()
    return n
}

func main() {
    wait := make(chan struct{})
    var n AtomicInt
    go func() {
        n.Add(1) // один доступ
        close(wait)
    }()
    n.Add(1) // другой конкурентный доступ
    <-wait
    fmt.Println(n.Value()) // 2
}
    

Заключение

Мы рассмотрели распространенные проблемы, относящиеся к конкурентности в Go. Это не весь материал по теме – остальное вам придется самостоятельно изучать на официальном сайте. Не ленитесь, развивайтесь и удачи в обучении!

Дополнительные материалы:



Источники

МЕРОПРИЯТИЯ

Комментарии

ВАКАНСИИ

Добавить вакансию

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ