Fix channel in function worker
This commit is contained in:
parent
8e2cba9d09
commit
1dc19f9e72
|
@ -13,6 +13,11 @@ import (
|
|||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type ChanRes struct {
|
||||
res []byte
|
||||
err error
|
||||
}
|
||||
|
||||
type funcDesc struct {
|
||||
Name string
|
||||
F func(header *network.Header, payload []byte) (result []byte, err error)
|
||||
|
@ -24,8 +29,6 @@ type funcDesc struct {
|
|||
}
|
||||
|
||||
func (fDesc *funcDesc) worker() {
|
||||
chanErr := make(chan error)
|
||||
chanResult := make(chan []byte)
|
||||
for {
|
||||
data, ok := <-fDesc.inputChan
|
||||
if !ok {
|
||||
|
@ -53,29 +56,17 @@ func (fDesc *funcDesc) worker() {
|
|||
continue
|
||||
}
|
||||
// Вызов функции
|
||||
chanRes := make(chan *ChanRes)
|
||||
go func() {
|
||||
result, err := fDesc.F(pkg.Header, data.Data)
|
||||
if err != nil {
|
||||
chanErr <- err
|
||||
} else {
|
||||
chanResult <- result
|
||||
result, err := fDesc.F(pkg.Header, pkg.Payload)
|
||||
chanRes <- &ChanRes{
|
||||
res: result,
|
||||
err: err,
|
||||
}
|
||||
}()
|
||||
//var result []byte
|
||||
select {
|
||||
case result := <-chanResult:
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.Payload = result
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
buff, err := proto.Marshal(&pkg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||||
}
|
||||
continue
|
||||
case err = <-chanErr:
|
||||
if err != nil {
|
||||
case res := <-chanRes:
|
||||
if res.err != nil {
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.ErrNo = 500
|
||||
pkg.Error = err.Error()
|
||||
|
@ -84,6 +75,16 @@ func (fDesc *funcDesc) worker() {
|
|||
}
|
||||
continue
|
||||
}
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.Payload = res.res
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
buff, err := proto.Marshal(&pkg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||||
}
|
||||
continue
|
||||
case <-time.After(fDesc.timeout):
|
||||
if err != nil {
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
|
@ -387,16 +388,8 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
|
|||
|
||||
// Синхронный вызов функции
|
||||
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, err error) {
|
||||
/*chanErr := make(chan error)
|
||||
chanResult := make(chan []byte)*/
|
||||
type ChanRes struct {
|
||||
res []byte
|
||||
err error
|
||||
}
|
||||
chanRes := make(chan *ChanRes)
|
||||
serve.CallAsync(name, header, payload, func(err error, result []byte) {
|
||||
/*chanErr <- err
|
||||
chanResult <- result*/
|
||||
chanRes <- &ChanRes{
|
||||
res: result,
|
||||
err: err,
|
||||
|
|
Loading…
Reference in New Issue
Block a user