Add verify accept
This commit is contained in:
parent
b67132ce16
commit
68a03466c4
2
protobuf
2
protobuf
|
@ -1 +1 @@
|
|||
Subproject commit 7bdc3de09aae8a5b4c258c369e57c97121e9cbca
|
||||
Subproject commit 69dd6e26aabb1d0cc170d57e3bc2ff99082ffed9
|
162
ytfunction.go
162
ytfunction.go
|
@ -13,12 +13,13 @@ import (
|
|||
)
|
||||
|
||||
type funcDesc struct {
|
||||
Name string
|
||||
F func(header *network.Header, payload []byte) (result []byte, err error)
|
||||
inputChan chan *nats.Msg
|
||||
subj *nats.Subscription
|
||||
conn *nats.Conn
|
||||
timeout time.Duration
|
||||
Name string
|
||||
F func(header *network.Header, payload []byte) (result []byte, err error)
|
||||
inputChan chan *nats.Msg
|
||||
subj *nats.Subscription
|
||||
conn *nats.Conn
|
||||
timeout time.Duration
|
||||
acceptFunc func(access string) bool
|
||||
}
|
||||
|
||||
func (fDesc *funcDesc) worker() {
|
||||
|
@ -36,6 +37,21 @@ func (fDesc *funcDesc) worker() {
|
|||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Проверка может ли вызывающая сторона вызвать функцию
|
||||
if !fDesc.acceptFunc(pkg.Metadata.CallFrom) {
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.ErrNo = 403
|
||||
pkg.Error = "forbidden"
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
buff, err := proto.Marshal(&pkg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||||
}
|
||||
continue
|
||||
}
|
||||
// Вызов функции
|
||||
go func() {
|
||||
result, err := fDesc.F(pkg.Header, data.Data)
|
||||
if err != nil {
|
||||
|
@ -47,28 +63,34 @@ func (fDesc *funcDesc) worker() {
|
|||
//var result []byte
|
||||
select {
|
||||
case result := <-chanResult:
|
||||
pkg.Payload = result
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
buff, err := proto.Marshal(&pkg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.Payload = result
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
buff, err := proto.Marshal(&pkg)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||||
}
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||||
continue
|
||||
case err = <-chanErr:
|
||||
if err != nil {
|
||||
pkg.ErrNo = 500
|
||||
pkg.Error = err.Error()
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte(err.Error()))
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.ErrNo = 500
|
||||
pkg.Error = err.Error()
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte(err.Error()))
|
||||
}
|
||||
continue
|
||||
}
|
||||
case <-time.After(fDesc.timeout):
|
||||
if err != nil {
|
||||
pkg.ErrNo = 408
|
||||
pkg.Error = err.Error()
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte("function call timeout"))
|
||||
if pkg.Metadata.QueueCallback != "" {
|
||||
pkg.ErrNo = 408
|
||||
pkg.Error = err.Error()
|
||||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte("function call timeout"))
|
||||
}
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
@ -87,6 +109,7 @@ type Serve struct {
|
|||
chanQueueCallback chan *nats.Msg
|
||||
subj *nats.Subscription
|
||||
callbackFuncs map[string]func(err error, result []byte)
|
||||
accept map[string]map[string]bool
|
||||
}
|
||||
|
||||
func NewServe(host string, port string, projectName string, moduleName string) *Serve {
|
||||
|
@ -109,9 +132,62 @@ func NewServe(host string, port string, projectName string, moduleName string) *
|
|||
serve.moduleName = moduleName
|
||||
serve.funcs = make(map[string]funcDesc)
|
||||
|
||||
serve.accept = make(map[string]map[string]bool)
|
||||
serve.accept["system"] = make(map[string]bool)
|
||||
serve.accept["system"]["gateway"] = true
|
||||
serve.accept["system"]["security"] = true
|
||||
serve.accept["system"]["settings"] = true
|
||||
serve.accept["system"]["telegram"] = true
|
||||
//serve.accept[projectName][moduleName] = true
|
||||
|
||||
return serve
|
||||
}
|
||||
|
||||
func (serve *Serve) AddAccept(ruleName string) bool {
|
||||
return serve.AddAcceptBool(ruleName, true)
|
||||
}
|
||||
|
||||
// Установка разрешающих правил выполнения функций
|
||||
func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool {
|
||||
ruleName = strings.Trim(ruleName, " ")
|
||||
if ruleName == "" {
|
||||
return false
|
||||
}
|
||||
if ruleName == "*" {
|
||||
serve.accept = make(map[string]map[string]bool)
|
||||
serve.accept["*"] = make(map[string]bool)
|
||||
serve.accept["*"]["*"] = allow
|
||||
} else {
|
||||
rules := strings.Split(ruleName, ".")
|
||||
for i := range rules {
|
||||
rules[i] = strings.Trim(rules[i], " ")
|
||||
}
|
||||
switch len(rules) {
|
||||
case 0:
|
||||
return false
|
||||
case 1:
|
||||
serve.accept[rules[0]] = make(map[string]bool)
|
||||
serve.accept[rules[0]]["*"] = allow
|
||||
case 2:
|
||||
if rules[0] == "*" || rules[0] == "" {
|
||||
return false
|
||||
}
|
||||
if rules[1] == "" {
|
||||
return false
|
||||
}
|
||||
if _, ok := serve.accept[rules[0]]; !ok {
|
||||
serve.accept[rules[0]] = make(map[string]bool)
|
||||
}
|
||||
if rules[1] == "*" {
|
||||
serve.accept[rules[0]]["*"] = allow
|
||||
} else {
|
||||
serve.accept[rules[0]][rules[1]] = allow
|
||||
}
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (serve *Serve) Run() (err error) {
|
||||
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort))
|
||||
if err != nil {
|
||||
|
@ -185,6 +261,7 @@ func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Du
|
|||
fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), fDesc.inputChan)
|
||||
fDesc.conn = serve.nc
|
||||
fDesc.timeout = timeout
|
||||
fDesc.acceptFunc = serve.acceptVerify
|
||||
go fDesc.worker()
|
||||
|
||||
serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName)] = fDesc
|
||||
|
@ -196,12 +273,51 @@ func (serve *Serve) HasFunction(name string) bool {
|
|||
return ok
|
||||
}
|
||||
|
||||
// Проверка возможности выполнения функции на основе установленных вызывающих сторон
|
||||
func (serve *Serve) acceptVerify(accept string) bool {
|
||||
|
||||
if _, ok := serve.accept["*"]; ok {
|
||||
return true
|
||||
} else {
|
||||
accept = strings.Trim(accept, " ")
|
||||
if accept == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
rules := strings.Split(accept, ".")
|
||||
for i := range rules {
|
||||
rules[i] = strings.Trim(rules[i], " ")
|
||||
}
|
||||
if len(rules) == 0 {
|
||||
return false
|
||||
}
|
||||
if val, ok := serve.accept[rules[0]]; !ok {
|
||||
return false
|
||||
} else {
|
||||
if _, ok := val["*"]; ok {
|
||||
return true
|
||||
} else {
|
||||
if len(rules) == 1 {
|
||||
return true
|
||||
}
|
||||
if v, ok := val[rules[1]]; !ok {
|
||||
return false
|
||||
} else {
|
||||
return v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, result []byte)) {
|
||||
metadata := &network.Metadata{
|
||||
PackageType: network.Metadata_TYPE_REQUEST,
|
||||
FuncName: name,
|
||||
QueueCallback: serve.queueCallback,
|
||||
CallID: uuid.NewV4().String(),
|
||||
CallFrom: fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName),
|
||||
}
|
||||
pkg := network.Function{
|
||||
Header: header,
|
||||
|
@ -223,13 +339,7 @@ func (serve *Serve) Call(name string, header *network.Header, payload []byte) (r
|
|||
chanErr <- err
|
||||
chanResult <- result
|
||||
})
|
||||
/*select {
|
||||
case err = <-chanErr:
|
||||
}*/
|
||||
err = <-chanErr
|
||||
/*select {
|
||||
case result = <-chanResult:
|
||||
}*/
|
||||
result = <-chanResult
|
||||
|
||||
return
|
||||
|
|
|
@ -17,6 +17,7 @@ func setup() {
|
|||
if serve == nil {
|
||||
panic(fmt.Errorf(`serve is not created`))
|
||||
}
|
||||
serve.AddAccept("*")
|
||||
serve.Run()
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user