Руководство пользователя | Mq_Ee
Руководство пользователя

Руководство пользователя

Руководство пользователя описывает примеры по созданию приложений-клиентов TQE(MQ), которые позволят публиковать сообщения в очередь и подписываться на сообщения очереди по протоколу gRPC. Примеры реализованы на языке go.

Важно! Примеры кода в данном руководстве протестированы с очередью, настройки для которой были взяты из документа «Быстрый старт». Поэтому вначале рекомендуется ознакомится с этим документом.

Подготовка

Для дальнейшей сборки примеров потребуются следующие установленные и подготовленные компоненты:

  • go версии > 1.21.1

  • protoc версии > 23.4

  • protoc-gen-go версии > 1.28.1

  • proto-gen-grpc версии > 1.2.0

  • proto-файлы с описанием gRPC-протокола TQE(MQ).

Создание директории для сборки клиентов очереди

Создадим директорию с go-проектом, в которой в дальнейшем будут размещаться примеры клиентов.

mkdir mqee-clients
cd mqee-clients
go mod init example.com/mqee-clients

Для сборки примеров клиентов очереди TQE(MQ) необходимо установить зависимости с помощью команды:

go get google.golang.org/grpc
go get google.golang.org/protobuf

Генерация gRPC-клиента на Go

Для генерации gRPC-клиента потребуются компиляторы protoc и плагины для компиляции go-файлов.

Создайте директорию для хранения необходимых для генерации gRPC-клиента исполняемых файлов:

mkdir -p bin

Скачайте и установите дистрибутив protoc, указав версию, тип операционной системы и архитектуру:

PB_REL="https://github.com/protocolbuffers/protobuf/releases"
VERSION=23.4
OS=linux
ARCH=x86_64
mkdir -p tmp
curl -o tmp/protoc.zip -L $PB_REL/download/v$VERSION/protoc-$VERSION-$OS-$ARCH.zip
unzip -j ./tmp/protoc.zip bin/protoc -d bin

Установите плагины для protoc:

GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/protobuf/cmd/protoc-gen-go@v1.28.1
GOBIN=$(pwd)/bin go install -ldflags '-w -s' google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2.0

Для создания примеров клиентов очереди необходимо получить архив с .proto-файлами, которые поставляются в составе дистрибутива TQM(MQ).

Полученный архив распакуйте во временный каталог:

tar xfzv tmp/protobuf-message-queue-ee-1.5.2.src.tar.gz -C tmp

Предварительно для генерации go-файлов с использованием protoc установите директорию bin в переменную окружения PATH:

export PATH=$PWD/bin:$PATH

Сгенерируйте go-файлы проекта:

mkdir -p protocol
protoc \
 --proto_path=$(pwd)/tmp/message-queue-ee/include/tarantool/message_queue_ee/ \
 --go_out=protocol \
 --go_opt=module=github.com/tarantool/message-queue-ee/protocol \
 --go-grpc_out=protocol \
 --go-grpc_opt=module=github.com/tarantool/message-queue-ee/protocol \
 services/consumer.proto services/publisher.proto messages/message.proto

Примеры сценариев использования очереди

Клиент сервиса подписки на сообщения из очереди

Подготовьте исходный файл примера клиента сервиса подписки на очередь сообщений:

mkdir -p cmd/consumer
touch cmd/consumer/main.go

Содержание файла:

package main

func main() {

}

Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:

ctx := context.Background()
conn, err := grpc.DialContext(
	ctx,
	"localhost:18182",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
	panic(err)
}
defer conn.Close()

Далее необходимо создать сам клиент сервиса подписки и отправить запрос на подписку на сообщения из очереди queue:

client := protocol.NewConsumerServiceClient(conn)
subscribe, err := client.Subscribe(ctx, &protocol.SubscriptionRequest{
	Queue: "queue",
})
if err != nil {
	panic(err)
}

fmt.Println("Start listening...")

