Add control timeout call function

This commit is contained in:
ymnuk 2022-11-05 12:41:03 +03:00
parent 724bef43b7
commit b8383192ae

View File

@ -103,20 +103,26 @@ func (fDesc *funcDesc) worker() {
}
}
type CallbackFunc struct {
F func(err error, header *network.Header, result []byte)
TimeCall time.Time
}
// Сервер инстанса
type Serve struct {
NatsHost string
NatsPort string
nc *nats.Conn
hasNats bool
projectName string
moduleName string
funcs map[string]funcDesc
//timeoutCallback time.Duration
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
callbackFuncs map[string]func(err error, header *network.Header, result []byte)
NatsHost string
NatsPort string
nc *nats.Conn
hasNats bool
projectName string
moduleName string
funcs map[string]funcDesc
timeoutCallback time.Duration
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
//callbackFuncs map[string]func(err error, header *network.Header, result []byte)
callbackFuncs map[string]*CallbackFunc
callbackFuncsMutex sync.Mutex
accept map[string]map[string]bool
}
@ -154,6 +160,8 @@ func NewServe(host string, port string, projectName string, moduleName string, n
serve.accept["system"]["telegram"] = true
//serve.accept[projectName][moduleName] = true
serve.timeoutCallback = time.Second * 30
return serve
}
@ -233,7 +241,8 @@ func (serve *Serve) Run() (err error) {
if err != nil {
return
}
serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte))
//serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte))
serve.callbackFuncs = make(map[string]*CallbackFunc)
serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
serve.chanQueueCallback = make(chan *nats.Msg)
serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback)
@ -241,7 +250,26 @@ func (serve *Serve) Run() (err error) {
return
}
func (serve *Serve) cleanTimeout() {
serve.callbackFuncsMutex.Lock()
defer serve.callbackFuncsMutex.Unlock()
for k, v := range serve.callbackFuncs {
if time.Since(v.TimeCall) > serve.timeoutCallback {
delete(serve.callbackFuncs, k)
err := fmt.Errorf("call function is timed out")
go v.F(err, nil, nil)
}
}
}
func (serve *Serve) worker() {
go func() {
for {
<-time.After(serve.timeoutCallback)
serve.cleanTimeout()
}
}()
for {
if msg, ok := <-serve.chanQueueCallback; !ok {
break
@ -259,8 +287,10 @@ func (serve *Serve) worker() {
if payload.ErrNo != 0 {
err = fmt.Errorf(payload.Error)
}
f(err, payload.Header, payload.Payload)
if f != nil {
go f.F(err, payload.Header, payload.Payload)
}
//f(err, payload.Header, payload.Payload)
}
}
@ -382,9 +412,15 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
if err != nil {
panic(err)
}
callback := &CallbackFunc{
F: f,
TimeCall: time.Now(),
}
serve.callbackFuncsMutex.Lock()
defer serve.callbackFuncsMutex.Unlock()
serve.callbackFuncs[metadata.CallID] = f
serve.callbackFuncs[metadata.CallID] = callback
//serve.callbackFuncs[metadata.CallID] = f
serve.nc.Publish(name, buf)
}