January 31, 2016 · Go Translate

Маленькая книга о Go – Глава 6: Конкурентность

О книге

Go часто описывают, как дружелюбный к параллельному программированию язык. Причина этому заключается в предоставлении простого синтаксиса для двух мощных механизмов: горутин и каналов.

Горутины

Горутины похожи на потоки, но они управляются самим Go, а не операционной системой. Код, который запускается как горутина, может работать одновременно с другим кодом. Давайте посмотрим на пример:

package main

import (
  "fmt"
  "time"
)

func main() {
  fmt.Println("старт")
  go process()
  time.Sleep(time.Millisecond * 10) // это плохо, не делайте так!
  fmt.Println("готово")
}

func process() {
  fmt.Println("обработка")
}

Здесь происходит несколько интересных вещей, но самое главное то, как мы запускаем горутину. Мы просто используем ключевое слово go, а затем пишем функцию, которую хотим выполнить. Если нужно выполнить немного кода, мы можем воспользоваться анонимной функцией. Хотя стоит отметить, что анонимные функции используются не только вместе с горутинами.

go func() {
  fmt.Println("обработка")
}()

Горутины просты в создании и не несут много накладных расходов. Несколько горутин будут работать по похожему на потоки операционной системы принципу. Это часто называют M:N поточной моделью, потому, что мы имеем M потоков приложения (горутин) запущенных в N потоков операционной системы. В результате этого горутины дают определённую долю оверхеда (несколько килобайт) по сравнению с потоками ОС. Современное железо позволяет запустить порядка миллиона горутин.

Кроме того, вся внутренняя сложность скрыта. Мы просто говорим, что этот код должен выполняться параллельно и позволяем Go позаботиться обо всём остальном.

Если мы вернёмся к примеру, то увидим, что выполняется Sleep на несколько миллисекунд. Это необходимо потому, что основной процесс завершается быстрее, чем успеет выполниться горутина (процесс не ждёт, пока все горутины выполнят свою работу перед выходом). Для того, чтобы решить эту проблему, нам нужно скоординировать наш код.

Синхронизация

Создание горутин тривиально и они дёшево стоят, поэтому мы можем запустить их много. Однако конкурентный код требует наличия согласованности. Для решения этой проблемы Go предоставляет каналы. Перед тем, как их рассмотреть, я думаю важным будет немного объяснить основы конкурентного программирования.

Написание параллельного кода требует особого внимания к тому, как и где вы будете читать и записывать значения. В некотором роде, это как программирование без сборщика мусора – оно требует, чтобы вы взглянули на данные под другим углом и всегда были бдительны перед возможной опасностью. Рассмотрим:

package main

import (
  "fmt"
  "time"
)

var counter = 0

func main() {
  for i := 0; i < 2; i++ {
    go incr()
  }
  time.Sleep(time.Millisecond * 10)
}

func incr() {
  counter++
  fmt.Println(counter)
}

Как вы считаете, что выведет этот код?

Если вы думаете, что результат будет 1, 2, вы одновременно правы и нет. Это правда, что, если вы запустите этот код, вы скорее всего получите такой результат. Однако реальность такова, что поведение такого кода неопределённо. Почему? Потому, что мы потенциально имеем несколько (в данном случае две) горутины, пишущие одну и ту же переменную counter в одно и то же время. Или, что хуже, одна горутина может читать counter в то же время, когда другая записывает в него значение.

Это действительно опасно? Определённо да. counter++ может выглядеть, как простая строка кода, но в действительности она преобразуется в множество инструкций ассемблера, конкретная реализация которого зависит от платформы, на которой запущен код. Это правда, что в этом примере скорее всего всё будет работать хорошо. Тем не менее, возможен случай, когда обе горутины обратятся к значению counter, когда оно будет равно 0 и вы получите 1, 1 на выходе. В более худшем варианте это приведет к системному сбою или доступу к произвольным данным и инкрементированию их!

Единственная параллельная вещь, которую вы можете делать безопасно с переменной – это читать её значение. Вы можете иметь столько читателей, сколько вам угодно, но запись необходимо синхронизировать. Есть несколько разных способов сделать это, в том числе некоторые действительно атомарные операции, которые опираются на специальные инструкции процессора. Тем не менее, наиболее распространённым подходом является использование мьютекса:

package main