После того как выполнено подключение, необходимо запустить обработку входящих сообщений. Для этого необходимо создать среду выполнения Go, которая будет получать сообщения из gRPC-потока и выводить краткое уведомление:

go func() {
	for {
		recv, err := subscribe.Recv()
		if err != nil {
			panic(err)
		}
		for _, notification := range recv.Notifications {
			cursor := notification.Cursor
			message := notification.Message
			fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n",
				message.Queue, cursor, message.Id, message.Payload)
		}
		fmt.Println()
	}
}()

Добавьте обработку системных сигналов:

notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
<-notifyCtx.Done()
fmt.Println("Exit")

Клиент сервиса подписки на сообщения создан.

Можно выполнить его запуск с помощью следующей команды:

$ go run example.com/mqee-clients/cmd/consumer
Start listening...

После этого клиент ожидает появления новых сообщений в очереди.

Клиент сервиса публикации сообщения в очередь

Публикация одного сообщения (Publish)

Создайте файл, который будет содержать исходный код клиента сервиса для публикации сообщений в очередь:

mkdir -p cmd/publisher
touch cmd/publisher/main.go

Первоначальное содержимое файла:

package main

func main() {

}

Сначала необходимо создать транспорт для работы gRPC-соединения. Добавьте этот транспорт в функцию main, указав при этом адрес подключения к gRPC-интерфейсу очереди:

func main() {
        // ...
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"localhost:18182",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// ...
}

Далее необходимо создать сам клиент сервиса публикации для дальнейшей отправки сообщений в очередь:

client := protocol.NewPublisherServiceClient(conn)

Отправьте запрос на публикацию сообщения с помощью вызова метода Publish:

response, err := client.Publish(ctx, &protocol.PublishRequest{
	Queue:   "queue",
	Payload: []byte("Message published by 'Publish' method"),
})
if err != nil {
	panic(err)
}
fmt.Printf("Message published with id: %d\n", response.Id)

Теперь можно запустить клиент сервиса публикации, чтобы опубликовать сообщение в очередь:

$ go run example.com/mqee-clients/cmd/publisher
Message published with id: 115719

Если в другом окне терминала запущен клиент-подписчик (cmd/consumer/main.go), вы получите уведомление о сообщении:

Notification:
	Queue: queue
	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQQ
	Id: 115728
	Payload: Message published by 'Publish' method

Публикация набора сообщений (PublishBatch)

Теперь реализуем отправку группы сообщений (Batch) с помощью одного запроса.

Для этого необходимо добавить в конце функции main клиента сервиса публикации сообщений следующий код:

responseBatch, err := client.PublishBatch(ctx, &protocol.PublishBatchRequest{
	Queue: "queue",
	Messages: []*protocol.BatchRequestMessage{
		{Payload: []byte("First message from batch")},
		{Payload: []byte("Second message from batch")},
	},
})
if err != nil {
	panic(err)
}
fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)

Теперь при запуске клиента будет отправлено три сообщения: одно с помощью вызова метода Publish, а второе и третье – с помощью вызова метода PublishBatch:

$ go run example.com/mqee-clients/cmd/publisher
Message published with id: 115719
Messages published with ids: [115720 115721]

Сервис подписки на сообщения очереди получит уведомления о новых сообщениях:

Notification:
	Queue: queue
	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQQ
	Id: 115728
	Payload: Message published by 'Publish' method
Notification:
	Queue: queue
	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQR
	Id: 115729
	Payload: First message from batch
Notification:
	Queue: queue
	Cursor: DX8EAQL/gAABDAEGAAAS/4AAAQlzdG9yYWdlLTH9AcQS
	Id: 115730
	Payload: Second message from batch

Дополнительно

FAQ

Q: Получаю ошибку о нехватке зависимостей no required module provides package, например,

protocol/consumer_grpc.pb.go:11:2: no required module provides package google.golang.org/grpc; to add it:
	go get google.golang.org/grpc
