This commit is contained in:
Александр Федорюк 2022-11-22 09:19:42 +03:00
commit 9035160757
2 changed files with 107 additions and 54 deletions

View File

@ -14,8 +14,9 @@ import (
)
type ChanRes struct {
res []byte
err error
res []byte
header *network.Header
err error
}
type funcDesc struct {
@ -35,9 +36,9 @@ func (fDesc *funcDesc) worker() {
break
}
var err error
var pkg network.Function
pkg := &network.Function{}
err = proto.Unmarshal(data.Data, &pkg)
err = proto.Unmarshal(data.Data, pkg)
if err != nil {
panic(err)
}
@ -47,7 +48,7 @@ func (fDesc *funcDesc) worker() {
pkg.ErrNo = 403
pkg.Error = "forbidden"
pkg.Metadata.CallResponseID = uuid.NewV4().String()
buff, err := proto.Marshal(&pkg)
buff, err := proto.Marshal(pkg)
if err != nil {
panic(err)
}
@ -60,6 +61,9 @@ func (fDesc *funcDesc) worker() {
// Вызов функции
chanRes := make(chan *ChanRes)
go func() {
if pkg.Header.ResponseHeaders == nil {
pkg.Header.ResponseHeaders = make(map[string]string)
}
result, err := fDesc.F(pkg.Header, pkg.Payload)
chanRes <- &ChanRes{
res: result,
@ -80,7 +84,7 @@ func (fDesc *funcDesc) worker() {
if pkg.Metadata.QueueCallback != "" {
pkg.Payload = res.res
pkg.Metadata.CallResponseID = uuid.NewV4().String()
buff, err := proto.Marshal(&pkg)
buff, err := proto.Marshal(pkg)
if err != nil {
//panic(err)
continue
@ -104,26 +108,33 @@ func (fDesc *funcDesc) worker() {
}
}
type CallbackFunc struct {
F func(err error, header *network.Header, result []byte)
TimeCall time.Time
}
// Сервер инстанса
type Serve struct {
NatsHost string
NatsPort string
nc *nats.Conn
hasNats bool
projectName string
moduleName string
funcs map[string]funcDesc
//timeoutCallback time.Duration
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
callbackFuncs map[string]func(err error, result []byte)
//NatsHost string
//NatsPort string
NatsAddr string
nc *nats.Conn
hasNats bool
projectName string
moduleName string
funcs map[string]funcDesc
timeoutCallback time.Duration
queueCallback string
chanQueueCallback chan *nats.Msg
subj *nats.Subscription
//callbackFuncs map[string]func(err error, header *network.Header, result []byte)
callbackFuncs map[string]*CallbackFunc
callbackFuncsMutex sync.Mutex
accept map[string]map[string]bool
}
// Создание нового инстанса сервера
func NewServe(host string, port string, projectName string, moduleName string, natsServ *nats.Conn) *Serve {
func NewServe(addr string, projectName string, moduleName string, natsServ *nats.Conn) *Serve {
if strings.Trim(moduleName, " ") == "" {
moduleName = "default"
} else {
@ -140,8 +151,9 @@ func NewServe(host string, port string, projectName string, moduleName string, n
if natsServ != nil {
serve.nc = natsServ
} else {
serve.NatsHost = host
serve.NatsPort = port
/*serve.NatsHost = host
serve.NatsPort = port*/
serve.NatsAddr = addr
}
serve.projectName = projectName
serve.moduleName = moduleName
@ -155,6 +167,8 @@ func NewServe(host string, port string, projectName string, moduleName string, n
serve.accept["system"]["telegram"] = true
//serve.accept[projectName][moduleName] = true
serve.timeoutCallback = time.Second * 30
return serve
}
@ -222,7 +236,7 @@ func (serve *Serve) natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsEr
// Запуск текущего инстанса сервера
func (serve *Serve) Run() (err error) {
if serve.nc == nil {
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort), nats.ErrorHandler(serve.natsErrHandler))
serve.nc, err = nats.Connect(serve.NatsAddr, nats.ErrorHandler(serve.natsErrHandler))
if err != nil {
return
}
@ -234,17 +248,35 @@ func (serve *Serve) Run() (err error) {
if err != nil {
return
}
serve.callbackFuncs = make(map[string]func(err error, result []byte))
//serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte))
serve.callbackFuncs = make(map[string]*CallbackFunc)
serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
fmt.Println(serve.queueCallback)
serve.chanQueueCallback = make(chan *nats.Msg)
serve.subj, err = serve.nc.ChanSubscribe(serve.queueCallback, serve.chanQueueCallback)
//serve.subj.SetPendingLimits(1024*500, 1024*5000)
serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback)
go serve.worker()
return
}
func (serve *Serve) cleanTimeout() {
serve.callbackFuncsMutex.Lock()
defer serve.callbackFuncsMutex.Unlock()
for k, v := range serve.callbackFuncs {
if time.Since(v.TimeCall) > serve.timeoutCallback {
delete(serve.callbackFuncs, k)
err := fmt.Errorf("call function is timed out")
go v.F(err, nil, nil)
}
}
}
func (serve *Serve) worker() {
go func() {
for {
<-time.After(serve.timeoutCallback)
serve.cleanTimeout()
}
}()
for {
if msg, ok := <-serve.chanQueueCallback; !ok {
break
@ -263,8 +295,11 @@ func (serve *Serve) worker() {
if payload.ErrNo != 0 {
err = fmt.Errorf(payload.Error)
}
f(err, payload.Payload)
if f != nil {
//go f.F(err, payload.Header, payload.Payload)
f.F(err, payload.Header, payload.Payload)
}
//f(err, payload.Header, payload.Payload)
}
}
@ -310,7 +345,7 @@ func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Du
F: f,
}
fDesc.inputChan = make(chan *nats.Msg)
fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), fDesc.inputChan)
fDesc.subj, err = serve.nc.ChanQueueSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), serve.projectName+"."+serve.moduleName, fDesc.inputChan)
fDesc.conn = serve.nc
fDesc.timeout = timeout
fDesc.acceptFunc = serve.acceptVerify
@ -365,7 +400,7 @@ func (serve *Serve) acceptVerify(accept string) bool {
}
// Асинхронный вызов функции
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, result []byte)) {
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, header *network.Header, result []byte)) {
metadata := &network.Metadata{
PackageType: network.Metadata_TYPE_REQUEST,
FuncName: name,
@ -386,23 +421,31 @@ func (serve *Serve) CallAsync(name string, header *network.Header, payload []byt
if err != nil {
panic(err)
}
callback := &CallbackFunc{
F: f,
TimeCall: time.Now(),
}
serve.callbackFuncsMutex.Lock()
defer serve.callbackFuncsMutex.Unlock()
serve.callbackFuncs[metadata.CallID] = f
serve.callbackFuncs[metadata.CallID] = callback
//serve.callbackFuncs[metadata.CallID] = f
serve.nc.Publish(name, buf)
}
// Синхронный вызов функции
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, err error) {
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, head *network.Header, err error) {
chanRes := make(chan *ChanRes)
serve.CallAsync(name, header, payload, func(err error, result []byte) {
serve.CallAsync(name, header, payload, func(err error, header *network.Header, result []byte) {
chanRes <- &ChanRes{
res: result,
err: err,
res: result,
header: header,
err: err,
}
})
res := <-chanRes
err = res.err
head = res.header
result = res.res
return

View File

@ -11,16 +11,20 @@ import (
var serve *Serve
func setup() {
host, exists := os.LookupEnv("NATS_HOST")
/*host, exists := os.LookupEnv("NATS_HOST")
if !exists {
host = "localhost"
}*/
addr, exists := os.LookupEnv("NATS_ADDR")
if !exists {
addr = "nats://localhost:4222"
}
port, exists := os.LookupEnv("NATS_PORT")
/*port, exists := os.LookupEnv("NATS_PORT")
if !exists {
port = "4222"
}
}*/
serve = NewServe(host, port, "", "test", nil)
serve = NewServe(addr, "", "test", nil)
if serve == nil {
panic(fmt.Errorf(`serve is not created`))
}
@ -32,12 +36,10 @@ func setup() {
}
func shutdown() {
// TODO
serve.Shutdown()
}
func TestMain(m *testing.M) {
fmt.Println("Main")
setup()
code := m.Run()
shutdown()
@ -46,7 +48,6 @@ func TestMain(m *testing.M) {
func TestRegisterFunction(t *testing.T) {
err := serve.RegisterFunction("testFunc", func(header *network.Header, paylod []byte) (result []byte, err error) {
fmt.Println(`Hello world`)
return []byte("Hello world"), nil
})
if err != nil {
@ -57,28 +58,25 @@ func TestRegisterFunction(t *testing.T) {
}
func TestCallFunctionAsync(t *testing.T) {
//var err error
err := serve.RegisterFunction("testFuncSimpleAsync", func(header *network.Header, paylod []byte) (result []byte, err error) {
header.ResponseHeaders["test"] = "my test"
return []byte("world"), nil
})
if err != nil {
t.Error("Has error: ", err)
return
}
//var result []byte
/*chanErr := make(chan error)
chanResult := make(chan []byte)*/
type ChanRes struct {
res []byte
err error
res []byte
header *network.Header
err error
}
chanRes := make(chan ChanRes)
serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, result []byte) {
/*chanErr <- err
chanResult <- result*/
serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, header *network.Header, result []byte) {
chanRes <- ChanRes{
res: result,
err: err,
res: result,
header: header,
err: err,
}
})
res := <-chanRes
@ -90,11 +88,17 @@ func TestCallFunctionAsync(t *testing.T) {
t.Error("Function not returned \"world\"")
return
}
if v, ok := res.header.ResponseHeaders["test"]; !ok {
t.Errorf("not set response header")
} else if v != "my test" {
t.Errorf("got \"my test\", but expect \"%s\"", v)
}
}
func TestCallFunction(t *testing.T) {
var err error
err = serve.RegisterFunction("testFuncSimple", func(header *network.Header, paylod []byte) (result []byte, err error) {
header.ResponseHeaders["test"] = "my test"
return []byte("world"), nil
})
if err != nil {
@ -102,7 +106,8 @@ func TestCallFunction(t *testing.T) {
return
}
var result []byte
result, err = serve.Call("default.test.testFuncSimple", nil, []byte("Hello"))
var header *network.Header
result, header, err = serve.Call("default.test.testFuncSimple", nil, []byte("Hello"))
if err != nil {
t.Error(err)
return
@ -111,4 +116,9 @@ func TestCallFunction(t *testing.T) {
t.Error("Function not returned \"world\"")
return
}
if v, ok := header.ResponseHeaders["test"]; !ok {
t.Errorf("not set response header")
} else if v != "my test" {
t.Errorf("got \"my test\", but expect \"%s\"", v)
}
}