import (
  "fmt"
  "time"
  "sync"
)

var (
  counter = 0
  lock sync.Mutex
)

func main() {
  for i := 0; i < 2; i++ {
    go incr()
  }
  time.Sleep(time.Millisecond * 10)
}

func incr() {
  lock.Lock()
  defer lock.Unlock()
  counter++
  fmt.Println(counter)
}

Мьютекс обеспечивает последовательный доступ с заблокированному коду. Причина по которой мы определяем нашу блокировку как lock sync.Mutex в том, что по умолчанию значение sync.Mutex разблокировано.

Выглядит достаточно просто? Но простота в этом примере обманчива. В нем есть целый класс ошибок, которые могут возникать во время параллельного программирования. Прежде всего, не всегда очевидно то, что код должен быть защищен. Хотя может быть и заманчиво использование грубых блокировок (которые охватывают почти весь код), это в первую очередь подрывает саму суть параллельного программирования. Как правило нужны небольшие блокировки, иначе мы превратим десятиполосное шоссе в однополосную дорогу.

Другая проблема в том, что делать со взаимными блокировками. С одной блокировкой проблем нет, но когда вы используете две или больше в одном коде, возникает опасная ситуация, когда горутина А имеет блокировку А и ей необходим доступ к блокировке Б, которую держит горутина Б потому, что ей нужен доступ к блокировке А.

В действительности взаимная блокировка может произойти и с одной, если вы забыли освободить её. Это не так опасно, как множественные блокировки (потому, что их действительно сложно определить), но вы можете увидеть как это происходит. Попробуйте выполнить:

package main

import (
  "time"
  "sync"
)

var (
  lock sync.Mutex
)

func main() {
  go func() { lock.Lock() }()
  time.Sleep(time.Millisecond * 10)
  lock.Lock()
}

При параллельном программировании происходит больше вещей, чем мы здесь видим. Поскольку мы имеем множество единовременных читателей, существует один общий мьютекс чтения-записи. Он запускает две блокирующие функции: одна для блокировки чтения и другая для блокировки записи. В Go, sync.RWMutex – это просто замок. В дополнении к методам Lock и Unlock обычного sync.Mutex, он предоставляет еще и методы RLock и RUnlock, где R означает Read (чтение). В то время, как мьютексы чтения-записи широко используются, они добавляют забот разработчикам. Мы должны обращать внимание не только на то, когда обращаться к данным, но и на то как.

Кроме того, часть параллельного программирования связана не только с обеспечением сериализации доступа между различными частями кода, но и с координацией работы множества горутин. Например, засыпание на 10 миллисекунд не является элегантным решением. Что будет, если выполнение горутины займёт больше, чем 10 миллисекунд? А что, если меньше, и мы просто зря потратим ресурсы в ожидании? Что если бы вместо простого ожидания горутины мы могли бы сказать: эй, у меня есть новые данные для твоей обработки?

Все эти вещи выполнимы и без каналов. Конечно для простых случаев я считаю, что вы должны использовать такие примитивы как sync.Mutex и sync.RWMutex, но как мы увидим в следующем разделе, каналы помогают сделать код параллельного программирования более чистым и защищенным от ошибок.

Каналы

Настоящим испытанием в конкурентном программировании является совместное использование данных. Если ваша горутина не предоставляет никаких данных, вам не нужно заботиться о её синхронизации. Хотя это и подходит не для всех систем. По факту, многие системы создаются с целью разделения данных между несколькими запросами. Кэширование в памяти или база данных будут хорошими примерами таких систем.

Каналы помогают сделать конкурентное программирование более разумным, выделяя совместные данные из общей картины. Канал – это труба для взаимодействия между горутинами, которая используется для передачи данных. Другими словами, горутина, которая имеет данные, может передать их в другую горутину с помощью канала. В результате, в любой момент времени только одна горутина имеет доступ к данным.

Канал, как и всё остальное, имеет тип. Это тип данных, передаваемых через канал. Например, чтобы создать канал для передачи целых чисел, мы делаем:

c := make(chan int)

Типом этого канала является chan int. Для передачи этого канала в функцию используем такое определение:

func worker(c chan int) { ... }

Каналы поддерживают две операции: приём и отправка. Мы отправляем в канал выполняя:

CHANNEL <- DATA

и получаем из него

VAR := <-CHANNEL