protocol/consumer_grpc.pb.go:12:2: no required module provides package google.golang.org/grpc/codes; to add it:
	go get google.golang.org/grpc/codes
protocol/consumer_grpc.pb.go:13:2: no required module provides package google.golang.org/grpc/status; to add it:
	go get google.golang.org/grpc/status
protocol/consumer.pb.go:10:2: no required module provides package google.golang.org/protobuf/reflect/protoreflect; to add it:
	go get google.golang.org/protobuf/reflect/protoreflect
protocol/consumer.pb.go:11:2: no required module provides package google.golang.org/protobuf/runtime/protoimpl; to add it:
	go get google.golang.org/protobuf/runtime/protoimpl
cmd/publisher/main.go:9:2: no required module provides package google.golang.org/grpc/credentials/insecure; to add it:
	go get google.golang.org/grpc/credentials/insecure

A: Выполните команду go mod tidy для установки всех зависимостей.

Файловая структура

.
├── bin
│   ├── protoc
│   ├── protoc-gen-go
│   └── protoc-gen-go-grpc
├── cmd
│   ├── consumer
│   │   └── main.go
│   └── publisher
│       └── main.go
├── go.mod
├── go.sum
├── protocol
│   ├── consumer.pb.go
│   ├── consumer_grpc.pb.go
│   ├── message.pb.go
│   ├── publisher.pb.go
│   └── publisher_grpc.pb.go
└── tmp
    ├── message-queue-ee
    │   └── include
    │       └── tarantool
    │           ├── message_queue_ee
    │           │   ├── messages
    │           │   │   └── message.proto
    │           │   └── services
    │           │       ├── consumer.proto
    │           │       └── publisher.proto
    │           └── queue_ee
    │               ├── messages
    │               │   └── message.proto
    │               └── services
    │                   ├── consumer.proto
    │                   └── publisher.proto
    └── protobuf-message-queue-ee-1.5.2.src.tar.gz

16 directories, 20 files

Листинги файлов

cmd/consumer/main.go

package main

import (
	"context"
	"fmt"
	"os/signal"
	"syscall"

	"example.com/mqee-clients/protocol"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"localhost:18182",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := protocol.NewConsumerServiceClient(conn)
	subscribe, err := client.Subscribe(ctx, &protocol.SubscriptionRequest{
		Queue: "queue",
	})
	if err != nil {
		panic(err)
	}

	fmt.Println("Start listening...")

	go func() {
		for {
			recv, err := subscribe.Recv()
			if err != nil {
				panic(err)
			}
			for _, notification := range recv.Notifications {
				cursor := notification.Cursor
				message := notification.Message
				fmt.Printf("Notification:\n\tQueue: %s\n\tCursor: %s\n\tId: %d\n\tPayload: %s\n",
					message.Queue, cursor, message.Id, message.Payload)
			}
			fmt.Println()
		}
	}()


	notifyCtx, _ := signal.NotifyContext(ctx, syscall.SIGINT, syscall.SIGTERM)
	<-notifyCtx.Done()
	fmt.Println("Exit")
}

cmd/publisher/main.go:

package main

import (
	"context"
	"fmt"

	"example.com/mqee-clients/protocol"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()
	conn, err := grpc.DialContext(
		ctx,
		"localhost:18182",
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	client := protocol.NewPublisherServiceClient(conn)

	response, err := client.Publish(ctx, &protocol.PublishRequest{
		Queue:   "queue",
		Payload: []byte("Message published by 'Publish' method"),
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Message published with id: %d\n", response.Id)

	responseBatch, err := client.PublishBatch(ctx, &protocol.PublishBatchRequest{
		Queue: "queue",
		Messages: []*protocol.BatchRequestMessage{
			{Payload: []byte("First message from batch")},
			{Payload: []byte("Second message from batch")},
		},
	})
	if err != nil {
		panic(err)
	}
	fmt.Printf("Messages published with ids: %v\n", responseBatch.Ids)
}
Found what you were looking for?
Feedback