Add mutex for sync

This commit is contained in:
Александр Федорюк 2022-10-28 11:19:03 +03:00
parent 912062d11b
commit 5d23b322ec

View File

@ -4,6 +4,7 @@ import (
"crypto/rand"
"fmt"
"strings"
"sync"
"time"
"git.ymnuktech.ru/ymnuk/yt-function-sdk-go.git/network"
@ -97,6 +98,7 @@ func (fDesc *funcDesc) worker() {
}
}
// Сервер инстанса
type Serve struct {
NatsHost string
NatsPort string
@ -106,13 +108,15 @@ type Serve struct {
moduleName string
funcs map[string]funcDesc
//timeoutCallback time.Duration
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
callbackFuncs map[string]func(err error, result []byte)
accept map[string]map[string]bool
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
callbackFuncs map[string]func(err error, result []byte)
callbackFuncsMutex sync.Mutex
accept map[string]map[string]bool
}
// Создание нового инстанса сервера
func NewServe(host string, port string, projectName string, moduleName string, natsServ *nats.Conn) *Serve {
if strings.Trim(moduleName, " ") == "" {
moduleName = "default"
@ -148,6 +152,7 @@ func NewServe(host string, port string, projectName string, moduleName string, n
return serve
}
// Установка разрешающих правил выполнения функций
func (serve *Serve) AddAccept(ruleName string) bool {
return serve.AddAcceptBool(ruleName, true)
}
@ -193,6 +198,7 @@ func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool {
return true
}
// Запуск текущего инстанса сервера
func (serve *Serve) Run() (err error) {
if serve.nc == nil {
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort))
@ -226,16 +232,21 @@ func (serve *Serve) worker() {
panic(err)
}
if f, ok := serve.callbackFuncs[payload.Metadata.CallID]; ok {
serve.callbackFuncsMutex.Lock()
delete(serve.callbackFuncs, payload.Metadata.CallID)
serve.callbackFuncsMutex.Unlock()
if payload.ErrNo != 0 {
err = fmt.Errorf(payload.Error)
}
f(err, payload.Payload)
delete(serve.callbackFuncs, payload.Metadata.CallID)
}
}
}
}
// Завершение текущего инстанса сервера
func (serve *Serve) Shutdown() {
for _, value := range serve.funcs {
value.subj.Unsubscribe()
@ -252,10 +263,12 @@ func (serve *Serve) Shutdown() {
}
// Регистрация функции в текущем инстансе
func (serve *Serve) RegisterFunction(funcName string, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
return serve.RegisterFunctionWithTimeout(funcName, time.Second*10, f)
}
// Регистрация функции с установленным таймаутом в текущем инстансе
func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Duration, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok {
return fmt.Errorf(`restrict duplicate function name`)
@ -282,6 +295,7 @@ func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Du
return
}
// Проверка есть ли такая функция в текущем инстансе
func (serve *Serve) HasFunction(name string) bool {
_, ok := serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, name)]
return ok
@ -325,6 +339,7 @@ func (serve *Serve) acceptVerify(accept string) bool {
}
}
// Асинхронный вызов функции
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, result []byte)) {
metadata := &network.Metadata{
PackageType: network.Metadata_TYPE_REQUEST,
@ -346,10 +361,13 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
if err != nil {
panic(err)
}
serve.callbackFuncsMutex.Lock()
serve.callbackFuncs[metadata.CallID] = f
serve.callbackFuncsMutex.Unlock()
serve.nc.Publish(name, buf)
}
// Синхронный вызов функции
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, err error) {
chanErr := make(chan error)
chanResult := make(chan []byte)