Стрелка указывает направление потока данных. Когда происходит отправка, данные передаются в канал. При получении данные извлекаются из канала.

Последняя вещь, которую необходимо знать перед тем, как посмотреть на первый пример, это то, что прием и отправка данных в и из канала являются блокирующими операциями. Это значит, что во время получения данных из канала выполнение горутины останавливается пока данные не доступны. Аналогично когда мы отправляем в канал, выполнение не продолжается пока данные не получены.

Рассмотрим систему, в которой входящие данные нам необходимо обработать в отдельных горутинах. Это обычное требование. Если мы делаем тяжелую обработку в горутине, которая принимает входящие данные, мы рискуем тем, что клиент может отключиться по тайм-ауту. Сначала мы напишем обработчик. Он может быть простой функцией, но я сделаю его частью структуры, так как до этого мы еще не видели горутин используемых таким образом:

type Worker struct {
  id int
}

func (w Worker) process(c chan int) {
  for {
    data := <-c
    fmt.Printf("обработчик %d получил %d\n", w.id, data)
  }
}

Наш обработчик прост. Он ждет пока данные не станут доступны, затем “обрабатывает” их. Он послушно делает это в бесконечном цикле ожидая данных для обработки.

Для начала, запустим несколько обработчиков:

c := make(chan int)
for i := 0; i < 4; i++ {
  worker := Worker{id: i}
  go worker.process(c)
}

А затем дадим им немного работы:

for {
  c <- rand.Int()
  time.Sleep(time.Millisecond * 50)
}

Полный код для запуска:

package main

import (
  "fmt"
  "time"
  "math/rand"
)

func main() {
  c := make(chan int)
  for i := 0; i < 5; i++ {
    worker := &Worker{id: i}
    go worker.process(c)
  }

  for {
    c <- rand.Int()
    time.Sleep(time.Millisecond * 50)
  }
}

type Worker struct {
  id int
}

func (w *Worker) process(c chan int) {
  for {
    data := <-c
    fmt.Printf("обработчик %d получил %d\n", w.id, data)
  }
}

Мы не знаем какой именно обработчик какие данные получает. Но мы знаем то, что Go гарантирует, что данные, отправляемые в канал, будут приняты только одним получателем.

Обратите внимание на то, что единственное общее состояние, которое мы можем безопасно получать и отправлять одновременно – это канал. Каналы предоставляют весь код, необходимый для синхронизации, и позволяют убедиться в том, что только одна горутина имеет доступ к определенному участку данных.

Буферизированные каналы

А что произойдёт с кодом выше, если он получит больше данных, чем сможет обработать? Вы можете симулировать такую ситуацию изменив обработчик, добавив задержку после получения данных:

for {
  data := <-c
  fmt.Printf("обработчик %d получил %d\n", w.id, data)
  time.Sleep(time.Millisecond * 500)
}

Произойдёт то, что наш код, принимающий пользовательские данные (которые мы симулировали с помощью генератора случайных чисел) перестанет их отправлять в каналы потому, что не будет доступных получателей.

В тех случаях, когда вам необходимо гарантировать обработку данных, вы, возможно, захотите заблокировать их получение на некоторое время. В других случаях, вы можете пойти на уменьшение гарантий обработки. Существуют несколько популярных методов это сделать. Первый – это буфер данных. Если нет доступных обработчиков, нам нужно временно сохранить данные в какого-либо рода очередь. Каналы имеют встроенную поддержку буферизации. Когда мы создаём канал с помощью make, мы можем передать длину:

c := make(chan int, 100)

Вы можете сделать это изменение, но вы увидите, что обработка всё ещё прерывается. Буферизация каналов не увеличивает их ёмкость, она просто обеспечивает очередь для ожидания обработки и хороший способ справляться с резкими скачками. В нашем примере мы постоянно отправляем больше данных, чем обработчики могут принять.

Тем не менее, мы можем посмотреть на то, что представляет собой буферизированный канал и, фактически, буферизация, посмотрев на длину канала len:

for {
  c <- rand.Int()
  fmt.Println(len(c))
  time.Sleep(time.Millisecond * 50)
}

Вы можете увидеть, как она растёт и растет, пока не заполнится, и с этого момента отправка в канал снова будет заблокирована.

Select

