07 сентября 2021

🏃 Пишем мессенджер на Go за час: 7 простых шагов от эхо-сервера к асинхронному обмену сообщениями

backend разработчик с 15 летним стажем специализируюсь на разработке архитектуры проектов под высокую нагрузку
Авторы большинства статей по сокетным соединениям в примерах ограничиваются реализацией эхо-сервера. Давайте разовьем эту тему и за 7 простых шагов сделаем вместе консольный мессенджер сообщений.
🏃 Пишем мессенджер на Go за час: 7 простых шагов от эхо-сервера к асинхронному обмену сообщениями

Шаг 1. Ищем код эхо-сервера

Типовой эхо-сервер, на который дал мне первую ссылку Yandex, описан в статье «Golang: простой сервер TCP и TCP-клиент». Если вы не имеете представления, как работают сокеты и соединения, стоит её почитать. Исходный код из этой статьи можно скачать тут и желательно сразу же его запустить.

Если скомпилировать исходники и запустить в отдельных консолях клиент и сервер, то можно обмениваться сообщениями. У этого сервера есть один значительный недостаток: он работает только с одним клиентом. Попробуем запустить в еще одной консоли новый клиент и увидим, как она зависнет. То же самое будет и с четвертым, и с пятым клиентом.

Шаг 2. Реализуем прием нескольких соединений

Чтобы принять несколько одновременных соединений, необходимо:

  • функцию приема соединения conn.Accept() заключить еще в один цикл for.
  • весь код, который был в цикле, вынести в отдельную функцию process().
  • запустить функцию process() как отдельную горутину в цикле for сразу после приема соединения conn.Accept()
Подробнее о горутинах и каналах рассказывается в статье «Параллельное программирование в Go». Стоит с ней ознакомиться, поскольку на этих механизмах основывается наш будущий проект.

