Создание микросервисного приложения на Go по принципу CQRS
Перевод статьи Building a Microservices Application in Go Following the CQRS Pattern от Tin Rabzelj.
Эта статья является пошаговым руководством по разработке упрощенного приложения социальной сети, в которой любой может публиковать анонимные сообщения.
Исходный код доступен на GitHub
Архитектура
Приложение построено с использованием шаблона Command Query Responsibility Segregation (CQRS). Цель состоит в том, чтобы разделить команды и запросы на отдельные сервисы. Команды должны выполнять запись в базу данных, а запросы считывать конечные данные. Такое разделение позволяет независимо масштабировать обе стороны, что дает преимущество, так как операций чтения обычно больше, чем записи. Также это значит, что мы можем иметь разные модели данных для каждого сервиса. Сторона запросов может возвращать данные в материализованых представлениях, которые создаются независимо и асинхронно от командной стороны.
Приложение, описанное в этой статье, называется Meower – социальная сеть для кошек.
Здесь есть три сервиса: Pusher
, Meow
и Query
. Сервис Meow
обрабатывает командную часть, предоставляя конечную точку в виде HTTP POST для создания “Мяу”-сообщений. Сервис Query
слушает события и вставляет сообщения в базу данных Elasticsearch. Он предоставляет конечные точки для чтения сообщений упорядоченных по времени и выполнения полнотекстового поиска. Сервис Pusher
отправляет только что созданные сообщения клиентам по протоколу WebSocket.
Заметьте, что сервисы Meow
и Query
не сильно ограничены, так как используют одну и ту же базу данных. Хоть это и противоречит всей идее, но сделает многие вещи немного проще. Эта структура только для разработки (здесь нет SSL, нет репликаций, все хранилища эфемерны и т. д.).
Предварительные требования
Если вы еще этого не сделали, установите Docker, Go и менеджер зависимостей golang/dep.
Создайте директорию для проекта внутри $GOPATH
.
На протяжении этой статьи предполагается, что директорией проекта является
github.com/tinrab/meower
. Каждый раз, когда вы добавляете пакет в файл с кодом Go, убедитесь, что все обновлено с помощью командыdep ensure
.
Утилиты
Для начала, сделаем несколько утилит и фасад для работы со сторонними сервисами.
Создадим папку util
и файл util/util.go
в ней.
package util
import (
"encoding/json"
"net/http"
)
func ResponseOk(w http.ResponseWriter, body interface{}) {
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(body)
}
func ResponseError(w http.ResponseWriter, code int, message string) {
w.WriteHeader(code)
w.Header().Set("Content-Type", "application/json")
body := map[string]string{
"error": message,
}
json.NewEncoder(w).Encode(body)
}
PostgreSQL
Создадим папку schema
и файл schema/model.go
в ней.
package schema
import (
"time"
)
type Meow struct {
ID string `json:"id"`
Body string `json:"body"`
CreatedAt time.Time `json:"created_at"`
}
Создадим папку db
и файл db/repository.go
в ней.
package db
import (
"context"
"github.com/tinrab/meower/schema"
)
type Repository interface {
Close()
InsertMeow(ctx context.Context, meow schema.Meow) error
ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error)
}
var impl Repository
func SetRepository(repository Repository) {
impl = repository
}
func Close() {
impl.Close()
}
func InsertMeow(ctx context.Context, meow schema.Meow) error {
return impl.InsertMeow(ctx, meow)
}
func ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error) {
return impl.ListMeows(ctx, skip, take)
}
Это простой способ для достижения инверсии управления. Используя интерфейс Repository
вы получаете возможность внедрять конкретную реализацию во время выполнения, и все вызовы функций будут делегированы объекту impl
.
Вы можете реализовать базу данных в памяти удовлетворяющую интерфейсу Repository
и использовать её во время разработки и тестирования.
Создадим файл docker-compose.yaml
в корневой директории проекта и определим в нем сервис postgres
.
version: "3.6"
services:
postgres:
build: "./postgres"
restart: "always"
environment:
POSTGRES_DB: "meower"
POSTGRES_USER: "meower"
POSTGRES_PASSWORD: "123456"
Создадим папку postgres
и файл postgres/up.sql
, который будет содержать определения таблиц.
DROP TABLE IF EXISTS meows;
CREATE TABLE meows (
id VARCHAR(32) PRIMARY KEY,
body TEXT NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL
);
Создадим файл postgres/Dockerfile
и скопируем файл postgres/up.sql
в контейнер как 1.sql
.
FROM postgres:10.3
COPY up.sql /docker-entrypoint-initdb.d/1.sql
CMD ["postgres"]
SQL-файлы внутри папки /docker-entrypoint-initdb.d
будут выполнены в алфавитном порядке.
Реализуем интерфейс Repository
для базы данных PostgreSQL в файле db/postgres.go
используя пакет lib/pq.
package db
import (
"context"
"database/sql"
_ "github.com/lib/pq"
"github.com/tinrab/meower/schema"
)
type PostgresRepository struct {
db *sql.DB
}
func NewPostgres(url string) (*PostgresRepository, error) {
db, err := sql.Open("postgres", url)
if err != nil {
return nil, err
}
return &PostgresRepository{
db,
}, nil
}
func (r *PostgresRepository) Close() {
r.db.Close()
}
func (r *PostgresRepository) InsertMeow(ctx context.Context, meow schema.Meow) error {
_, err := r.db.Exec("INSERT INTO meows(id, body, created_at) VALUES($1, $2, $3)", meow.ID, meow.Body, meow.CreatedAt)
return err
}
func (r *PostgresRepository) ListMeows(ctx context.Context, skip uint64, take uint64) ([]schema.Meow, error) {
rows, err := r.db.Query("SELECT * FROM meows ORDER BY id DESC OFFSET $1 LIMIT $2", skip, take)
if err != nil {
return nil, err
}
defer rows.Close()
// Parse all rows into an array of Meows
meows := []schema.Meow{}
for rows.Next() {
meow := schema.Meow{}
if err = rows.Scan(&meow.ID, &meow.Body, &meow.CreatedAt); err == nil {
meows = append(meows, meow)
}
}
if err = rows.Err(); err != nil {
return nil, err
}
return meows, nil
}
Сообщения Meow
упорядочены по первичному ключу, так как ключи упорядочены по времени. Это избавляет нас от добавления дополнительного индекса.
NATS
Добавим сервис nats
в docker-compose.yaml
.
services:
nats:
image: "nats-streaming:0.9.2"
restart: "always"
Создадим папку event
и файл event/messages.go
, который содержит типы сообщений.
package event
import (
"time"
)
type Message interface {
Key() string
}
type MeowCreatedMessage struct {
ID string
Body string
CreatedAt time.Time
}
func (m *MeowCreatedMessage) Key() string {
return "meow.created"
}
Похожим способом, как мы делали с доступом к базе данных, определим функционал для работы с хранилищем сообщений в файле event/event.go
.
package event
import "github.com/tinrab/meower/schema"
type EventStore interface {
Close()
PublishMeowCreated(meow schema.Meow) error
SubscribeMeowCreated() (<-chan MeowCreatedMessage, error)
OnMeowCreated(f func(MeowCreatedMessage)) error
}
var impl EventStore
func SetEventStore(es EventStore) {
impl = es
}
func Close() {
impl.Close()
}
func PublishMeowCreated(meow schema.Meow) error {
return impl.PublishMeowCreated(meow)
}
func SubscribeMeowCreated() (<-chan MeowCreatedMessage, error) {
return impl.SubscribeMeowCreated()
}
func OnMeowCreated(f func(MeowCreatedMessage)) error {
return impl.OnMeowCreated(f)
}
Теперь реализуем интерфейс EventStore
для NATS в файле event/nats.go
.
package event
import (
"bytes"
"encoding/gob"
"github.com/nats-io/go-nats"
"github.com/tinrab/meower/schema"
)
type NatsEventStore struct {
nc *nats.Conn
meowCreatedSubscription *nats.Subscription
meowCreatedChan chan MeowCreatedMessage
}
func NewNats(url string) (*NatsEventStore, error) {
nc, err := nats.Connect(url)
if err != nil {
return nil, err
}
return &NatsEventStore{nc: nc}, nil
}
func (e *NatsEventStore) Close() {
if e.nc != nil {
e.nc.Close()
}
if e.meowCreatedSubscription != nil {
e.meowCreatedSubscription.Unsubscribe()
}
close(e.meowCreatedChan)
}
func (e *NatsEventStore) PublishMeowCreated(meow schema.Meow) error {
m := MeowCreatedMessage{meow.ID, meow.Body, meow.CreatedAt}
data, err := e.writeMessage(&m)
if err != nil {
return err
}
return e.nc.Publish(m.Key(), data)
}
func (mq *NatsEventStore) writeMessage(m Message) ([]byte, error) {
b := bytes.Buffer{}
err := gob.NewEncoder(&b).Encode(m)
if err != nil {
return nil, err
}
return b.Bytes(), nil
}
Здесь существует два различных подхода для реализации функций подписок. При использовании этого API, используйте тот, который больше нравится.
Один из способов – это использовать функцию обратного вызова.
func (e *NatsEventStore) OnMeowCreated(f func(MeowCreatedMessage)) (err error) {
m := MeowCreatedMessage{}
e.meowCreatedSubscription, err = e.nc.Subscribe(m.Key(), func(msg *nats.Msg) {
e.readMessage(msg.Data, &m)
f(m)
})
return
}
func (mq *NatsEventStore) readMessage(data []byte, m interface{}) error {
b := bytes.Buffer{}
b.Write(data)
return gob.NewDecoder(&b).Decode(m)
}
Другой способ – это вернуть канал. Здесь создается промежуточный канал для преобразования сообщений в подходящий тип.
func (e *NatsEventStore) SubscribeMeowCreated() (<-chan MeowCreatedMessage, error) {
m := MeowCreatedMessage{}
e.meowCreatedChan = make(chan MeowCreatedMessage, 64)
ch := make(chan *nats.Msg, 64)
var err error
e.meowCreatedSubscription, err = e.nc.ChanSubscribe(m.Key(), ch)
if err != nil {
return nil, err
}
// Decode message
go func() {
for {
select {
case msg := <-ch:
e.readMessage(msg.Data, &m)
e.meowCreatedChan <- m
}
}
}()
return (<-chan MeowCreatedMessage)(e.meowCreatedChan), nil
}
Elasticsearch
Обновим файл docker-compose.yaml
.
services:
elasticsearch:
image: 'docker.elastic.co/elasticsearch/elasticsearch:6.2.3'
Создадим интерфейс хранилища в файле search/repository.go
.
package search
import (
"context"
"github.com/tinrab/meower/schema"
)
type Repository interface {
Close()
InsertMeow(ctx context.Context, meow schema.Meow) error
SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error)
}
var impl Repository
func SetRepository(repository Repository) {
impl = repository
}
func Close() {
impl.Close()
}
func InsertMeow(ctx context.Context, meow schema.Meow) error {
return impl.InsertMeow(ctx, meow)
}
func SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error) {
return impl.SearchMeows(ctx, query, skip, take)
}
Реализуем этот интерфейс для Elasticsearch в файле search/elastic.go
используя пакет olivere/elastic.
package search
import (
"context"
"encoding/json"
"log"
"github.com/olivere/elastic"
"github.com/tinrab/meower/schema"
)
type ElasticRepository struct {
client *elastic.Client
}
func NewElastic(url string) (*ElasticRepository, error) {
client, err := elastic.NewClient(
elastic.SetURL(url),
elastic.SetSniff(false),
)
if err != nil {
return nil, err
}
return &ElasticRepository{client}, nil
}
func (r *ElasticRepository) Close() {
}
func (r *ElasticRepository) InsertMeow(ctx context.Context, meow schema.Meow) error {
_, err := r.client.Index().
Index("meows").
Type("meow").
Id(meow.ID).
BodyJson(meow).
Refresh("wait_for").
Do(ctx)
return err
}
func (r *ElasticRepository) SearchMeows(ctx context.Context, query string, skip uint64, take uint64) ([]schema.Meow, error) {
result, err := r.client.Search().
Index("meows").
Query(
elastic.NewMultiMatchQuery(query, "body").
Fuzziness("3").
PrefixLength(1).
CutoffFrequency(0.0001),
).
From(int(skip)).
Size(int(take)).
Do(ctx)
if err != nil {
return nil, err
}
meows := []schema.Meow{}
for _, hit := range result.Hits.Hits {
var meow schema.Meow
if err = json.Unmarshal(*hit.Source, &meow); err != nil {
log.Println(err)
}
meows = append(meows, meow)
}
return meows, nil
}
Сервис Meow
Создадим папку meow-service
и в файле meow-services/main.go
получим значения конфигурации из переменных среды.
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/kelseyhightower/envconfig"
"github.com/tinrab/meower/db"
"github.com/tinrab/meower/event"
"github.com/tinrab/retry"
)
type Config struct {
PostgresDB string `envconfig:"POSTGRES_DB"`
PostgresUser string `envconfig:"POSTGRES_USER"`
PostgresPassword string `envconfig:"POSTGRES_PASSWORD"`
NatsAddress string `envconfig:"NATS_ADDRESS"`
}
func main() {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
log.Fatal(err)
}
// ...
}
Подключимся к PostgreSQL и внедрим репозиторий. Код ниже повторяет соединение каждые 2 секунды с помощью пакета tinrab/retry.
retry.ForeverSleep(2*time.Second, func(attempt int) error {
addr := fmt.Sprintf("postgres://%s:%s@postgres/%s?sslmode=disable", cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDB)
repo, err := db.NewPostgres(addr)
if err != nil {
log.Println(err)
return err
}
db.SetRepository(repo)
return nil
})
defer db.Close()
Подключимся к NATS.
retry.ForeverSleep(2*time.Second, func(_ int) error {
es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
if err != nil {
log.Println(err)
return err
}
event.SetEventStore(es)
return nil
})
defer event.Close()
Наконец, запустим HTTP-сервер.
func newRouter() (router *mux.Router) {
router = mux.NewRouter()
router.HandleFunc("/meows", createMeowHandler).
Methods("POST").
Queries("body", "{body}")
return
}
func main() {
// ...
router := newRouter()
if err := http.ListenAndServe(":8080", router); err != nil {
log.Fatal(err)
}
}
Роутер связывает конечную точку для POST-запросов с обработчиком createMeowHandler
. Объявим его в файле meow-service/handlers.go
.
package main
import (
"html/template"
"log"
"net/http"
"time"
"github.com/segmentio/ksuid"
"github.com/tinrab/meower/db"
"github.com/tinrab/meower/event"
"github.com/tinrab/meower/schema"
"github.com/tinrab/meower/util"
)
func createMeowHandler(w http.ResponseWriter, r *http.Request) {
type response struct {
ID string `json:"id"`
}
ctx := r.Context()
// Read parameters
body := template.HTMLEscapeString(r.FormValue("body"))
if len(body) < 1 || len(body) > 140 {
util.ResponseError(w, http.StatusBadRequest, "Invalid body")
return
}
// Create meow
createdAt := time.Now().UTC()
id, err := ksuid.NewRandomWithTime(createdAt)
if err != nil {
util.ResponseError(w, http.StatusInternalServerError, "Failed to create meow")
return
}
meow := schema.Meow{
ID: id.String(),
Body: body,
CreatedAt: createdAt,
}
if err := db.InsertMeow(ctx, meow); err != nil {
log.Println(err)
util.ResponseError(w, http.StatusInternalServerError, "Failed to create meow")
return
}
// Publish event
if err := event.PublishMeowCreated(meow); err != nil {
log.Println(err)
}
// Return new meow
util.ResponseOk(w, response{ID: meow.ID})
}
Новое сообщение создано, добавлено в базу данных и событие опубликовано.
Сервис Query
Создадим папку query-service
и прочитаем переменные конфигурации в файле query-service/main.go
.
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/mux"
"github.com/kelseyhightower/envconfig"
"github.com/tinrab/meower/db"
"github.com/tinrab/meower/event"
"github.com/tinrab/meower/search"
"github.com/tinrab/retry"
)
type Config struct {
PostgresDB string `envconfig:"POSTGRES_DB"`
PostgresUser string `envconfig:"POSTGRES_USER"`
PostgresPassword string `envconfig:"POSTGRES_PASSWORD"`
NatsAddress string `envconfig:"NATS_ADDRESS"`
ElasticsearchAddress string `envconfig:"ELASTICSEARCH_ADDRESS"`
}
func main() {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
log.Fatal(err)
}
// ...
}
Затем подключимся к PostgreSQL, Elasticsearch и NATS.
// Connect to PostgreSQL
retry.ForeverSleep(2*time.Second, func(attempt int) error {
addr := fmt.Sprintf("postgres://%s:%s@postgres/%s?sslmode=disable", cfg.PostgresUser, cfg.PostgresPassword, cfg.PostgresDB)
repo, err := db.NewPostgres(addr)
if err != nil {
log.Println(err)
return err
}
db.SetRepository(repo)
return nil
})
defer db.Close()
// Connect to ElasticSearch
retry.ForeverSleep(2*time.Second, func(_ int) error {
es, err := search.NewElastic(fmt.Sprintf("http://%s", cfg.ElasticsearchAddress))
if err != nil {
log.Println(err)
return err
}
search.SetRepository(es)
return nil
})
defer search.Close()
// Connect to Nats
retry.ForeverSleep(2*time.Second, func(_ int) error {
es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
if err != nil {
log.Println(err)
return err
}
err = es.OnMeowCreated(onMeowCreated)
if err != nil {
log.Println(err)
return err
}
event.SetEventStore(es)
return nil
})
defer event.Close()
Здесь сервис подписывается на событие OnMeowCreated
с помощью функции onMeowCreated
.
Запустим HTTP-сервер.
func newRouter() (router *mux.Router) {
router = mux.NewRouter()
router.HandleFunc("/meows", listMeowsHandler).
Methods("GET")
router.HandleFunc("/search", searchMeowsHandler).
Methods("GET")
return
}
func main() {
// ...
router := newRouter()
if err := http.ListenAndServe(":8080", router); err != nil {
log.Fatal(err)
}
}
Затем в файле query-service/handlers.go
объявим функцию onMeowCreated
для вставки сообщений в Elasticsearch когда получено событие OnMeowCreated
.
package main
import (
"context"
"log"
"net/http"
"strconv"
"github.com/tinrab/meower/db"
"github.com/tinrab/meower/event"
"github.com/tinrab/meower/schema"
"github.com/tinrab/meower/search"
"github.com/tinrab/meower/util"
)
func onMeowCreated(m event.MeowCreatedMessage) {
meow := schema.Meow{
ID: m.ID,
Body: m.Body,
CreatedAt: m.CreatedAt,
}
if err := search.InsertMeow(context.Background(), meow); err != nil {
log.Println(err)
}
}
Напишем функцию-обработчик searchMeowsHandler
, которая выполняет полнотекстовый поиск и возвращает сообщения ограниченные параметрами skip
и take
.
func searchMeowsHandler(w http.ResponseWriter, r *http.Request) {
var err error
ctx := r.Context()
// Read parameters
query := r.FormValue("query")
if len(query) == 0 {
util.ResponseError(w, http.StatusBadRequest, "Missing query parameter")
return
}
skip := uint64(0)
skipStr := r.FormValue("skip")
take := uint64(100)
takeStr := r.FormValue("take")
if len(skipStr) != 0 {
skip, err = strconv.ParseUint(skipStr, 10, 64)
if err != nil {
util.ResponseError(w, http.StatusBadRequest, "Invalid skip parameter")
return
}
}
if len(takeStr) != 0 {
take, err = strconv.ParseUint(takeStr, 10, 64)
if err != nil {
util.ResponseError(w, http.StatusBadRequest, "Invalid take parameter")
return
}
}
// Search meows
meows, err := search.SearchMeows(ctx, query, skip, take)
if err != nil {
log.Println(err)
util.ResponseOk(w, []schema.Meow{})
return
}
util.ResponseOk(w, meows)
}
Напишем обработчик listMeowsHandler
, который будет возвращать все сообщения отсортированные по времени их создания.
func listMeowsHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var err error
// Read parameters
skip := uint64(0)
skipStr := r.FormValue("skip")
take := uint64(100)
takeStr := r.FormValue("take")
if len(skipStr) != 0 {
skip, err = strconv.ParseUint(skipStr, 10, 64)
if err != nil {
util.ResponseError(w, http.StatusBadRequest, "Invalid skip parameter")
return
}
}
if len(takeStr) != 0 {
take, err = strconv.ParseUint(takeStr, 10, 64)
if err != nil {
util.ResponseError(w, http.StatusBadRequest, "Invalid take parameter")
return
}
}
// Fetch meows
meows, err := db.ListMeows(ctx, skip, take)
if err != nil {
log.Println(err)
util.ResponseError(w, http.StatusInternalServerError, "Could not fetch meows")
return
}
util.ResponseOk(w, meows)
}
Сервис Pusher
Создадим папку pusher-service
.
Сообщения
Создадим файл pusher-service/messages.go
и объявим сообщения, которые будут отправлены по протоколу WebSocket.
package main
import (
"time"
)
const (
KindMeowCreated = iota + 1
)
type MeowCreatedMessage struct {
Kind uint32 `json:"kind"`
ID string `json:"id"`
Body string `json:"body"`
CreatedAt time.Time `json:"created_at"`
}
func newMeowCreatedMessage(id string, body string, createdAt time.Time) *MeowCreatedMessage {
return &MeowCreatedMessage{
Kind: KindMeowCreated,
ID: id,
Body: body,
CreatedAt: createdAt,
}
}
Клиент
Создадим файл pusher-service/client.go
и объявим структуру, описывающую подключенный клиент.
package main
import "github.com/gorilla/websocket"
type Client struct {
hub *Hub
id int
socket *websocket.Conn
outbound chan []byte
}
func newClient(hub *Hub, socket *websocket.Conn) *Client {
return &Client{
hub: hub,
socket: socket,
outbound: make(chan []byte),
}
}
func (client *Client) write() {
for {
select {
case data, ok := <-client.outbound:
if !ok {
client.socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
client.socket.WriteMessage(websocket.TextMessage, data)
}
}
}
func (client Client) close() {
client.socket.Close()
close(client.outbound)
}
Хаб
Создадим файл pusher-service/hub.go
для структуры Hub
, которая будет управлять всеми клиентами.
package main
import (
"encoding/json"
"log"
"net/http"
"sync"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool { return true },
}
type Hub struct {
clients []*Client
nextID int
register chan *Client
unregister chan *Client
mutex *sync.Mutex
}
func newHub() *Hub {
return &Hub{
clients: make([]*Client, 0),
nextID: 0,
register: make(chan *Client),
unregister: make(chan *Client),
mutex: &sync.Mutex{},
}
}
Напишем функцию Run
.
func (hub *Hub) run() {
for {
select {
case client := <-hub.register:
hub.onConnect(client)
case client := <-hub.unregister:
hub.onDisconnect(client)
}
}
}
Напишем функцию для отправки сообщений.
func (hub *Hub) broadcast(message interface{}, ignore *Client) {
data, _ := json.Marshal(message)
for _, c := range hub.clients {
if c != ignore {
c.outbound <- data
}
}
}
func (hub *Hub) send(message interface{}, client *Client) {
data, _ := json.Marshal(message)
client.outbound <- data
}
Напишем функцию для обновления HTTP-запросов к подключениям WebSocket.
func (hub *Hub) handleWebSocket(w http.ResponseWriter, r *http.Request) {
socket, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
http.Error(w, "could not upgrade", http.StatusInternalServerError)
return
}
client := newClient(hub, socket)
hub.register <- client
go client.write()
}
Когда клиент подключится, добавим его в список.
func (hub *Hub) onConnect(client *Client) {
log.Println("client connected: ", client.socket.RemoteAddr())
// Make new client
hub.mutex.Lock()
defer hub.mutex.Unlock()
client.id = hub.nextID
hub.nextID++
hub.clients = append(hub.clients, client)
}
Когда клиент отключится, удалим его из списка.
func (hub *Hub) onDisconnect(client *Client) {
log.Println("client disconnected: ", client.socket.RemoteAddr())
client.close()
hub.mutex.Lock()
defer hub.mutex.Unlock()
// Find index of client
i := -1
for j, c := range hub.clients {
if c.id == client.id {
i = j
break
}
}
// Delete client from list
copy(hub.clients[i:], hub.clients[i+1:])
hub.clients[len(hub.clients)-1] = nil
hub.clients = hub.clients[:len(hub.clients)-1]
}
Точка входа
Создадим файл pusher-service/main.go
.
package main
import (
"fmt"
"log"
"net/http"
"time"
"github.com/kelseyhightower/envconfig"
"github.com/tinrab/meower/event"
"github.com/tinrab/retry"
)
type Config struct {
NatsAddress string `envconfig:"NATS_ADDRESS"`
}
func main() {
var cfg Config
err := envconfig.Process("", &cfg)
if err != nil {
log.Fatal(err)
}
// Connect to Nats
hub := newHub()
retry.ForeverSleep(2*time.Second, func(_ int) error {
es, err := event.NewNats(fmt.Sprintf("nats://%s", cfg.NatsAddress))
if err != nil {
log.Println(err)
return err
}
// Push messages to clients
err = es.OnMeowCreated(func(m event.MeowCreatedMessage) {
log.Printf("Meow received: %v\n", m)
hub.broadcast(newMeowCreatedMessage(m.ID, m.Body, m.CreatedAt), nil)
})
if err != nil {
log.Println(err)
return err
}
event.SetEventStore(es)
return nil
})
defer event.Close()
// Run WebSocket server
go hub.run()
http.HandleFunc("/pusher", hub.handleWebSocket)
err = http.ListenAndServe(":8080", nil)
if err != nil {
log.Fatal(err)
}
}
Docker image
Укажем все сервисы и зависимости между ними в файле docker-compose.yaml
.
services:
meow:
build: "."
command: "meow-service"
depends_on:
- "postgres"
- "nats"
ports:
- "8080"
environment:
POSTGRES_DB: "meower"
POSTGRES_USER: "meower"
POSTGRES_PASSWORD: "123456"
NATS_ADDRESS: "nats:4222"
query:
build: "."
command: "query-service"
depends_on:
- "postgres"
- "nats"
ports:
- "8080"
environment:
POSTGRES_DB: "meower"
POSTGRES_USER: "meower"
POSTGRES_PASSWORD: "123456"
NATS_ADDRESS: "nats:4222"
ELASTICSEARCH_ADDRESS: "elasticsearch:9200"
pusher:
build: "."
command: "pusher-service"
depends_on:
- "nats"
ports:
- "8080"
environment:
NATS_ADDRESS: "nats:4222"
Создадим Dockerfile
в корневой директории проекта. Этот образ строится в два этапа и содержит исполняемые файлы всех сервисов. Смотрите Multi-Stage Docker Builds for Kubernetes для дополнительной информации.
FROM golang:1.10.2-alpine3.7 AS build
RUN apk --no-cache add gcc g++ make ca-certificates
WORKDIR /go/src/github.com/tinrab/meower
COPY Gopkg.lock Gopkg.toml ./
COPY vendor vendor
COPY util util
COPY event event
COPY db db
COPY search search
COPY schema schema
COPY meow-service meow-service
COPY query-service query-service
COPY pusher-service pusher-service
RUN go install ./...
FROM alpine:3.7
WORKDIR /usr/bin
COPY --from=build /go/bin .
Обратный прокси
Обратный прокси будет отправлять трафик на фронтенд и обратно.
Обновим файл docker-compose.yaml
.
services:
nginx:
build: "./nginx"
ports:
- "8080:80"
depends_on:
- "meow"
- "query"
- "pusher"
Создадим папку nginx
и файл nginx/Dockerfile
.
FROM nginx:1.13.12
COPY nginx.conf /etc/nginx/nginx.conf
CMD ["nginx", "-g", "daemon off;"]
Опишем конфигурацию для NGINX в файле nginx/nginx.conf
.
user nginx;
worker_processes 1;
events {
worker_connections 1024;
}
http {
upstream meows_POST {
server meow:8080;
}
upstream meows_GET {
server query:8080;
}
upstream search_GET {
server query:8080;
}
upstream pusher {
server pusher:8080;
}
server {
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header Host $http_host;
add_header Access-Control-Allow-Origin *;
location /meows {
limit_except GET POST OPTIONS {
deny all;
}
proxy_pass http://meows_$request_method;
}
location /search {
limit_except GET OPTIONS {
deny all;
}
proxy_pass http://search_GET;
}
location /pusher {
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_pass http://pusher;
}
}
}
Фронтенд
Создадим приложение vue с использованием vue-cli 3.0+. Укажем Vuex при выборе функций.
$ vue create frontend
$ cd frontend
Добавим все необходимые зависимости.
$ yarn add bootstrap timeago.js axios vue-native-websocket
Откроем main.js
и импортируем файл Bootstrap SCSS.
import 'bootstrap/scss/bootstrap.scss';
Изменим store.js
.
import Vue from 'vue';
import Vuex from 'vuex';
import axios from 'axios';
import VueNativeSock from 'vue-native-websocket';
const BACKEND_URL = 'http://localhost:8080';
const PUSHER_URL = 'ws://localhost:8080/pusher';
const SET_MEOWS = 'SET_MEOWS';
const CREATE_MEOW = 'CREATE_MEOW';
const SEARCH_SUCCESS = 'SEARCH_SUCCESS';
const SEARCH_ERROR = 'SEARCH_ERROR';
const MESSAGE_MEOW_CREATED = 1;
Vue.use(Vuex);
const store = new Vuex.Store({
state: {
meows: [],
searchResults: [],
},
mutations: {
},
actions: {
},
});
Vue.use(VueNativeSock, PUSHER_URL, { store, format: 'json' });
export default store;
Объявим мутации WebSocket.
mutations: {
SOCKET_ONOPEN(state, event) {},
SOCKET_ONCLOSE(state, event) {},
SOCKET_ONERROR(state, event) {
console.error(event);
},
SOCKET_ONMESSAGE(state, message) {
switch (message.kind) {
case MESSAGE_MEOW_CREATED:
this.commit(CREATE_MEOW, { id: message.id, body: message.body });
}
},
// ...
},
Объявим мутации для обновления сообщений.
mutations: {
// ...
[SET_MEOWS](state, meows) {
state.meows = meows;
},
[CREATE_MEOW](state, meow) {
state.meows = [meow, ...state.meows];
},
[SEARCH_SUCCESS](state, meows) {
state.searchResults = meows;
},
[SEARCH_ERROR](state) {
state.searchResults = [];
},
},
Добавим действие для получения сообщений.
actions: {
getMeows({ commit }) {
axios
.get(`${BACKEND_URL}/meows`)
.then(({ data }) => {
commit(SET_MEOWS, data);
})
.catch((err) => console.error(err));
},
// ...
},
Добавим действие для создания сообщений.
actions: {
// ...
async createMeow({ commit }, meow) {
const { data } = await axios.post(`${BACKEND_URL}/meows`, null, {
params: {
body: meow.body,
},
});
},
// ...
},
Добавим действие для поиска сообщений.
actions: {
// ...
async searchMeows({ commit }, query) {
if (query.length == 0) {
commit(SEARCH_SUCCESS, []);
return;
}
axios
.get(`${BACKEND_URL}/search`, {
params: { query },
})
.then(({ data }) => commit(SEARCH_SUCCESS, data))
.catch((err) => {
console.error(err);
commit(SEARCH_ERROR);
});
},
},
В конце файла store.js
выполним действие getMeows
чтобы получить сообщения после загрузки страницы.
store.dispatch('getMeows');
export default store;
Компонент Meow
Создадим компонент в файле src/components/Meow.vue
, который будет отображать одно сообщение.
<template>
<div class="card">
<div class="card-body">
<p class="card-text" v-html="body"></p>
<p class="card-text">
<small class="text-muted">
{{time}}
</small>
</p>
</div>
</div>
</template>
<script>
import timeago from 'timeago.js';
export default {
props: ['meow'],
computed: {
body() {
return this.meow.body;
},
time() {
return timeago().format(Date.parse(this.meow.created_at));
},
},
};
</script>
<style lang="scss" scoped>
.card {
margin-bottom: 1rem;
}
.card-body {
padding: 0.5rem;
p {
margin-bottom: 0;
}
}
</style>
Компонент Timeline
В файле src/components/Timeline.vue
определим компонент для отображения списка сообщений и формы для публикации новых.
<template>
<div>
<form v-on:submit.prevent="createMeow">
<div class="input-group">
<input v-model.trim="meowBody" type="text" class="form-control" placeholder="What's happening?">
<div class="input-group-append">
<button class="btn btn-primary" type="submit">Meow</button>
</div>
</div>
</form>
<div class="mt-4">
<Meow v-for="meow in meows" :key="meow.id" :meow="meow" />
</div>
</div>
</template>
<script>
import { mapState } from 'vuex';
import Meow from '@/components/Meow';
export default {
data() {
return {
meowBody: '',
};
},
computed: mapState({
meows: (state) => state.meows,
}),
methods: {
createMeow() {
if (this.meowBody.length != 0) {
this.$store.dispatch('createMeow', { body: this.meowBody });
this.meowBody = '';
}
},
},
components: {
Meow,
},
};
</script>
Компонент Search
Компонент для поиска опишем в src/components/Search.vue
. Он похож на Timeline и будет осуществлять поиск при каждом изменении поля ввода.
<template>
<div>
<input @keyup="searchMeows" v-model.trim="query" type="text" class="form-control" placeholder="Search...">
<div class="mt-4">
<Meow v-for="meow in meows" :key="meow.id" :meow="meow" />
</div>
</div>
</template>
<script>
import { mapState } from 'vuex';
import Meow from '@/components/Meow';
export default {
data() {
return {
query: '',
};
},
computed: mapState({
meows: (state) => state.searchResults,
}),
methods: {
searchMeows() {
if (this.query != this.lastQuery) {
this.$store.dispatch('searchMeows', this.query);
this.lastQuery = this.query;
}
},
},
components: {
Meow,
},
};
</script>
Макет приложения
Включим компоненты Search и Timeline в src/App.vue
.
<template>
<div class="container py-5">
<div class="row mb-4">
<h1 class="col-12">Meower</h1>
</div>
<div class="row">
<Timeline class="col" />
<Search class="col" />
</div>
</div>
</template>
<script>
import Timeline from '@/components/Timeline';
import Search from '@/components/Search';
export default {
components: {
Timeline,
Search,
},
};
</script>
<style lang="scss" scoped>
.container {
max-width: 768px;
}
</style>
Завершение
На этом этапе все должно работать так, как и ожидалось. Для запуска приложения сначала соберем образ Docker с помощью Docker Compose, а затем запустим сервер для разработки Vue.
$ docker-compose up -d --build
$ cd frontend && yarn serve
Так выглядит Meower
Весь исходный код доступен на GitHub.