Даже при использовании буферизации со временем наступает момент, когда нам нужно начинать отбрасывать входящие сообщения. Мы не можем бесконечно занимать память, в надежде, что обработчик освободится. Для таких целей в Go используется select.

Синтаксически select похож на switch. С его помощью, мы можем описать действия, выполняемые при недоступности канала для отправки. Сначала, давайте уберём буферизацию канала, чтобы увидеть работу select более наглядно:

c := make(chan int)

Затем, мы изменим наш цикл for:

for {
  select {
  case c <- rand.Int():
    //опциональный код здесь
  default:
    //тут можно ничего не писать, чтобы данные молча отбрасывались
    fmt.Println("выброшено")
  }
  time.Sleep(time.Millisecond * 50)
}

Мы посылаем 20 сообщений в секунду, в то время как обработчики могут принять только 10, поэтому половина будет отброшена.

Это первое из того, что можно сделать используя select. Главным его назначением является управление множеством каналов. Получая несколько каналов, select блокируется до тех пор, пока один из них не освободится. Если доступных каналов нет, будет выполнен необязательный блок default. Если доступно несколько каналов, канал будет выбран случайным образом.

Трудно придумать простой пример для демонстрации такого поведения, поскольку это довольно продвинутая особенность. Возможно следующая секция сможет помочь показать это.

Тайм-Аут

Мы рассмотрели буферные сообщения и простое отбрасывание их. Другой популярный метод – это тайм-аут. Мы будем блокировать выполнение на какое-то время, но не навсегда. Это очень легко реализовать в Go. Правда синтаксис может оттолкнуть, но это такой аккуратный и полезный способ, что я не мог его опустить.

Для блокировки на максимально возможное время мы можем использовать функцию time.After. Давайте посмотрим на пример и попробуем разобраться в этой магии. Для этого изменим отправку данных так:

for {
  select {
  case c <- rand.Int():
  case <-time.After(time.Millisecond * 100):
    fmt.Println("тайм-аут", )
  }
  time.Sleep(time.Millisecond * 50)
}

time.After возвращает канал, так что мы можем использовать select для его выбора. Канал будет завершен после истечения указанного времени. Вот и всё. Больше никакой магии нет. Если вам интересно, реализация after может выглядеть как-то так:

func after(d time.Duration) chan bool {
  c := make(chan bool)
  go func() {
    time.Sleep(d)
    c <- true
  }()
  return c
}

Возвращаясь к нашему select, есть пара вещей, с которыми можно поиграть. Первая, что если мы вернём обратно блок default? Можете угадать? Попробуйте. Если вы не уверены, что произойдёт, вспомните, что default выполняется немедленно, если ни один из каналов не доступен.

Также time.After это канал типа chan time.Time. В приведённом выше примере мы просто не использовали отправленное в канал значение. Если хотите, вы можете получить его:

case t := <-time.After(time.Millisecond * 100):
  fmt.Println("тайм-аут после ", t)

Обратите особое внимание на наш select. Заметьте, что мы отправляем в c, но получаем из time.After. select работает независимо от того, откуда мы получаем и что мы отправляем, в любой комбинации каналов:

Наконец, часто можно увидеть select внутри for:

for {
  select {
  case data := <-c:
    fmt.Printf("обработчик %d получил %d\n", w.id, data)
  case <-time.After(time.Millisecond * 10):
    fmt.Println("Перерыв")
    time.Sleep(time.Second)
  }
}

Перед тем как продолжить

Если вы новичок в мире конкурентного программирования, оно может показаться вам довольно тягостным. Оно принципиально требует значительно больше внимания и забот. Двигайтесь дальше и будет легче.

Горутины являются эффективной абстракцией всего того, что необходимо для запуска конкурентного кода. Каналы помогают устранить различные серьезные ошибки, возникающие во время обмена данными или удаления общих данных. Они не просто избавляют от ошибок, но они изменяют то, как вы обращаетесь с конкурентным программированием. Вы начинаете думать о согласованности по отношению к передаче сообщений больше, чем об опасных участках кода.

Я уже говорил, что до сих пор использую различные примитивы для синхронизации из пакетов sync и sync/atomic. Я призываю вас сначала сделать упор на каналы, но когда вы встречаете простой случай, в котором необходима кратковременная блокировка, подумайте об использовании мьютекса или мьютекса чтения-записи.

Далее: Заключение

Comments powered by Disqus