No description
  • Go 95.1%
  • HTML 4.9%
Find a file
ymnuk b831e0e5fd
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Убрали лишнюю проверку
2026-03-21 21:42:17 +03:00
.vscode Исправлены Receive для MQTT и Redis. Вот и разобрались почему бенчмарки падали. 2026-03-21 18:40:10 +03:00
.woodpecker Исправление шаблона письма 2026-03-21 21:13:10 +03:00
docs Настройка Woodpecker-CI 2026-03-21 19:37:12 +03:00
example Бенчмарки 2026-03-21 10:47:50 +03:00
mq Убрали лишнюю проверку 2026-03-21 21:42:17 +03:00
.gitignore Настройка Woodpecker-CI 2026-03-21 19:37:12 +03:00
.woodpecker.yml Покрытие тестами 2026-03-21 21:29:09 +03:00
CI_SETUP.md Настройка Woodpecker-CI 2026-03-21 19:37:12 +03:00
client.go Реализация интерфейсов и chanclient 2026-03-20 14:30:51 +03:00
factory.go И снова too slow теперь для MQTT. Нет, надо будет разбраться что за проблема... 2026-03-21 13:02:14 +03:00
factory_test.go Покрытие тестами 2026-03-21 21:29:09 +03:00
go.mod И снова too slow теперь для MQTT. Нет, надо будет разбраться что за проблема... 2026-03-21 13:02:14 +03:00
go.sum И снова too slow теперь для MQTT. Нет, надо будет разбраться что за проблема... 2026-03-21 13:02:14 +03:00
LICENSE first commit 2026-03-20 13:13:50 +03:00
README.md Добавлен Database для Redis 2026-03-21 18:47:21 +03:00
redis.coverage.out Реализация redis 2026-03-21 11:31:53 +03:00

Агрегатор клиентов для различных серверов MQ

Go Reference Go Report Card

Единый интерфейс для работы с различными MQ-бэкендами: MQTT, AMQP, Redis Pub/Sub и internal chan.

Возможности

  • Единый интерфейс для всех бэкендов
  • Синхронный Receive() — полный контроль над обработкой
  • Ручное подтверждение (Ack/Nack) для надёжной обработки
  • Потокобезопасность — Publish и Receive можно вызывать из разных горутин
  • Context support — отмена и таймауты через context.Context
  • Авто-переподключение при разрыве соединения
  • Chan-бэкенд для тестов и внутренней коммуникации без внешних зависимостей

Установка

go get git.ymnuktech.ru/ymnuk/go-mq-agregate

Быстрый старт

Подключение и базовое использование

package main

import (
    "context"
    "log"
    
    "git.ymnuktech.ru/ymnuk/go-mq-agregate"
)

func main() {
    ctx := context.Background()
    
    // Создаём клиент (chan для примера)
    client := mq.NewChanClient(mq.ClientOptions{
        ChanBufferSize: 1000,
    })
    
    // Подключаемся
    if err := client.Connect(ctx); err != nil {
        log.Fatal(err)
    }
    defer client.Disconnect(ctx)
    
    // Публикация сообщения
    client.Publish(ctx, "my.queue", []byte("hello"))
    
    // Получение сообщения
    msg, err := client.Receive(ctx, "my.queue")
    if err != nil {
        log.Fatal(err)
    }
    
    // Обработка и подтверждение
    process(msg.Payload)
    msg.Ack()
}

func process(data []byte) {
    // обработка данных
}

Паттерн: непрерывная обработка

for {
    msg, err := client.Receive(ctx, "my.queue")
    if err != nil {
        if errors.Is(err, context.Canceled) {
            break // graceful shutdown
        }
        log.Printf("Receive error: %v", err)
        continue
    }
    
    if err := process(msg.Payload); err != nil {
        msg.Nack() // вернуть в очередь при ошибке
        continue
    }
    
    msg.Ack() // подтвердить обработку
}

Паттерн: многопоточная обработка

// Несколько воркеров обрабатывают одну очередь
for i := 0; i < 3; i++ {
    go func(workerID int) {
        for {
            msg, err := client.Receive(ctx, "my.queue")
            if err != nil {
                return
            }
            process(msg.Payload)
            msg.Ack()
        }
    }(i)
}

Использование с разными бэкендами

// Chan (в памяти, без внешних зависимостей)
client := mq.NewChanClient(mq.ClientOptions{
    ChanBufferSize: 1000,
})

// MQTT
client := mq.NewMQTTClient(mq.ClientOptions{
    Host:     "localhost",
    Port:     1883,
    ClientID: "my-client",
})

// AMQP (RabbitMQ)
client := mq.NewAMQPClient(mq.ClientOptions{
    Host:     "localhost",
    Port:     5672,
    Username: "guest",
    Password: "guest",
    VHost:    "/",
})

