Руководство пользователя¶
Руководство пользователя описывает примеры по созданию приложений-клиентов TQE(MQ), которые позволят публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC
.
Примеры реализованы на языке go
.
Важно! Примеры кода в данном руководстве протестированы с очередью, настройки для которой были взяты из документа «Быстрый старт». Поэтому вначале рекомендуется ознакомится с этим документом.
Подготовка¶
Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:
go
версии > 1.21.1protoc
версии > 23.4protoc-gen-go
версии > 1.28.1proto-gen-grpc
версии > 1.2.0proto
-файлы с описанием gRPC-протокола TQE(MQ).
Создание директории для сборки клиентов очереди¶
Создадим директорию с go
-проектом, в которой в дальнейшем будут размещаться примеры клиентов.
mkdir mqee-clients
cd mqee-clients
go mod init example.com/mqee-clients
Для сборки примеров клиентов очереди TQE(MQ) необходимо установить зависимости с помощью команды:
go get google.golang.org/grpc
go get google.golang.org/protobuf
Генерация gRPC-клиента на Go¶
Для генерации gRPC-клиента потребуются компиляторы protoc
и плагины для компиляции go
-файлов.
Создайте директорию для хранения необходимых для генерации gRPC-клиента исполняемых файлов:
mkdir -p bin
Скачайте и установите дистрибутив protoc
, указав версию, тип операционной системы и архитектуру:
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
VERSION=23.4
OS=linux
ARCH=x86_64
mkdir -p tmp
curl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zip
unzip -j ./tmp/protoc.zip bin/protoc -d bin
Установите плагины для protoc
:
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
Для создания примеров клиентов очереди необходимо получить архив с .proto
-файлами, которые поставляются в составе дистрибутива TQM(MQ).
Полученный архив распакуйте во временный каталог:
tar xfzv tmp/protobuf-message-queue-ee-1.5.2.src.tar.gz -C tmp
Предварительно для генерации go-файлов с использованием protoc
установите директорию bin
в переменную окружения PATH
:
export PATH=$PWD/bin:$PATH
Сгенерируйте go-файлы проекта:
mkdir -p protocol
protoc \
--proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \
--go_out=protocol \
--go_opt=module=github.com/tarantool/message-queue-ee/protocol \
--go-grpc_out=protocol \
--go-grpc_opt=module=github.com/tarantool/message-queue-ee/protocol \
services/consumer.proto services/publisher.proto messages/message.proto
Примеры сценариев использования очереди¶
Клиент сервиса подписки на сообщения из очереди¶
Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:
mkdir -p cmd/consumer
touch cmd/consumer/main.go
Содержание файла:
package main
func main() {
}
Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main
, указав при этом адрес подключения к gRPC-интерфейсу очереди:
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"localhost:18182",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из очереди queue
:
client := protocol.NewConsumerServiceClient(conn)
subscribe, err := client.Subscribe(ctx, &protocol.SubscriptionRequest{
Queue: "queue",
})
if err != nil {
panic(err)
}
fmt.Println("Start listening...")
После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока и выводить краткое уведомление:
go func() {
for {
recv, err := subscribe.Recv()
if err != nil {
panic(err)
}
for _, notification := range recv.Notifications {
cursor := notification.Cursor
message := notification.Message
fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n",
message.Queue, cursor, message.Id, message.Payload)
}
fmt.Println()
}
}()
Добавьте обработку системных сигналов:
notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
<-notifyCtx.Done()
fmt.Println("Exit")
Клиент сервиса подписки на сообщения создан.
Можно выполнить его запуск с помощью следующей команды:
$ go run example.com/mqee-clients/cmd/consumer
Start listening...
После этого клиент ожидает появления новых сообщений в очереди.
Клиент сервиса публикации сообщения в очередь¶
Публикация одного сообщения (Publish)¶
Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:
mkdir -p cmd/publisher
touch cmd/publisher/main.go
Первоначальное содержимое файла:
package main
func main() {
}
Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main
, указав при этом адрес подключения к gRPC-интерфейсу очереди:
func main() {
// ...
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"localhost:18182",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// ...
}
Далее необходимо создать сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:
client := protocol.NewPublisherServiceClient(conn)
Отправьте запрос на публикацию сообщения с помощью вызова метода Publish
:
response, err := client.Publish(ctx, &protocol.PublishRequest{
Queue: "queue",
Payload: []byte("Message published by 'Publish' method"),
})
if err != nil {
panic(err)
}
fmt.Printf("Message published with id: %d\n", response.Id)
Теперь можно запустить клиент сервиса публикации, чтобы опубликовать сообщение в очередь:
$ go run example.com/mqee-clients/cmd/publisher
Message published with id: 115719
Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go
), вы получите уведомление о сообщении:
Notification:
Queue: queue
Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQQ
Id: 115728
Payload: Message published by 'Publish' method
Публикация набора сообщений (PublishBatch)¶
Теперь реализуем отправку группы сообщений (Batch) с помощью одного запроса.
Для этого необходимо добавить в конце функции main
клиента сервиса публикации сообщений следующий код:
responseBatch, err := client.PublishBatch(ctx, &protocol.PublishBatchRequest{
Queue: "queue",
Messages: []*protocol.BatchRequestMessage{
{Payload: []byte("First message from batch")},
{Payload: []byte("Second message from batch")},
},
})
if err != nil {
panic(err)
}
fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)
Теперь при запуске клиента будет отправлено три сообщения: одно с помощью вызова метода Publish
, а второе и третье – с помощью вызова метода PublishBatch
:
$ go run example.com/mqee-clients/cmd/publisher
Message published with id: 115719
Messages published with ids: [115720 115721]
Сервис подписки на сообщения очереди получит уведомления о новых сообщениях:
Notification:
Queue: queue
Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQQ
Id: 115728
Payload: Message published by 'Publish' method
Notification:
Queue: queue
Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQR
Id: 115729
Payload: First message from batch
Notification:
Queue: queue
Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQS
Id: 115730
Payload: Second message from batch
Дополнительно¶
FAQ¶
Q: Получаю ошибку о нехватке зависимостей no required module provides package
, например,
protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:
go get google.golang.org/grpc
protocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:
go get google.golang.org/grpc/codes
protocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:
go get google.golang.org/grpc/status
protocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:
go get google.golang.org/protobuf/reflect/protoreflect
protocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:
go get google.golang.org/protobuf/runtime/protoimpl
cmd/publisher/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:
go get google.golang.org/grpc/credentials/insecure
A: Выполните команду go mod tidy
для установки всех зависимостей.
Файловая структура¶
.
├── bin
│ ├── protoc
│ ├── protoc-gen-go
│ └── protoc-gen-go-grpc
├── cmd
│ ├── consumer
│ │ └── main.go
│ └── publisher
│ └── main.go
├── go.mod
├── go.sum
├── protocol
│ ├── consumer.pb.go
│ ├── consumer_grpc.pb.go
│ ├── message.pb.go
│ ├── publisher.pb.go
│ └── publisher_grpc.pb.go
└── tmp
├── message-queue-ee
│ └── include
│ └── tarantool
│ ├── message_queue_ee
│ │ ├── messages
│ │ │ └── message.proto
│ │ └── services
│ │ ├── consumer.proto
│ │ └── publisher.proto
│ └── queue_ee
│ ├── messages
│ │ └── message.proto
│ └── services
│ ├── consumer.proto
│ └── publisher.proto
└── protobuf-message-queue-ee-1.5.2.src.tar.gz
16 directories, 20 files
Листинги файлов¶
cmd/consumer/main.go
package main
import (
"context"
"fmt"
"os/signal"
"syscall"
"example.com/mqee-clients/protocol"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"localhost:18182",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
client := protocol.NewConsumerServiceClient(conn)
subscribe, err := client.Subscribe(ctx, &protocol.SubscriptionRequest{
Queue: "queue",
})
if err != nil {
panic(err)
}
fmt.Println("Start listening...")
go func() {
for {
recv, err := subscribe.Recv()
if err != nil {
panic(err)
}
for _, notification := range recv.Notifications {
cursor := notification.Cursor
message := notification.Message
fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n",
message.Queue, cursor, message.Id, message.Payload)
}
fmt.Println()
}
}()
notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
<-notifyCtx.Done()
fmt.Println("Exit")
}
cmd/publisher/main.go
:
package main
import (
"context"
"fmt"
"example.com/mqee-clients/protocol"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
ctx := context.Background()
conn, err := grpc.DialContext(
ctx,
"localhost:18182",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
client := protocol.NewPublisherServiceClient(conn)
response, err := client.Publish(ctx, &protocol.PublishRequest{
Queue: "queue",
Payload: []byte("Message published by 'Publish' method"),
})
if err != nil {
panic(err)
}
fmt.Printf("Message published with id: %d\n", response.Id)
responseBatch, err := client.PublishBatch(ctx, &protocol.PublishBatchRequest{
Queue: "queue",
Messages: []*protocol.BatchRequestMessage{
{Payload: []byte("First message from batch")},
{Payload: []byte("Second message from batch")},
},
})
if err != nil {
panic(err)
}
fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)
}