go-mq-rpc
RPC-фреймворк поверх Message Queue на базе go-mq-agregate.
Быстрый старт
go get gti.ymnuktech.ru/ymnuk/go-mq-rpc
package main
import (
"context"
"log"
"time"
mqaggr "git.ymnuktech.ru/ymnuk/go-mq-agregate"
jsoncodec "gti.ymnuktech.ru/ymnuk/go-mq-rpc/codec/json"
"gti.ymnuktech.ru/ymnuk/go-mq-rpc/rpc"
mqtransport "gti.ymnuktech.ru/ymnuk/go-mq-rpc/transport/mq"
)
func main() {
ctx := context.Background()
// Транспорт — один Chan для клиента и сервера (monolith pattern)
chanClient := mqaggr.NewChanClient(mqaggr.ClientOptions{
ChanBufferSize: 1000,
})
tr, _ := mqtransport.NewMQTransport(&mqtransport.Config{
RequestQueue: "rpc_queue",
Client: chanClient,
})
defer tr.Close()
// Сервер
srv, _ := rpc.NewServer(&rpc.ServerConfig{
Transport: tr,
Codec: &jsoncodec.Codec{},
RequestQueue: "rpc_queue",
RecoverPanic: true,
})
srv.Register("Math.Add", func(ctx context.Context, body []byte) ([]byte, error) {
var args struct{ A, B int }
jsoncodec.Unmarshal(body, &args)
return jsoncodec.Marshal(struct{ Sum int }{Sum: args.A + args.B})
})
go func() { _ = srv.Serve(ctx) }()
// Клиент
cli, _ := rpc.NewClient(&rpc.ClientConfig{
Transport: tr,
Codec: &jsoncodec.Codec{},
RequestQueue: "rpc_queue",
Timeout: 3 * time.Second,
})
defer cli.Close()
var result struct{ Sum int }
cli.Call(ctx, "Math.Add", struct{ A, B int }{A: 3, B: 4}, &result)
// result.Sum == 7
}
Особенности
| Возможность |
Описание |
| Shared Transport |
Один MQTransport обслуживает и клиент, и сервер |
| Metrics |
rpc.MetricsCollector — хуки без внешних зависимостей |
| Middleware |
rpc.Middleware — цепочка обёрток вокруг хендлеров |
| Panic Recovery |
Паники в хендлерах и middleware перехватываются |
| Timeouts |
Call() с таймаутом по умолчанию, CallWithTimeout() для override |
| Body Encoding |
Base85 (~20% overhead) или Base64 |
| Correlation ID |
UUID v7 (RFC 9562, сортируемый) |
| Backends |
Chan (in-memory), Redis, MQTT — через go-mq-agregate |
Структура
go-mq-rpc/
├── rpc/ # RPC-ядро: Client, Server, интерфейсы
│ ├── client.go # RPC-клиент с Call(), CallWithTimeout()
│ ├── server.go # RPC-сервер с Register(), Serve(), Stop()
│ ├── interfaces.go # Codec, Transport, HandlerFunc
│ ├── error.go # Типы ошибок RPC
│ ├── message.go # Message, Header
│ ├── metrics_test.go # Тесты MetricsCollector
│ ├── middleware_test.go # Тесты Middleware
│ ├── integration_test.go # Интеграционные тесты
│ ├── e2e_chan_test.go # E2E тесты
│ └── example_test.go # Примеры как тесты
├── codec/json/ # JSON-кодек
├── transport/mq/ # MQ-транспорт (go-mq-agregate)
├── internal/ # Внутренние утилиты
│ ├── bodyenc/ # Base85/Base64 кодирование
│ ├── correlation/ # UUID v7 генерация
│ └── callback/ # Callback queue имена
├── examples/
│ ├── basic/ # Базовое использование
│ ├── middleware/ # Middleware + MetricsCollector
│ ├── fibonacci/ # Вычисление чисел Фибоначчи + сравнение с локальным
│ └── mandelbrot/ # Множество Мандельброта (800x600) + PNG + бенчмарк
└── docs/ # Подробная документация
Статус
| Пакет |
Покрытие |
codec/json |
100.0% |
transport/mq |
88.5% |
rpc |
86.6% |
bodyenc |
83.8% |
callback |
80.0% |
correlation |
62.5% |
| Total |
85.8% |
Бенчмарки
BenchmarkClient_Call-4 10690 100µs/op 125KB/op 1753 allocs/op
BenchmarkMiddleware_Overhead-4 10471 137µs/op 175KB/op 2446 allocs/op
Документация
Запуск тестов
go test ./... # все тесты
go test -cover ./... # с покрытием
go test -bench=. -benchmem ./rpc/... # бенчмарки
Зависимости