Руководство по эксплуатации
Руководство по эксплуатации описывает примеры по созданию приложений-клиентов TQE(MQ), которые позволят
публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC с использованием
персистентной подписки. Примеры реализованы на языке go.
Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:
goверсии > 1.21.1protocверсии > 23.4protoc-gen-goверсии > 1.28.1proto-gen-grpcверсии > 1.2.0proto-файлы с описанием gRPC-протокола TQE(MQ).
Создадим директорию с go-проектом, в которой в дальнейшем будут размещаться примеры клиентов:
mkdir mqee-clientscd mqee-clientsgo mod init example.com/mqee-clients
Для сборки примеров клиентов очереди TQE(MQ) необходимо установить зависимости с помощью команды:
go get google.golang.org/grpcgo get google.golang.org/protobuf
Для генерации gRPC-клиента потребуются компиляторы protoc и плагины для компиляции go-файлов.
Создайте директорию для хранения необходимых для генерации gRPC-клиента исполняемых файлов:
mkdir -p bin
Скачайте и установите дистрибутив protoc, указав версию, тип операционной системы и архитектуру:
PB_REL="https://github.com/protocolbuffers/protobuf/releases"VERSION=23.4OS=linuxARCH=x86_64mkdir -p tmpcurl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zipunzip -j ./tmp/protoc.zip bin/protoc -d bin
Установите плагины для protoc:
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0
Для создания примеров клиентов очереди необходимо получить архив с .proto-файлами, которые поставляются в составе дистрибутива TQE(MQ).
Полученный архив распакуйте во временный каталог:
tar xfzv tmp/protobuf-message-queue-ee-1.5.2.src.tar.gz -C tmp
Предварительно для генерации go-файлов с использованием protoc установите директорию bin в переменную окружения PATH:
export PATH=$PWD/bin:$PATH
Сгенерируйте go-файлы проекта:
mkdir -p protocolprotoc \--proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \--go_out=protocol \--go_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/protocol \--go-grpc_out=protocol \--go-grpc_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/protocol \services/consumer.proto services/producer.proto messages/message.proto
Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:
mkdir -p cmd/consumertouch cmd/consumer/main.go
Содержание файла:
package mainfunc main() {}
Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main,
указав при этом адрес подключения к gRPC-интерфейсу очереди:
ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()
Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из
очереди queue. Для использования персистентной подписки нужно указать идентификатор и время жизни подписки:
client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {panic(err)}subreq := &protocol.SubscriptionRequest{}subreq.SetQueue("queue")subreq.SetConsumeId("example-consume-id")subreq.SetTtl(float32(time.Hour.Seconds()))streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetSubscribeRequest(subreq)if err := subscribe.Send(streamreq); err != nil {panic(err)}fmt.Println("Start listening...")
После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока, выводить краткое уведомление и обновлять состояние подписки:
go func() {for {recv, err := subscribe.Recv()if err != nil {panic(err)}switch recv.WhichResponse() {case protocol.SubscriptionStreamResponse_Notifications_case:nresp := recv.GetNotifications()for _, notification := range notifResp.GetNotifications() {cursor := notification.GetCursor()message := notification.GetMessage()fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",message.GetQueue(), cursor, message.GetId(), message.GetPayload())comreq := new(protocol.CommitRequest)comreq.SetCursor(cursor)streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetCommitRequest(comreq)if err := stream.Send(streamreq); err != nil {panic(err)}}fmt.Println()case protocol.SubscriptionStreamResponse_CommitResponse_case:resp := recv.GetCommitResponse()cursor := resp.GetCursor()fmt.Printf("Consumer state has updated by %s cursor\n", cursor)}}}()
Добавьте обработку системных сигналов:
notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("Exit")
Клиент сервиса подписки на сообщения создан.
Можно выполнить его запуск с помощью следующей команды:
$ go run example.com/mqee-clients/cmd/consumerStart listening...
После этого клиент ожидает появления новых сообщений в очереди.
(user-produce)=
Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:
mkdir -p cmd/producertouch cmd/producer/main.go
Первоначальное содержимое файла:
package mainfunc main() {}
Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав
при этом адрес подключения к gRPC-интерфейсу очереди:
func main() {// ...ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()// ...}
Далее необходимо создать сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:
client := protocol.NewProducerClient(conn)
Отправьте запрос на публикацию сообщения с помощью вызова метода Produce:
responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{Queue: "queue",Messages: []*protocol.ProduceMessage{{Payload: []byte("First message from batch")},{Payload: []byte("Second message from batch")},},})if err != nil {panic(err)}fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)
Теперь при запуске клиента будет отправлено два сообщения с помощью вызова метода Produce:
$ go run example.com/mqee-clients/cmd/producerMessages published with ids: [115720 115721]
Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go), вы получите уведомление о новых сообщениях:
Notification:Queue: queueCursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQRId: 115729Payload: First message from batchConsumer state has updatedNotification:Queue: queueCursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQSId: 115730Payload: Second message from batchConsumer state has updated
Q: Получаю ошибку о нехватке зависимостей no required module provides package, например:
protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:go get google.golang.org/grpcprotocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:go get google.golang.org/grpc/codesprotocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:go get google.golang.org/grpc/statusprotocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:go get google.golang.org/protobuf/reflect/protoreflectprotocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:go get google.golang.org/protobuf/runtime/protoimplcmd/producer/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:go get google.golang.org/grpc/credentials/insecure
A: Выполните команду go mod tidy для установки всех зависимостей.
.├── bin│ ├── protoc│ ├── protoc-gen-go│ └── protoc-gen-go-grpc├── cmd│ ├── consumer│ │ └── main.go│ └── producer│ └── main.go├── go.mod├── go.sum├── protocol│ ├── consumer.pb.go│ ├── consumer_grpc.pb.go│ ├── message.pb.go│ ├── producer.pb.go│ └── producer_grpc.pb.go└── tmp├── message-queue-ee│ └── include│ └── tarantool│ ├── message_queue_ee│ │ ├── messages│ │ │ └── message.proto│ │ └── services│ │ ├── consumer.proto│ │ └── producer.proto│ └── queue_ee│ ├── messages│ │ └── message.proto│ └── services│ ├── consumer.proto│ └── producer.proto└── protobuf-message-queue-ee-1.5.2.src.tar.gz16 directories, 20 files
cmd/consumer/main.go
package mainimport ("context""fmt""os/signal""syscall""example.com/mqee-clients/protocol""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure")func main() {ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {panic(err)}subreq := &protocol.SubscriptionRequest{}subreq.SetQueue("queue")subreq.SetConsumeId("example-consume-id")subreq.SetTtl(float32(time.Hour.Seconds()))streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetSubscribeRequest(subreq)if err := subscribe.Send(streamreq); err != nil {panic(err)}fmt.Println("Start listening...")go func() {for {recv, err := subscribe.Recv()if err != nil {panic(err)}switch recv.WhichResponse() {case protocol.SubscriptionStreamResponse_Notifications_case:nresp := recv.GetNotifications()for _, notification := range notifResp.GetNotifications() {cursor := notification.GetCursor()message := notification.GetMessage()fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",message.GetQueue(), cursor, message.GetId(), message.GetPayload())comreq := new(protocol.CommitRequest)comreq.SetCursor(cursor)streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetCommitRequest(comreq)if err := stream.Send(streamreq); err != nil {panic(err)}}fmt.Println()case protocol.SubscriptionStreamResponse_CommitResponse_case:resp := recv.GetCommitResponse()cursor := resp.GetCursor()fmt.Printf("Consumer state has updated by %s cursor\n", cursor)}}()notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("Exit")}
cmd/producer/main.go:
package mainimport ("context""fmt""example.com/mqee-clients/protocol""google.golang.org/grpc""google.golang.org/grpc/credentials/insecure")func main() {ctx := context.Background()conn, err := grpc.DialContext(ctx,"localhost:18182",grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {panic(err)}defer conn.Close()client := protocol.NewProducerClient(conn)responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{Queue: "queue",Messages: []*protocol.ProduceMessage{{Payload: []byte("First message from batch")},{Payload: []byte("Second message from batch")},},})if err != nil {panic(err)}fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)}