Fix response

This commit is contained in:
Александр Федорюк 2022-10-28 13:46:43 +03:00
parent 5d23b322ec
commit a43583865d
3 changed files with 76 additions and 34 deletions

View File

@ -6,8 +6,8 @@ services:
- "4222:4222"
- "8222:8222"
#command: "--http_port 8222 "
networks: ["nats"]
networks: [ "nats" ]
networks:
nats:
name: nats
name: nats

View File

@ -198,10 +198,25 @@ func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool {
return true
}
func (serve *Serve) natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
fmt.Printf("error: %v\n", natsErr)
if natsErr == nats.ErrSlowConsumer {
pendingMsgs, _, err := sub.Pending()
if err != nil {
fmt.Printf("couldn't get pending messages: %v", err)
return
}
fmt.Printf("Falling behind with %d pending messages on subject %q.\n",
pendingMsgs, sub.Subject)
// Log error, notify operations...
}
// check for other errors
}
// Запуск текущего инстанса сервера
func (serve *Serve) Run() (err error) {
if serve.nc == nil {
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort))
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort), nats.ErrorHandler(serve.natsErrHandler))
if err != nil {
return
}
@ -214,8 +229,10 @@ func (serve *Serve) Run() (err error) {
}
serve.callbackFuncs = make(map[string]func(err error, result []byte))
serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
fmt.Println(serve.queueCallback)
serve.chanQueueCallback = make(chan *nats.Msg)
serve.subj, err = serve.nc.ChanSubscribe(serve.queueCallback, serve.chanQueueCallback)
//serve.subj.SetPendingLimits(1024*500, 1024*5000)
go serve.worker()
return
}
@ -232,9 +249,9 @@ func (serve *Serve) worker() {
panic(err)
}
if f, ok := serve.callbackFuncs[payload.Metadata.CallID]; ok {
serve.callbackFuncsMutex.Lock()
//serve.callbackFuncsMutex.Lock()
delete(serve.callbackFuncs, payload.Metadata.CallID)
serve.callbackFuncsMutex.Unlock()
//serve.callbackFuncsMutex.Unlock()
if payload.ErrNo != 0 {
err = fmt.Errorf(payload.Error)
}
@ -361,22 +378,32 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
if err != nil {
panic(err)
}
serve.callbackFuncsMutex.Lock()
//serve.callbackFuncsMutex.Lock()
serve.callbackFuncs[metadata.CallID] = f
serve.callbackFuncsMutex.Unlock()
//defer serve.callbackFuncsMutex.Unlock()
serve.nc.Publish(name, buf)
}
// Синхронный вызов функции
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, err error) {
chanErr := make(chan error)
chanResult := make(chan []byte)
/*chanErr := make(chan error)
chanResult := make(chan []byte)*/
type ChanRes struct {
res []byte
err error
}
chanRes := make(chan ChanRes)
serve.CallAsync(name, header, []byte("Hello"), func(err error, result []byte) {
chanErr <- err
chanResult <- result
/*chanErr <- err
chanResult <- result*/
chanRes <- ChanRes{
res: result,
err: err,
}
})
err = <-chanErr
result = <-chanResult
res := <-chanRes
err = res.err
result = res.res
return
}

View File

@ -11,15 +11,24 @@ import (
var serve *Serve
func setup() {
host, _ := os.LookupEnv("NATS_HOST")
port, _ := os.LookupEnv("NATS_PORT")
host, exists := os.LookupEnv("NATS_HOST")
if !exists {
host = "localhost"
}
port, exists := os.LookupEnv("NATS_PORT")
if !exists {
host = "4222"
}
serve = NewServe(host, port, "", "test", nil)
if serve == nil {
panic(fmt.Errorf(`serve is not created`))
}
serve.AddAccept("*")
serve.Run()
err := serve.Run()
if err != nil {
panic(err)
}
}
func shutdown() {
@ -48,32 +57,38 @@ func TestRegisterFunction(t *testing.T) {
}
func TestCallFunctionAsync(t *testing.T) {
var err error
err = serve.RegisterFunction("testFuncSimpleAsync", func(header *network.Header, paylod []byte) (result []byte, err error) {
//var err error
err := serve.RegisterFunction("testFuncSimpleAsync", func(header *network.Header, paylod []byte) (result []byte, err error) {
return []byte("world"), nil
})
if err != nil {
t.Error("Has error: ", err)
return
}
var result []byte
chanErr := make(chan error)
chanResult := make(chan []byte)
//var result []byte
/*chanErr := make(chan error)
chanResult := make(chan []byte)*/
type ChanRes struct {
res []byte
err error
}
chanRes := make(chan ChanRes)
serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, result []byte) {
chanErr <- err
chanResult <- result
/*chanErr <- err
chanResult <- result*/
chanRes <- ChanRes{
res: result,
err: err,
}
})
select {
case err = <-chanErr:
if err != nil {
t.Error(err)
return
}
case result = <-chanResult:
if string(result) != "world" {
t.Error("Function not returned \"world\"")
return
}
res := <-chanRes
if res.err != nil {
t.Error(err)
return
}
if string(res.res) != "world" {
t.Error("Function not returned \"world\"")
return
}
}