No description
- Go 95.1%
- HTML 4.9%
|
|
||
|---|---|---|
| .vscode | ||
| .woodpecker | ||
| docs | ||
| example | ||
| mq | ||
| .gitignore | ||
| .woodpecker.yml | ||
| CI_SETUP.md | ||
| client.go | ||
| factory.go | ||
| factory_test.go | ||
| go.mod | ||
| go.sum | ||
| LICENSE | ||
| README.md | ||
| redis.coverage.out | ||
Агрегатор клиентов для различных серверов MQ
Единый интерфейс для работы с различными MQ-бэкендами: MQTT, AMQP, Redis Pub/Sub и internal chan.
Возможности
- ✅ Единый интерфейс для всех бэкендов
- ✅ Синхронный Receive() — полный контроль над обработкой
- ✅ Ручное подтверждение (Ack/Nack) для надёжной обработки
- ✅ Потокобезопасность — Publish и Receive можно вызывать из разных горутин
- ✅ Context support — отмена и таймауты через
context.Context - ✅ Авто-переподключение при разрыве соединения
- ✅ Chan-бэкенд для тестов и внутренней коммуникации без внешних зависимостей
Установка
go get git.ymnuktech.ru/ymnuk/go-mq-agregate
Быстрый старт
Подключение и базовое использование
package main
import (
"context"
"log"
"git.ymnuktech.ru/ymnuk/go-mq-agregate"
)
func main() {
ctx := context.Background()
// Создаём клиент (chan для примера)
client := mq.NewChanClient(mq.ClientOptions{
ChanBufferSize: 1000,
})
// Подключаемся
if err := client.Connect(ctx); err != nil {
log.Fatal(err)
}
defer client.Disconnect(ctx)
// Публикация сообщения
client.Publish(ctx, "my.queue", []byte("hello"))
// Получение сообщения
msg, err := client.Receive(ctx, "my.queue")
if err != nil {
log.Fatal(err)
}
// Обработка и подтверждение
process(msg.Payload)
msg.Ack()
}
func process(data []byte) {
// обработка данных
}
Паттерн: непрерывная обработка
for {
msg, err := client.Receive(ctx, "my.queue")
if err != nil {
if errors.Is(err, context.Canceled) {
break // graceful shutdown
}
log.Printf("Receive error: %v", err)
continue
}
if err := process(msg.Payload); err != nil {
msg.Nack() // вернуть в очередь при ошибке
continue
}
msg.Ack() // подтвердить обработку
}
Паттерн: многопоточная обработка
// Несколько воркеров обрабатывают одну очередь
for i := 0; i < 3; i++ {
go func(workerID int) {
for {
msg, err := client.Receive(ctx, "my.queue")
if err != nil {
return
}
process(msg.Payload)
msg.Ack()
}
}(i)
}
Использование с разными бэкендами
// Chan (в памяти, без внешних зависимостей)
client := mq.NewChanClient(mq.ClientOptions{
ChanBufferSize: 1000,
})
// MQTT
client := mq.NewMQTTClient(mq.ClientOptions{
Host: "localhost",
Port: 1883,
ClientID: "my-client",
})
// AMQP (RabbitMQ)
client := mq.NewAMQPClient(mq.ClientOptions{
Host: "localhost",
Port: 5672,
Username: "guest",
Password: "guest",
VHost: "/",
})
// Redis Pub/Sub
client := mq.NewRedisClient(mq.ClientOptions{
Host: "localhost",
Port: 6379,
Database: 0, // номер БД (по умолчанию 0)
})
// Через фабрику
client, err := mq.NewClient(mq.ClientTypeChan, mq.ClientOptions{})
Конфигурация
ClientOptions
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
Host |
string | - | Хост сервера |
Port |
int | - | Порт |
Username |
string | - | Пользователь |
Password |
string | - | Пароль |
VHost |
string | - | Виртуальный хост (AMQP) |
ClientID |
string | - | ID клиента (MQTT) |
QueueName |
string | - | Имя очереди |
ExchangeName |
string | - | Имя exchange (AMQP) |
Database |
int | 0 | Номер БД (Redis) |
ChanBufferSize |
int | 1000 | Размер буфера (chan) |
MaxPacketSize |
int | 32MB | Макс. размер сообщения |
ReconnectMax |
int | 5 | Попыток переподключения |
OnDisconnect |
func(error) | nil | Callback при разрыве |
Поддерживаемые бэкенды
| Бэкенд | Статус | Пакет |
|---|---|---|
| Chan | ✅ Готов | mq/chanclient |
| Redis Pub/Sub | ✅ Готов | mq/redis |
| MQTT | ✅ Готов | mq/mqtt |
| AMQP | 🚧 В разработке | mq/amqp |
Производительность (chan-бэкенд)
Бенчмарки на Intel Celeron N5105:
| Операция | Время | Память | Аллокации |
|---|---|---|---|
| Publish | 130 ns/op | 56 B/op | 0 allocs/op |
| Receive | 219 ns/op | 88 B/op | 2 allocs/op |
| Publish + Receive | 329 ns/op | 88 B/op | 2 allocs/op |
| Concurrent (4 workers) | 372 ns/op | 88 B/op | 2 allocs/op |
Тест с 1500 сообщениями по 1KB:
- Пропускная способность: ~3.9 млн msg/sec (~3800 MB/sec)
- Время чтения 1500 msg: ~0.4 мс
- Память при публикации: +1 MB (возвращается GC после чтения)
Разные размеры сообщений:
| Размер | Время/op | Память/op |
|---|---|---|
| 64 B – 16 KB | ~320 ns/op | 88 B/op |
Производительность не зависит от размера сообщения (данные не копируются).
# Запуск бенчмарков
go test ./mq/chanclient/... -bench=. -benchmem
Производительность (Redis Pub/Sub-бэкенд)
Бенчмарки на Intel Celeron N5105 (с miniredis):
| Операция | Время | Память | Аллокации | Примечание |
|---|---|---|---|---|
| Connect | ~264k ns/op | 131 KB/op | 408 | |
| Publish | ~26k ns/op | 505 B/op | 21 | |
| Publish (100 bytes) | ~26k ns/op | 697 B/op | 21 | |
| Publish (1 KB) | ~27k ns/op | 2.6 KB/op | 21 | |
| Publish (10 KB) | ~33k ns/op | 21 KB/op | 21 | |
| Publish Concurrent | ~27k ns/op | 504 B/op | 21 | Параллельно |
| Receive | ~29k ns/op | 841 B/op | 36 | ✅ С miniredis |
| IsConnected | ~25 ns/op | 0 B/op | 0 | |
| Disconnect | ~13k ns/op | 256 B/op | 3 |
Покрытие тестами: 91.7%
Примечания:
-
Бенчмарки с Receive теперь работают с miniredis благодаря долговременным подпискам Pub/Sub.
-
Особенности тестирования с miniredis:
- ✅ Unit-тесты работают полностью
- ✅ Бенчмарки с Pub/Sub работают с miniredis
- ❌
b.Log()нельзя вызывать из горутин (не потокобезопасен)
# Запуск всех бенчмарков Redis
go test ./mq/redis/... -bench=. -benchmem
# Запуск всех тестов
go test ./mq/redis/... -v
Производительность (MQTT-бэкенд)
Бенчмарки на Intel Celeron N5105 (с mochi-mqtt embedded server):
| Операция | Время | Память | Аллокации | Примечание |
|---|---|---|---|---|
| Connect | ~149k ns/op | 81 KB/op | 244 | |
| Publish | ~35k ns/op | 2.8 KB/op | 51 | |
| Publish (100 bytes) | ~35k ns/op | 3.1 KB/op | 51 | |
| Publish (1 KB) | ~37k ns/op | 6.2 KB/op | 52 | |
| Publish (10 KB) | ~47k ns/op | 35 KB/op | 52 | |
| Publish Concurrent | ~13k ns/op | 2.8 KB/op | 52 | Параллельно |
| Receive | ~42k ns/op | 7.4 KB/op | 96 | ✅ Работает |
| IsConnected | ~42 ns/op | 0 B/op | 0 | |
| Disconnect | ~149k ns/op | 81 KB/op | 244 |
Покрытие тестами: ~90%
Примечания:
-
Бенчмарки с Receive теперь работают благодаря долговременным подпискам.
-
Особенности тестирования с mochi-mqtt:
- ✅ Unit-тесты работают с embedded сервером
- ✅ Полная поддержка Pub/Sub
- ✅ Бенчмарки Receive работают
- ⚠️ Медленнее miniredis (реальное TCP-подключение)
- ⚠️ Уникальный ClientID для каждого клиента
# Запуск всех бенчмарков MQTT
go test ./mq/mqtt/... -bench=. -benchmem
# Запуск всех тестов
go test ./mq/mqtt/... -v
Тестирование
# Только юнит-тесты
go test ./... -short
# Все тесты (включая интеграционные)
go test ./... -v
# С покрытием
go test ./... -coverprofile=coverage.out
Лицензия
MIT