// Redis Pub/Sub
client := mq.NewRedisClient(mq.ClientOptions{
    Host:     "localhost",
    Port:     6379,
    Database: 0, // номер БД (по умолчанию 0)
})

// Через фабрику
client, err := mq.NewClient(mq.ClientTypeChan, mq.ClientOptions{})

Конфигурация

ClientOptions

Параметр Тип По умолчанию Описание
Host string - Хост сервера
Port int - Порт
Username string - Пользователь
Password string - Пароль
VHost string - Виртуальный хост (AMQP)
ClientID string - ID клиента (MQTT)
QueueName string - Имя очереди
ExchangeName string - Имя exchange (AMQP)
Database int 0 Номер БД (Redis)
ChanBufferSize int 1000 Размер буфера (chan)
MaxPacketSize int 32MB Макс. размер сообщения
ReconnectMax int 5 Попыток переподключения
OnDisconnect func(error) nil Callback при разрыве

Поддерживаемые бэкенды

Бэкенд Статус Пакет
Chan Готов mq/chanclient
Redis Pub/Sub Готов mq/redis
MQTT Готов mq/mqtt
AMQP 🚧 В разработке mq/amqp

Производительность (chan-бэкенд)

Бенчмарки на Intel Celeron N5105:

Операция Время Память Аллокации
Publish 130 ns/op 56 B/op 0 allocs/op
Receive 219 ns/op 88 B/op 2 allocs/op
Publish + Receive 329 ns/op 88 B/op 2 allocs/op
Concurrent (4 workers) 372 ns/op 88 B/op 2 allocs/op

Тест с 1500 сообщениями по 1KB:

  • Пропускная способность: ~3.9 млн msg/sec (~3800 MB/sec)
  • Время чтения 1500 msg: ~0.4 мс
  • Память при публикации: +1 MB (возвращается GC после чтения)

Разные размеры сообщений:

Размер Время/op Память/op
64 B 16 KB ~320 ns/op 88 B/op

Производительность не зависит от размера сообщения (данные не копируются).

# Запуск бенчмарков
go test ./mq/chanclient/... -bench=. -benchmem

Производительность (Redis Pub/Sub-бэкенд)

Бенчмарки на Intel Celeron N5105 (с miniredis):

Операция Время Память Аллокации Примечание
Connect ~264k ns/op 131 KB/op 408
Publish ~26k ns/op 505 B/op 21
Publish (100 bytes) ~26k ns/op 697 B/op 21
Publish (1 KB) ~27k ns/op 2.6 KB/op 21
Publish (10 KB) ~33k ns/op 21 KB/op 21
Publish Concurrent ~27k ns/op 504 B/op 21 Параллельно
Receive ~29k ns/op 841 B/op 36 С miniredis
IsConnected ~25 ns/op 0 B/op 0
Disconnect ~13k ns/op 256 B/op 3

Покрытие тестами: 91.7%

Примечания:

  1. Бенчмарки с Receive теперь работают с miniredis благодаря долговременным подпискам Pub/Sub.

  2. Особенности тестирования с miniredis:

    • Unit-тесты работают полностью
    • Бенчмарки с Pub/Sub работают с miniredis
    • b.Log() нельзя вызывать из горутин (не потокобезопасен)
# Запуск всех бенчмарков Redis
go test ./mq/redis/... -bench=. -benchmem

# Запуск всех тестов
go test ./mq/redis/... -v

Производительность (MQTT-бэкенд)

Бенчмарки на Intel Celeron N5105 (с mochi-mqtt embedded server):

Операция Время Память Аллокации Примечание
Connect ~149k ns/op 81 KB/op 244
Publish ~35k ns/op 2.8 KB/op 51
Publish (100 bytes) ~35k ns/op 3.1 KB/op 51
Publish (1 KB) ~37k ns/op 6.2 KB/op 52
Publish (10 KB) ~47k ns/op 35 KB/op 52
Publish Concurrent ~13k ns/op 2.8 KB/op 52 Параллельно
Receive ~42k ns/op 7.4 KB/op 96 Работает
IsConnected ~42 ns/op 0 B/op 0
Disconnect ~149k ns/op 81 KB/op 244

Покрытие тестами: ~90%

Примечания:

  1. Бенчмарки с Receive теперь работают благодаря долговременным подпискам.

  2. Особенности тестирования с mochi-mqtt:

    • Unit-тесты работают с embedded сервером
    • Полная поддержка Pub/Sub
    • Бенчмарки Receive работают
    • ⚠️ Медленнее miniredis (реальное TCP-подключение)
    • ⚠️ Уникальный ClientID для каждого клиента
# Запуск всех бенчмарков MQTT
go test ./mq/mqtt/... -bench=. -benchmem

# Запуск всех тестов
go test ./mq/mqtt/... -v

Тестирование

# Только юнит-тесты
go test ./... -short

# Все тесты (включая интеграционные)
go test ./... -v

# С покрытием
go test ./... -coverprofile=coverage.out

Лицензия

MIT