Fix response header

This commit is contained in:
ymnuk 2022-11-04 17:10:59 +03:00
parent 41967c4d0a
commit c2db3ee905
2 changed files with 27 additions and 24 deletions

View File

@ -14,8 +14,9 @@ import (
)
type ChanRes struct {
res []byte
err error
res []byte
header *network.Header
err error
}
type funcDesc struct {
@ -58,6 +59,9 @@ func (fDesc *funcDesc) worker() {
// Вызов функции
chanRes := make(chan *ChanRes)
go func() {
if pkg.Header.ResponseHeaders == nil {
pkg.Header.ResponseHeaders = make(map[string]string)
}
result, err := fDesc.F(pkg.Header, pkg.Payload)
chanRes <- &ChanRes{
res: result,
@ -112,7 +116,7 @@ type Serve struct {
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
callbackFuncs map[string]func(err error, result []byte)
callbackFuncs map[string]func(err error, header *network.Header, result []byte)
callbackFuncsMutex sync.Mutex
accept map[string]map[string]bool
}
@ -229,12 +233,10 @@ func (serve *Serve) Run() (err error) {
if err != nil {
return
}
serve.callbackFuncs = make(map[string]func(err error, result []byte))
serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte))
serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
serve.chanQueueCallback = make(chan *nats.Msg)
serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback)
//serve.subj, err = serve.nc.ChanSubscribe(serve.queueCallback, serve.chanQueueCallback)
//serve.subj.SetPendingLimits(1024*500, 1024*5000)
go serve.worker()
return
}
@ -257,7 +259,7 @@ func (serve *Serve) worker() {
if payload.ErrNo != 0 {
err = fmt.Errorf(payload.Error)
}
f(err, payload.Payload)
f(err, payload.Header, payload.Payload)
}
@ -305,7 +307,6 @@ func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Du
}
fDesc.inputChan = make(chan *nats.Msg)
fDesc.subj, err = serve.nc.ChanQueueSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), serve.projectName+"."+serve.moduleName, fDesc.inputChan)
//fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), fDesc.inputChan)
fDesc.conn = serve.nc
fDesc.timeout = timeout
fDesc.acceptFunc = serve.acceptVerify
@ -360,7 +361,7 @@ func (serve *Serve) acceptVerify(accept string) bool {
}
// Асинхронный вызов функции
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, result []byte)) {
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, header *network.Header, result []byte)) {
metadata := &network.Metadata{
PackageType: network.Metadata_TYPE_REQUEST,
FuncName: name,
@ -390,10 +391,11 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
// Синхронный вызов функции
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, err error) {
chanRes := make(chan *ChanRes)
serve.CallAsync(name, header, payload, func(err error, result []byte) {
serve.CallAsync(name, header, payload, func(err error, header *network.Header, result []byte) {
chanRes <- &ChanRes{
res: result,
err: err,
res: result,
header: header,
err: err,
}
})
res := <-chanRes

View File

@ -32,7 +32,6 @@ func setup() {
}
func shutdown() {
// TODO
serve.Shutdown()
}
@ -55,28 +54,25 @@ func TestRegisterFunction(t *testing.T) {
}
func TestCallFunctionAsync(t *testing.T) {
//var err error
err := serve.RegisterFunction("testFuncSimpleAsync", func(header *network.Header, paylod []byte) (result []byte, err error) {
header.ResponseHeaders["test"] = "my test"
return []byte("world"), nil
})
if err != nil {
t.Error("Has error: ", err)
return
}
//var result []byte
/*chanErr := make(chan error)
chanResult := make(chan []byte)*/
type ChanRes struct {
res []byte
err error
res []byte
header *network.Header
err error
}
chanRes := make(chan ChanRes)
serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, result []byte) {
/*chanErr <- err
chanResult <- result*/
serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, header *network.Header, result []byte) {
chanRes <- ChanRes{
res: result,
err: err,
res: result,
header: header,
err: err,
}
})
res := <-chanRes
@ -88,6 +84,11 @@ func TestCallFunctionAsync(t *testing.T) {
t.Error("Function not returned \"world\"")
return
}
if v, ok := res.header.ResponseHeaders["test"]; !ok {
t.Errorf("not set response header")
} else if v != "my test" {
t.Errorf("got \"my test\", but expect \"%s\"", v)
}
}
func TestCallFunction(t *testing.T) {