В результате небольших изменений наш код примет следующий вид:

           // функция process запускается как горутина
    func process(conn net.Conn){
      // определим, что перед выходом из функции, мы закроем соединение
      defer conn.Close
      for {
         // Будем прослушивать все сообщения разделенные \n
         message, _ := bufio.NewReader(conn).ReadString('\n')
         // Распечатываем полученое сообщение
         fmt.Print("Message Received:", string(message))
         // Отправить новую строку обратно клиенту
         conn.Write([]byte(message + "\n")){
        }
    } 
    

Код в main:

             // Устанавливаем прослушивание порта
    ln, _ := net.Listen("tcp", ":8081")
    // выполнение цикла обработки соединений
    for {    
      // Принимаем входящее соединение
      conn, _ := ln.Accept()
      // запускаем функцию process(conn)   как горутину
      go process(conn)
    }
    
Внешний цикл (строки 31-36, код доступен в репозитории) будет принимать входящее соединение, а внутренний цикл (строки 15-19) будет обрабатывать входные данные. Исходный код клиента мы не меняем.

Если мы запустим в одной консоли сервер, а в нескольких других консолях программу клиента, то можно видеть, что сервер может обрабатывать уже несколько соединений с клиентами параллельно. На самом деле он их обрабатывает асинхронно: сперва один запрос, потом другой, осуществляя переключения между горутинами.

Шаг 3. Обрабатываем ошибки соединений

Давайте попробуем отсоединить один из клиентов, убив его процесс: наш сервер зациклится. Если что-нибудь набрать в клиенте, то данные куда-то уходят, и клиент ни о чём не подозревает.

Мы забыли обработать ошибки ввода-вывода. Функция вывода в сокет Write имеет два выходных параметра: кол-во считанных байт и ошибку:

        data_len, err := conn.Write(b []byte)
    

Если ошибка не пустая (т.е. не равна nil ), значит мы не смогли принять данные. Какая ошибка произошла, можно узнать с помощью функции err.Error()

Заменим conn.Write(b []byte) на следующий код:

        _ , err := conn.Write(b []byte)
if err != nil {
          fmt.Println(err.Error)
          break  // выходим из цикла и функции
} 
    

Аналогичный код пропишем в клиенте. Еще в клиенте отсутствует отложенное закрытие соединения, которое срабатывает при выходе из функции defer conn.Close().

Если вы не смогли внести изменения самостоятельно, готовый код можно подглядеть на GitHub.

Теперь при закрытие клиента или сервера, у нас будет выдаваться сообщение:

        write tcp 127.0.0.1:8081->127.0.0.1:40296: write: broken pipe
    

Шаг 4. Простой прототип мессенджера

К этому моменту мы допилили эхо-сервер, а теперь осталось сделать простой из него мессенджер. Пусть логика мессенджера будет следующей:

  • Клиент коннектится к серверу и получает номер сообщения. Каждай клиент имеет свой уникальный номер.
  • Клиент отправляет сообщение серверу с указанием номера клиента, кому адресовано это сообщение.
  • Сервер принимает сообщение от клиента, декодирует его и отправляет тому клиенту, которому адресовано это сообщение.
Нам осталось усовершенствовать сервер так, чтобы определить, с какого клиента было отправлено сообщение. Для простоты пусть каждый клиент будет иметь номер, соответствующий порядковому номеру соединения начиная с нуля (индексы массивов в Go начинаются с нуля).

Введем счетчик входящих соединений, а каждое новое соединение сохраним в xeштаблице, организовав таким образом пул:

        conns := make( map[int] net.Conn, 1024)
    

Каждое соединение после conn.Accept() мы сохраним в conns, а в функцию process() будем передавать весь пул (хештаблицу) и номер текущего соединения. В функции обработки соединения process() мы можем иметь доступ ко всем активным соединениям. Не забываем увеличивать на единицу счетчик текущих соединений.

В функции process() мы принимаем не текущее соединение, а пул и номер текущего соединения. Следовательно, чтоб получить доступ к текущему соединению, мы можем его взять из пула:

        conn := conn[n]
    

Новый код сервера:

        func process(conns map[int]net.Conn, n int) {
  // получаем доступ к текущему соединению
  conn := conns[n]
  // определим, что перед выходом из функции, мы закроем соединение
  defer conn.Close()
  for {
    // Будем прослушивать все сообщения разделенные \n
    message, _ := bufio.NewReader(conn).ReadString('\n')
    // Распечатываем полученое сообщение
    fmt.Print("Message Received:", string(message))
    // Отправить новую строку обратно клиенту
    _, err := conn.Write([]byte(strconv.Itoa(n) + "->> " + message + "\n"))
    // анализируем на ошибку
    if err != nil {
      fmt.Print(err.Error())
      break // выходим из цикла
    }
  }
}
func main() {
  fmt.Println("Start server...")
  // создаем пул соединений
  conns := make(map[int]net.Conn, 1024)
  i := 0
  // Устанавливаем прослушивание порта
  ln, _ := net.Listen("tcp", ":8081")
  // объвляем пул соединений на 1024 соединения
  conns := make(map[int]net.Conn, 1024)
  // Запускаем цикл обработки соединений
  for {
    // Принимаем входящее соединение
    conn, _ := ln.Accept()
    // сохраняем соединение в пул
    conns[i] = conn
    // запускаем функцию process(conn)   как горутину
    go process(conns, i)
    i++
  }
}
    
Полный код примера можно найти в этом репозитории. Попробуем запустить сервер и отправить сообщение.

При тестировании мы видим, что в каждом ответном сообщении сервер возвращает клиенту номер текущего соединения:

        ./client
Text to send: msg from 1
Message from server: 1->> msg from 1
    

Шаг 5. Реализация протокола обмена

Остается малость: определить как, кто и кому будет отправлять сообщения. Создадим простейший протокол обмена, в котором первые байты сообщения будут содержать номер клиента и текст через пробел.

Пример:

        2 Оправляем сообщение второму клиенту
    

Для реализации этого протокола, необходимо сделать парсинг сообщения. Номер клиента, мы вытащим, используя fmt.Scanf(), а само сообщение с использованием слайса:

        // парсинг полученного сообщения
fmt.Sscanf(message, "%d", &clientNo) // определи номер клиента
pos := strings.Index(message, " ") // нашли позицию разделителя
out_message := message[pos:] // взяли хвост сообщение после пробела

    

Дальше все очень просто: зная номер соединения (clientNo) клиента, мы будем отправлять ответ в нужное соединение. Сообщение было немного изменено, и теперь мы выводим, от какого клиента оно исходит:

        conns[clientNo].Write([]byte(strconv.Itoa(clientNo) + "->> " + out_message + "\n"))
    
Запускаем, проверяем и видим некоторые баги. Если мы направляем сообщение самому себе, то вроде бы все работает. Если отправить сообщение другому клиенту, оно где-то теряется, а если этим клиентом отправить сообщение кому-то еще, оно появляется из ниоткуда. Что же пошло не так?

Шаг 6. Распараллеливание кода

Наш клиент работает синхронно: после отправления сообщения он переходит в режим ожидания чтения сокета. Все работает согласно алгоритму: записал и жди ответа. Если мы отправим сообщение самому себе, то примем его без проблем. А если отправим другому клиенту, то после этого встаем в режим ожиданий. Второй клиент тоже стоит в режиме ожидания и примет наше сообщение, только после того, как сам что-нибудь отправит. Как же выйти из этой ситуации?

Выход довольно простой: нужно чтение из сокета и чтение с консоли сделать асинхронными, т.е. сделать так, чтобы ввод из сокета и ввод из консоли не блокировали друг друга. Как говорилось выше, горутины позволяют коду выполняться асинхронно. Следовательно, должны быть запущены две разные горутины: одна должна осуществлять чтение с консоли, а вторая – чтение из сокета.

Первая горутина читает данные из сокета, и если в сокете есть данные, выводит их на печать. Чтение и вывод должны быть заключены в цикл:

        // прием данных из сокета и вывод на печать
func readSock(conn net.Conn ) {
    buf := make([]byte,256)
    for {
        readed_len, _ := conn.Read(buf)
        if readed_len > 0 {            
            fmt.Print(string(buf))
        }
    }
}

    

Со второй горутиной все немного сложнее, поскольку она должна передать данные в основную программу. Почему нельзя сразу писать их в сокет, как это делает первая горутина? Увы, операция conn.Write() – блокирующая, и если мы так сделаем, то можем заблокировать другие операции ввода-вывода. Все блокирующие операции нужно разнести по разным асинхронным частям программы.

        // ввод данных с консоли и вывод их в канал
func readConsole(ch chan string) {
    for {
        fmt.Print(">")
        line, _ := bufio.NewReader(os.Stdin).ReadString('\n')
        out :=line[:len(line)-1] // убираем символ возврата каретки
        ch <- out // отправляем данные в канал
    }
}

    

Основная программа должна запустить две асинхронных горутины: чтение с консоли и из сокета (в цикле читать канал и если в нем есть данные, то записать их в сокет). Чтобы наше консольное приложение не «съело» все ресурсы CPU, необходимо ввести некоторую задержку: time.Sleep(time.Seconds * 2)

Должно получиться примерно следующее:

        func main(){
    ch := make(chan string)

    defer close(ch) // закрываем канал при выходе из программы

    conn, _ := net.Dial("tcp", "127.0.0.1:8080")

    go readConsole(ch)
    go readSock(conn)

    for {
        val, ok := <- ch
        if ok { // если есть данные, то их пишем в сокет

            _, err := conn .Write([]bytes(val))
            if err != nil {
                fmt.Println("Write:", err.Error())
                break
            }
        } else {
            // данных в канале нет, задержка на 2 секунды
            time.Sleep(time.Second * 2)
        }

    }
    fmt.Println("Finished...")
    conn.Close()
}

    

Шаг 7. Повышаем надежность выполнения кода

Наше приложение должно работать при любых входных данных, даже если они некорректные. Есть несколько простых правил, которые придется соблюдать при построении любых приложений:

  • необходимо проверять все входящие данные и если их длинна больше принимающего буфера, то либо обрезать их, либо генерировать ошибку;
  • необходимо обрабатывать все функции ввода-вывода на возможность возникновения ошибки.

Например, в коде было много сокращений и специально была опущена обработка функций conn.Accept() и net.Dial():

            ls,err := conn.Accept()
    if err != nil {
        fmt.Println(err)
        panic("accept")
    }

    

Также был опущен код обработки объема данных с консоли:

            // ввод данных с консоли
    for {
        fmt.Print(">")
        line, _ := bufio.NewReader(os.Stdin).ReadString('\n')
        if len(line) > 250 {
            fmt.Println("Error: message is very lagre")
            continue
        }
        ch <- b
    }    
    

Почти готовое и работоспособное решение можно найти в репозитории: вам остается самостоятельно дописать обработку всех ошибок ввода-вывода. Если возникнет желание сделать pull request в репозиторий, то я смогу указать в комментариях на ошибки или просто похвалить. Сделайте свой код достоянием общественности. Удачи!

МЕРОПРИЯТИЯ

Комментарии

ЛУЧШИЕ СТАТЬИ ПО ТЕМЕ