VK Docs logo
Помощь
Обновлена 30 декабря 2025 г. в 06:16

Руководство по эксплуатации

Руководство по эксплуатации описывает примеры по созданию приложений-клиентов TQE(MQ), которые позволят публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC с использованием персистентной подписки. Примеры реализованы на языке go.

Подготовка

Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:

  • go версии > 1.21.1
  • protoc версии > 23.4
  • protoc-gen-go версии > 1.28.1
  • proto-gen-grpc версии > 1.2.0
  • proto-файлы с описанием gRPC-протокола TQE(MQ).

Создание директории для сборки клиентов очереди

Создадим директорию с go-проектом, в которой в дальнейшем будут размещаться примеры клиентов:

mkdir mqee-clientscd mqee-clientsgo mod init example.com/mqee-clients

Для сборки примеров клиентов очереди TQE(MQ) необходимо установить зависимости с помощью команды:

go get google.golang.org/grpcgo get google.golang.org/protobuf

Генерация gRPC-клиента на Go

Для генерации gRPC-клиента потребуются компиляторы protoc и плагины для компиляции go-файлов.

Создайте директорию для хранения необходимых для генерации gRPC-клиента исполняемых файлов:

mkdir -p bin

Скачайте и установите дистрибутив protoc, указав версию, тип операционной системы и архитектуру:

PB_REL="https://github.com/protocolbuffers/protobuf/releases"VERSION=23.4OS=linuxARCH=x86_64mkdir -p tmpcurl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zipunzip -j ./tmp/protoc.zip bin/protoc -d bin

Установите плагины для protoc:

GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0

Для создания примеров клиентов очереди необходимо получить архив с .proto-файлами, которые поставляются в составе дистрибутива TQE(MQ).

Полученный архив распакуйте во временный каталог:

tar xfzv tmp/protobuf-message-queue-ee-1.5.2.src.tar.gz -C tmp

Предварительно для генерации go-файлов с использованием protoc установите директорию bin в переменную окружения PATH:

export PATH=$PWD/bin:$PATH

Сгенерируйте go-файлы проекта:

mkdir -p protocolprotoc \ --proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \ --go_out=protocol \ --go_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/protocol \ --go-grpc_out=protocol \ --go-grpc_opt=module=gitlab.vkteam.ru/tarantool/tqe/message-queue.git/v3/protocol \ services/consumer.proto services/producer.proto messages/message.proto

Примеры сценариев использования очереди

Клиент сервиса подписки на сообщения из очереди

Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:

mkdir -p cmd/consumertouch cmd/consumer/main.go

Содержание файла:

package mainfunc main() {}

Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:

ctx := context.Background()conn, err := grpc.DialContext(	ctx,	"localhost:18182",	grpc.WithTransportCredentials(insecure.NewCredentials()),)if err != nil {	panic(err)}defer conn.Close()

Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из очереди queue. Для использования персистентной подписки нужно указать идентификатор и время жизни подписки:

client := protocol.NewConsumerServiceClient(conn)subscribe, err := client.Subscribe(ctx)if err != nil {	panic(err)}subreq := &protocol.SubscriptionRequest{}subreq.SetQueue("queue")subreq.SetConsumeId("example-consume-id")subreq.SetTtl(float32(time.Hour.Seconds()))streamreq := new(protocol.SubscriptionStreamRequest)streamreq.SetSubscribeRequest(subreq)if err := subscribe.Send(streamreq); err != nil {	panic(err)}fmt.Println("Start listening...")

После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока, выводить краткое уведомление и обновлять состояние подписки:

go func() {	for {		recv, err := subscribe.Recv()		if err != nil {			panic(err)		}		switch recv.WhichResponse() {		case protocol.SubscriptionStreamResponse_Notifications_case:			nresp := recv.GetNotifications()			for _, notification := range notifResp.GetNotifications() {				cursor := notification.GetCursor()				message := notification.GetMessage()				fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",					message.GetQueue(), cursor, message.GetId(), message.GetPayload())								comreq := new(protocol.CommitRequest)				comreq.SetCursor(cursor)				streamreq := new(protocol.SubscriptionStreamRequest)				streamreq.SetCommitRequest(comreq)				if err := stream.Send(streamreq); err != nil {					panic(err)				}			}			fmt.Println()		case protocol.SubscriptionStreamResponse_CommitResponse_case:			resp := recv.GetCommitResponse()			cursor := resp.GetCursor()			fmt.Printf("Consumer state has updated by %s cursor\n", cursor)		}	}}()

Добавьте обработку системных сигналов:

notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)<-notifyCtx.Done()fmt.Println("Exit")

Клиент сервиса подписки на сообщения создан.

Можно выполнить его запуск с помощью следующей команды:

$ go run example.com/mqee-clients/cmd/consumerStart listening...

После этого клиент ожидает появления новых сообщений в очереди.

Клиент сервиса подписки на сообщения из очереди

(user-produce)=

Публикация сообщений (Produce)

Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:

mkdir -p cmd/producertouch cmd/producer/main.go

Первоначальное содержимое файла:

package mainfunc main() {}

Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:

func main() {        // ...	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	// ...}

Далее необходимо создать сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:

client := protocol.NewProducerClient(conn)

Отправьте запрос на публикацию сообщения с помощью вызова метода Produce:

responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{	Queue: "queue",	Messages: []*protocol.ProduceMessage{		{Payload: []byte("First message from batch")},		{Payload: []byte("Second message from batch")},	},})if err != nil {	panic(err)}fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)

Теперь при запуске клиента будет отправлено два сообщения с помощью вызова метода Produce:

$ go run example.com/mqee-clients/cmd/producerMessages published with ids: [115720 115721]

Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go), вы получите уведомление о новых сообщениях:

Notification:	Queue: queue	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQR	Id: 115729	Payload: First message from batchConsumer state has updatedNotification:	Queue: queue	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQS	Id: 115730	Payload: Second message from batchConsumer state has updated

Дополнительно

FAQ

Q: Получаю ошибку о нехватке зависимостей no required module provides package, например:

protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:	go get google.golang.org/grpcprotocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:	go get google.golang.org/grpc/codesprotocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:	go get google.golang.org/grpc/statusprotocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:	go get google.golang.org/protobuf/reflect/protoreflectprotocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:	go get google.golang.org/protobuf/runtime/protoimplcmd/producer/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:	go get google.golang.org/grpc/credentials/insecure

A: Выполните команду go mod tidy для установки всех зависимостей.

Файловая структура

.├── bin│   ├── protoc│   ├── protoc-gen-go│   └── protoc-gen-go-grpc├── cmd│   ├── consumer│   │   └── main.go│   └── producer│       └── main.go├── go.mod├── go.sum├── protocol│   ├── consumer.pb.go│   ├── consumer_grpc.pb.go│   ├── message.pb.go│   ├── producer.pb.go│   └── producer_grpc.pb.go└── tmp    ├── message-queue-ee    │   └── include    │       └── tarantool    │           ├── message_queue_ee    │           │   ├── messages    │           │   │   └── message.proto    │           │   └── services    │           │       ├── consumer.proto    │           │       └── producer.proto    │           └── queue_ee    │               ├── messages    │               │   └── message.proto    │               └── services    │                   ├── consumer.proto    │                   └── producer.proto    └── protobuf-message-queue-ee-1.5.2.src.tar.gz16 directories, 20 files

Листинги файлов

cmd/consumer/main.go

package mainimport (	"context"	"fmt"	"os/signal"	"syscall"	"example.com/mqee-clients/protocol"	"google.golang.org/grpc"	"google.golang.org/grpc/credentials/insecure")func main() {	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	client := protocol.NewConsumerServiceClient(conn)	subscribe, err := client.Subscribe(ctx)	if err != nil {		panic(err)	}	subreq := &protocol.SubscriptionRequest{}	subreq.SetQueue("queue")	subreq.SetConsumeId("example-consume-id")	subreq.SetTtl(float32(time.Hour.Seconds()))	streamreq := new(protocol.SubscriptionStreamRequest)	streamreq.SetSubscribeRequest(subreq)	if err := subscribe.Send(streamreq); err != nil {		panic(err)	}	fmt.Println("Start listening...")	go func() {		for {			recv, err := subscribe.Recv()			if err != nil {				panic(err)			}			switch recv.WhichResponse() {			case protocol.SubscriptionStreamResponse_Notifications_case:				nresp := recv.GetNotifications()				for _, notification := range notifResp.GetNotifications() {					cursor := notification.GetCursor()					message := notification.GetMessage()					fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n\n",						message.GetQueue(), cursor, message.GetId(), message.GetPayload())										comreq := new(protocol.CommitRequest)					comreq.SetCursor(cursor)					streamreq := new(protocol.SubscriptionStreamRequest)					streamreq.SetCommitRequest(comreq)					if err := stream.Send(streamreq); err != nil {						panic(err)					}				}				fmt.Println()			case protocol.SubscriptionStreamResponse_CommitResponse_case:				resp := recv.GetCommitResponse()				cursor := resp.GetCursor()				fmt.Printf("Consumer state has updated by %s cursor\n", cursor)		}	}()	notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)	<-notifyCtx.Done()	fmt.Println("Exit")}

cmd/producer/main.go:

package mainimport (	"context"	"fmt"	"example.com/mqee-clients/protocol"	"google.golang.org/grpc"	"google.golang.org/grpc/credentials/insecure")func main() {	ctx := context.Background()	conn, err := grpc.DialContext(		ctx,		"localhost:18182",		grpc.WithTransportCredentials(insecure.NewCredentials()),	)	if err != nil {		panic(err)	}	defer conn.Close()	client := protocol.NewProducerClient(conn)	responseBatch, err := client.Produce(ctx, &protocol.ProduceRequest{		Queue: "queue",		Messages: []*protocol.ProduceMessage{			{Payload: []byte("First message from batch")},			{Payload: []byte("Second message from batch")},		},	})	if err != nil {		panic(err)	}	fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)}