549 lines
15 KiB
Go
549 lines
15 KiB
Go
package ytfunction
|
||
|
||
import (
|
||
"crypto/rand"
|
||
"fmt"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"git.ymnuktech.ru/ymnuk/yt-function-sdk-go/network"
|
||
ytlogger "git.ymnuktech.ru/ymnuk/yt-logger-go"
|
||
"git.ymnuktech.ru/ymnuk/yt-logger-go/log"
|
||
"github.com/nats-io/nats.go"
|
||
uuid "github.com/satori/go.uuid"
|
||
"google.golang.org/protobuf/proto"
|
||
)
|
||
|
||
type ChanRes struct {
|
||
res []byte
|
||
header *network.Header
|
||
err error
|
||
}
|
||
|
||
type funcDesc struct {
|
||
Name string
|
||
F func(header *network.Header, payload []byte) (result []byte, err error)
|
||
inputChan chan *nats.Msg
|
||
subj *nats.Subscription
|
||
conn *nats.Conn
|
||
timeout time.Duration
|
||
acceptFunc func(access string) bool
|
||
logger *ytlogger.YTLogger
|
||
}
|
||
|
||
func (fDesc *funcDesc) worker() {
|
||
for {
|
||
data, ok := <-fDesc.inputChan
|
||
if !ok {
|
||
break
|
||
}
|
||
var err error
|
||
pkg := &network.Function{}
|
||
|
||
err = proto.Unmarshal(data.Data, pkg)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
timeStart := time.Now()
|
||
// Проверка может ли вызывающая сторона вызвать функцию
|
||
if !fDesc.acceptFunc(pkg.Metadata.CallFrom) {
|
||
if pkg.Metadata.QueueCallback != "" {
|
||
pkg.ErrNo = 403
|
||
pkg.Error = "forbidden"
|
||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||
buff, err := proto.Marshal(pkg)
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
if fDesc.logger != nil {
|
||
fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil)
|
||
}
|
||
if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil {
|
||
continue
|
||
}
|
||
} else {
|
||
if fDesc.logger != nil {
|
||
fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil)
|
||
}
|
||
}
|
||
continue
|
||
}
|
||
// Вызов функции
|
||
chanRes := make(chan *ChanRes)
|
||
go func() {
|
||
if pkg.Header.ResponseHeaders == nil {
|
||
pkg.Header.ResponseHeaders = make(map[string]string)
|
||
}
|
||
result, err := fDesc.F(pkg.Header, pkg.Payload)
|
||
chanRes <- &ChanRes{
|
||
res: result,
|
||
err: err,
|
||
}
|
||
}()
|
||
select {
|
||
case res := <-chanRes:
|
||
if res.err != nil {
|
||
if pkg.Metadata.QueueCallback != "" {
|
||
pkg.ErrNo = 500
|
||
//if err != nil {
|
||
pkg.Error = res.err.Error()
|
||
/*} else {
|
||
pkg.Error = ""
|
||
}*/
|
||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||
if fDesc.logger != nil {
|
||
fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil)
|
||
}
|
||
buff, err := proto.Marshal(pkg)
|
||
if err != nil {
|
||
//panic(err)
|
||
continue
|
||
}
|
||
//fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte(pkg.Error))
|
||
fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff)
|
||
}
|
||
continue
|
||
}
|
||
if pkg.Metadata.QueueCallback != "" {
|
||
pkg.Payload = res.res
|
||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||
if fDesc.logger != nil {
|
||
fDesc.logger.Info(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, &[]int64{int64(len(pkg.Payload))}[0], nil)
|
||
}
|
||
buff, err := proto.Marshal(pkg)
|
||
if err != nil {
|
||
//panic(err)
|
||
continue
|
||
}
|
||
if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil {
|
||
continue
|
||
}
|
||
}
|
||
continue
|
||
case <-time.After(fDesc.timeout):
|
||
//if err != nil {
|
||
if pkg.Metadata.QueueCallback != "" {
|
||
pkg.ErrNo = 408
|
||
//pkg.Error = err.Error()
|
||
pkg.Error = "function call timeout"
|
||
pkg.Metadata.CallResponseID = uuid.NewV4().String()
|
||
if fDesc.logger != nil {
|
||
fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil)
|
||
}
|
||
buff, err := proto.Marshal(pkg)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
//fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte("function call timeout"))
|
||
if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil {
|
||
continue
|
||
}
|
||
}
|
||
continue
|
||
//}
|
||
}
|
||
}
|
||
}
|
||
|
||
type CallbackFunc struct {
|
||
F func(err error, header *network.Header, result []byte)
|
||
TimeCall time.Time
|
||
//Metadata *network.Metadata
|
||
}
|
||
|
||
// Сервер инстанса
|
||
type Serve struct {
|
||
//NatsHost string
|
||
//NatsPort string
|
||
NatsAddr string
|
||
nc *nats.Conn
|
||
hasNats bool
|
||
projectName string
|
||
moduleName string
|
||
funcs map[string]funcDesc
|
||
timeoutCallback time.Duration
|
||
queueCallback string
|
||
chanQueueCallback chan *nats.Msg
|
||
subj *nats.Subscription
|
||
//callbackFuncs map[string]func(err error, header *network.Header, result []byte)
|
||
callbackFuncs map[string]*CallbackFunc
|
||
callbackFuncsMutex sync.Mutex
|
||
accept map[string]map[string]bool
|
||
logger *ytlogger.YTLogger
|
||
queueNameLog *string
|
||
}
|
||
|
||
// Создание нового инстанса сервера
|
||
func NewServe(addr string, projectName string, moduleName string, natsServ *nats.Conn, priority *ytlogger.Priority, queueNameLog *string) *Serve {
|
||
if strings.Trim(moduleName, " ") == "" {
|
||
moduleName = "default"
|
||
} else {
|
||
moduleName = strings.Trim(moduleName, " ")
|
||
}
|
||
|
||
if strings.Trim(projectName, " ") == "" {
|
||
projectName = "default"
|
||
} else {
|
||
projectName = strings.Trim(projectName, " ")
|
||
}
|
||
|
||
serve := &Serve{}
|
||
if natsServ != nil {
|
||
serve.nc = natsServ
|
||
} else {
|
||
/*serve.NatsHost = host
|
||
serve.NatsPort = port*/
|
||
serve.NatsAddr = addr
|
||
}
|
||
serve.projectName = projectName
|
||
serve.moduleName = moduleName
|
||
serve.funcs = make(map[string]funcDesc)
|
||
|
||
serve.accept = make(map[string]map[string]bool)
|
||
serve.accept["system"] = make(map[string]bool)
|
||
serve.accept["system"]["gateway"] = true
|
||
serve.accept["system"]["security"] = true
|
||
serve.accept["system"]["settings"] = true
|
||
serve.accept["system"]["telegram"] = true
|
||
//serve.accept[projectName][moduleName] = true
|
||
|
||
serve.timeoutCallback = time.Second * 30
|
||
|
||
if queueNameLog != nil {
|
||
if priority == nil {
|
||
priority = &[]ytlogger.Priority{ytlogger.LOG_INFO}[0]
|
||
}
|
||
serve.logger = ytlogger.NewYTLogger(*priority, 100000, serve.sendLog)
|
||
serve.queueNameLog = queueNameLog
|
||
}
|
||
|
||
return serve
|
||
}
|
||
|
||
// Функция отправки логов на удаленный сервер
|
||
func (serve *Serve) sendLog(pkg *log.Pkg) {
|
||
var b []byte
|
||
var err error
|
||
b, err = proto.Marshal(pkg)
|
||
if err != nil {
|
||
return
|
||
}
|
||
serve.nc.Publish(*serve.queueNameLog, b)
|
||
}
|
||
|
||
// Установка разрешающих правил выполнения функций
|
||
func (serve *Serve) AddAccept(ruleName string) bool {
|
||
return serve.AddAcceptBool(ruleName, true)
|
||
}
|
||
|
||
// Установка разрешающих правил выполнения функций
|
||
func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool {
|
||
ruleName = strings.Trim(ruleName, " ")
|
||
if ruleName == "" {
|
||
return false
|
||
}
|
||
if ruleName == "*" {
|
||
serve.accept = make(map[string]map[string]bool)
|
||
serve.accept["*"] = make(map[string]bool)
|
||
serve.accept["*"]["*"] = allow
|
||
} else {
|
||
rules := strings.Split(ruleName, ".")
|
||
for i := range rules {
|
||
rules[i] = strings.Trim(rules[i], " ")
|
||
}
|
||
switch len(rules) {
|
||
case 0:
|
||
return false
|
||
case 1:
|
||
serve.accept[rules[0]] = make(map[string]bool)
|
||
serve.accept[rules[0]]["*"] = allow
|
||
case 2:
|
||
if rules[0] == "*" || rules[0] == "" {
|
||
return false
|
||
}
|
||
if rules[1] == "" {
|
||
return false
|
||
}
|
||
if _, ok := serve.accept[rules[0]]; !ok {
|
||
serve.accept[rules[0]] = make(map[string]bool)
|
||
}
|
||
if rules[1] == "*" {
|
||
serve.accept[rules[0]]["*"] = allow
|
||
} else {
|
||
serve.accept[rules[0]][rules[1]] = allow
|
||
}
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
func (serve *Serve) natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
|
||
fmt.Printf("error: %v\n", natsErr)
|
||
if natsErr == nats.ErrSlowConsumer {
|
||
pendingMsgs, _, err := sub.Pending()
|
||
if err != nil {
|
||
fmt.Printf("couldn't get pending messages: %v", err)
|
||
return
|
||
}
|
||
fmt.Printf("Falling behind with %d pending messages on subject %q.\n",
|
||
pendingMsgs, sub.Subject)
|
||
// Log error, notify operations...
|
||
}
|
||
// check for other errors
|
||
}
|
||
|
||
// Запуск текущего инстанса сервера
|
||
func (serve *Serve) Run() (err error) {
|
||
if serve.nc == nil {
|
||
serve.nc, err = nats.Connect(serve.NatsAddr, nats.ErrorHandler(serve.natsErrHandler))
|
||
if err != nil {
|
||
return
|
||
}
|
||
} else {
|
||
serve.hasNats = true
|
||
}
|
||
rnd := make([]byte, 16)
|
||
_, err = rand.Read(rnd)
|
||
if err != nil {
|
||
return
|
||
}
|
||
//serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte))
|
||
serve.callbackFuncs = make(map[string]*CallbackFunc)
|
||
serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
|
||
serve.chanQueueCallback = make(chan *nats.Msg)
|
||
serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback)
|
||
go serve.worker()
|
||
return
|
||
}
|
||
|
||
func (serve *Serve) cleanTimeout() {
|
||
serve.callbackFuncsMutex.Lock()
|
||
defer serve.callbackFuncsMutex.Unlock()
|
||
for k, v := range serve.callbackFuncs {
|
||
if time.Since(v.TimeCall) > serve.timeoutCallback {
|
||
delete(serve.callbackFuncs, k)
|
||
err := fmt.Errorf("call function is timed out")
|
||
go v.F(err, nil, nil)
|
||
}
|
||
}
|
||
}
|
||
|
||
func (serve *Serve) worker() {
|
||
go func() {
|
||
for {
|
||
<-time.After(serve.timeoutCallback)
|
||
serve.cleanTimeout()
|
||
}
|
||
}()
|
||
|
||
for {
|
||
if msg, ok := <-serve.chanQueueCallback; !ok {
|
||
break
|
||
} else {
|
||
var err error
|
||
var payload network.Function
|
||
err = proto.Unmarshal(msg.Data, &payload)
|
||
if err != nil {
|
||
//panic(err)
|
||
continue
|
||
}
|
||
if payload.Metadata != nil {
|
||
if f, ok := serve.callbackFuncs[payload.Metadata.CallID]; ok {
|
||
serve.callbackFuncsMutex.Lock()
|
||
delete(serve.callbackFuncs, payload.Metadata.CallID)
|
||
serve.callbackFuncsMutex.Unlock()
|
||
if payload.ErrNo != 0 {
|
||
err = fmt.Errorf(payload.Error)
|
||
}
|
||
if f != nil {
|
||
//go f.F(err, payload.Header, payload.Payload)
|
||
f.F(err, payload.Header, payload.Payload)
|
||
}
|
||
//f(err, payload.Header, payload.Payload)
|
||
}
|
||
}
|
||
|
||
}
|
||
}
|
||
}
|
||
|
||
// Завершение текущего инстанса сервера
|
||
func (serve *Serve) Shutdown() {
|
||
for _, value := range serve.funcs {
|
||
value.subj.Unsubscribe()
|
||
close(value.inputChan)
|
||
}
|
||
err := serve.subj.Unsubscribe()
|
||
if err != nil {
|
||
panic(err)
|
||
}
|
||
close(serve.chanQueueCallback)
|
||
if !serve.hasNats {
|
||
serve.nc.Close()
|
||
}
|
||
|
||
}
|
||
|
||
// Регистрация функции в текущем инстансе
|
||
func (serve *Serve) RegisterFunction(funcName string, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
|
||
return serve.RegisterFunctionWithTimeout(funcName, time.Second*30, f)
|
||
}
|
||
|
||
// Регистрация функции с установленным таймаутом в текущем инстансе
|
||
func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Duration, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
|
||
if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok {
|
||
return fmt.Errorf(`restrict duplicate function name`)
|
||
}
|
||
id := make([]byte, 16)
|
||
|
||
_, err = rand.Read(id)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
fDesc := funcDesc{
|
||
Name: fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName),
|
||
F: f,
|
||
logger: serve.logger,
|
||
}
|
||
fDesc.inputChan = make(chan *nats.Msg)
|
||
fDesc.subj, err = serve.nc.ChanQueueSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), serve.projectName+"."+serve.moduleName, fDesc.inputChan)
|
||
fDesc.conn = serve.nc
|
||
fDesc.timeout = timeout
|
||
fDesc.acceptFunc = serve.acceptVerify
|
||
go fDesc.worker()
|
||
|
||
serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName)] = fDesc
|
||
return
|
||
}
|
||
|
||
// Проверка есть ли такая функция в текущем инстансе
|
||
func (serve *Serve) HasFunction(name string) bool {
|
||
_, ok := serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, name)]
|
||
return ok
|
||
}
|
||
|
||
// Проверка возможности выполнения функции на основе установленных вызывающих сторон
|
||
func (serve *Serve) acceptVerify(accept string) bool {
|
||
|
||
if _, ok := serve.accept["*"]; ok {
|
||
return true
|
||
} else {
|
||
accept = strings.Trim(accept, " ")
|
||
if accept == "" {
|
||
return false
|
||
}
|
||
|
||
rules := strings.Split(accept, ".")
|
||
for i := range rules {
|
||
rules[i] = strings.Trim(rules[i], " ")
|
||
}
|
||
if len(rules) == 0 {
|
||
return false
|
||
}
|
||
if val, ok := serve.accept[rules[0]]; !ok {
|
||
return false
|
||
} else {
|
||
if _, ok := val["*"]; ok {
|
||
return true
|
||
} else {
|
||
if len(rules) == 1 {
|
||
return true
|
||
}
|
||
if v, ok := val[rules[1]]; !ok {
|
||
return false
|
||
} else {
|
||
return v
|
||
}
|
||
}
|
||
}
|
||
|
||
}
|
||
}
|
||
|
||
// Асинхронный вызов функции
|
||
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, header *network.Header, result []byte)) {
|
||
metadata := &network.Metadata{
|
||
//PackageType: network.Metadata_TYPE_REQUEST,
|
||
FuncName: name,
|
||
QueueCallback: serve.queueCallback,
|
||
CallID: uuid.NewV4().String(),
|
||
CallFrom: fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName),
|
||
}
|
||
if header == nil {
|
||
header = &network.Header{}
|
||
}
|
||
header.CallPrevID = header.CallID
|
||
header.CallID = metadata.CallID
|
||
header.Project = serve.projectName
|
||
pkg := network.Function{
|
||
Header: header,
|
||
Payload: payload,
|
||
Metadata: metadata,
|
||
}
|
||
buf, err := proto.Marshal(&pkg)
|
||
if err != nil {
|
||
if f != nil {
|
||
f(err, header, payload)
|
||
}
|
||
return
|
||
}
|
||
|
||
callback := &CallbackFunc{
|
||
F: f,
|
||
TimeCall: time.Now(),
|
||
//Metadata: pkg.Metadata,
|
||
}
|
||
serve.callbackFuncsMutex.Lock()
|
||
defer serve.callbackFuncsMutex.Unlock()
|
||
serve.callbackFuncs[metadata.CallID] = callback
|
||
//serve.callbackFuncs[metadata.CallID] = f
|
||
serve.nc.Publish(name, buf)
|
||
}
|
||
|
||
// Синхронный вызов функции
|
||
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, head *network.Header, err error) {
|
||
chanRes := make(chan *ChanRes)
|
||
serve.CallAsync(name, header, payload, func(err error, header *network.Header, result []byte) {
|
||
chanRes <- &ChanRes{
|
||
res: result,
|
||
header: header,
|
||
err: err,
|
||
}
|
||
})
|
||
res := <-chanRes
|
||
err = res.err
|
||
head = res.header
|
||
result = res.res
|
||
|
||
return
|
||
}
|
||
|
||
// Вызов функции как поток без ожидания возврата результата
|
||
func (serve *Serve) CallStream(name string, header *network.Header, payload []byte) (err error) {
|
||
metadata := &network.Metadata{
|
||
//PackageType: network.Metadata_TYPE_STREAM,
|
||
FuncName: name,
|
||
QueueCallback: serve.queueCallback,
|
||
CallID: uuid.NewV4().String(),
|
||
CallFrom: fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName),
|
||
}
|
||
if header == nil {
|
||
header = &network.Header{}
|
||
}
|
||
header.Project = serve.projectName
|
||
pkg := network.Function{
|
||
Header: header,
|
||
Payload: payload,
|
||
Metadata: metadata,
|
||
}
|
||
var buf []byte
|
||
buf, err = proto.Marshal(&pkg)
|
||
if err != nil {
|
||
return
|
||
}
|
||
serve.nc.Publish(name, buf)
|
||
return
|
||
}
|