yt-function-sdk-go/ytfunction.go

549 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package ytfunction
import (
"crypto/rand"
"fmt"
"strings"
"sync"
"time"
"git.ymnuktech.ru/ymnuk/yt-function-sdk-go/network"
ytlogger "git.ymnuktech.ru/ymnuk/yt-logger-go"
"git.ymnuktech.ru/ymnuk/yt-logger-go/log"
"github.com/nats-io/nats.go"
uuid "github.com/satori/go.uuid"
"google.golang.org/protobuf/proto"
)
// ChanRes carries the outcome of a single function call through a channel.
type ChanRes struct {
// res is the raw result payload of the call.
res []byte
// header is the header delivered together with the result, when there is one.
header *network.Header
// err is the error reported for the call; nil on success.
err error
}
// funcDesc describes one registered function together with everything its
// worker goroutine needs to serve incoming calls.
type funcDesc struct {
// Name is the fully qualified function name ("project.module.function").
Name string
// F is the user handler invoked for every accepted call.
F func(header *network.Header, payload []byte) (result []byte, err error)
// inputChan receives the NATS messages of the function's subscription.
inputChan chan *nats.Msg
// subj is the NATS subscription that feeds inputChan.
subj *nats.Subscription
// conn is the connection used to publish replies.
conn *nats.Conn
// timeout is how long the worker waits for F before replying with a timeout error.
timeout time.Duration
// acceptFunc reports whether the caller identified by access may call the function.
acceptFunc func(access string) bool
// logger is optional; the worker skips logging when it is nil.
logger *ytlogger.YTLogger
}
// worker consumes requests from inputChan one at a time and returns when the
// channel is closed.
func (fDesc *funcDesc) worker() {
	for msg := range fDesc.inputChan {
		fDesc.handle(msg)
	}
}

// handle decodes one request, verifies the caller, runs the registered
// function under the configured timeout and reports the outcome.
//
// Outcomes are reported as follows:
//   - caller rejected by acceptFunc: ErrNo 403, Error "forbidden";
//   - function returned an error:    ErrNo 500, Error set to its text;
//   - function exceeded the timeout: ErrNo 408, Error "function call timeout";
//   - success: the result replaces Payload and is sent to QueueCallback.
//
// A message that cannot be decoded is reported and dropped instead of
// panicking, so one bad message cannot take the whole process down.
func (fDesc *funcDesc) handle(msg *nats.Msg) {
	pkg := &network.Function{}
	if err := proto.Unmarshal(msg.Data, pkg); err != nil {
		fmt.Printf("error: %s: dropping malformed request: %v\n", fDesc.Name, err)
		return
	}
	// Metadata and Header are dereferenced below; a request that omits them
	// must not crash the worker.
	if pkg.Metadata == nil {
		pkg.Metadata = &network.Metadata{}
	}
	if pkg.Header == nil {
		pkg.Header = &network.Header{}
	}
	timeStart := time.Now()

	// Check whether the calling side is allowed to call the function.
	if !fDesc.acceptFunc(pkg.Metadata.CallFrom) {
		pkg.ErrNo = 403
		pkg.Error = "forbidden"
		fDesc.sendError(pkg, timeStart)
		return
	}

	if pkg.Header.ResponseHeaders == nil {
		pkg.Header.ResponseHeaders = make(map[string]string)
	}

	// Call the function. The channel is buffered so the goroutine can hand
	// over its result and exit even when the timeout already fired and nobody
	// is receiving any more.
	chanRes := make(chan *ChanRes, 1)
	go func() {
		result, err := fDesc.F(pkg.Header, pkg.Payload)
		chanRes <- &ChanRes{
			res: result,
			err: err,
		}
	}()

	timer := time.NewTimer(fDesc.timeout)
	defer timer.Stop()

	select {
	case res := <-chanRes:
		if res.err != nil {
			pkg.ErrNo = 500
			pkg.Error = res.err.Error()
			fDesc.sendError(pkg, timeStart)
			return
		}
		if pkg.Metadata.QueueCallback == "" {
			// Nobody is waiting for the result.
			return
		}
		pkg.Payload = res.res
		pkg.Metadata.CallResponseID = uuid.NewV4().String()
		if fDesc.logger != nil {
			timeEnd := time.Now()
			size := int64(len(pkg.Payload))
			fDesc.logger.Info(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &timeEnd, nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, &size, nil)
		}
		fDesc.sendReply(pkg)
	case <-timer.C:
		pkg.ErrNo = 408
		pkg.Error = "function call timeout"
		fDesc.sendError(pkg, timeStart)
	}
}

// sendError logs the failure already stored in pkg.ErrNo / pkg.Error and, when
// the caller supplied a callback queue, sends it back. Failures are logged
// whether or not a callback queue is present.
func (fDesc *funcDesc) sendError(pkg *network.Function, timeStart time.Time) {
	pkg.Metadata.CallResponseID = uuid.NewV4().String()
	if fDesc.logger != nil {
		timeEnd := time.Now()
		fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &timeEnd, nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil)
	}
	fDesc.sendReply(pkg)
}

// sendReply publishes pkg to the caller's callback queue. It does nothing
// when the request carries no callback queue.
func (fDesc *funcDesc) sendReply(pkg *network.Function) {
	if pkg.Metadata.QueueCallback == "" {
		return
	}
	buff, err := proto.Marshal(pkg)
	if err != nil {
		fmt.Printf("error: %s: encoding reply: %v\n", fDesc.Name, err)
		return
	}
	if err := fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil {
		fmt.Printf("error: %s: publishing reply: %v\n", fDesc.Name, err)
	}
}
// CallbackFunc is a pending callback registered by CallAsync for one outgoing call.
type CallbackFunc struct {
// F is invoked with the outcome of the call (reply received or timeout).
F func(err error, header *network.Header, result []byte)
// TimeCall is the moment the call was issued; cleanTimeout compares it with
// Serve.timeoutCallback to expire callbacks that never got a reply.
TimeCall time.Time
//Metadata *network.Metadata
}
// Serve is a server instance: it registers functions, serves incoming calls
// to them and issues calls to other functions over NATS.
type Serve struct {
//NatsHost string
//NatsPort string
// NatsAddr is the address Run connects to when no connection was supplied.
NatsAddr string
// nc is the NATS connection, either supplied to NewServe or opened by Run.
nc *nats.Conn
// hasNats is set by Run when the connection was supplied from outside;
// Shutdown does not close such a connection.
hasNats bool
projectName string
moduleName string
// funcs holds the registered functions keyed by "project.module.function".
funcs map[string]funcDesc
// timeoutCallback is how long a pending callback may wait for its reply
// before cleanTimeout fails it.
timeoutCallback time.Duration
// queueCallback is the subject on which replies to outgoing calls arrive.
queueCallback string
// chanQueueCallback receives the messages published to queueCallback.
chanQueueCallback chan *nats.Msg
// subj is the subscription that feeds chanQueueCallback.
subj *nats.Subscription
//callbackFuncs map[string]func(err error, header *network.Header, result []byte)
// callbackFuncs maps the CallID of an outgoing call to its pending callback.
callbackFuncs map[string]*CallbackFunc
// callbackFuncsMutex guards callbackFuncs.
callbackFuncsMutex sync.Mutex
// accept holds the caller rules: project -> module (or "*") -> allowed.
accept map[string]map[string]bool
// logger is optional; it is created only when a log queue name is given.
logger *ytlogger.YTLogger
// queueNameLog is the subject log packages are published to.
queueNameLog *string
}
// NewServe creates a new server instance.
//
// Blank project and module names fall back to "default". When natsServ is nil
// the instance remembers addr and connects on Run; otherwise the supplied
// connection is used. A logger is created only when queueNameLog is non-nil;
// a nil priority then defaults to ytlogger.LOG_INFO.
func NewServe(addr string, projectName string, moduleName string, natsServ *nats.Conn, priority *ytlogger.Priority, queueNameLog *string) *Serve {
	if moduleName = strings.Trim(moduleName, " "); moduleName == "" {
		moduleName = "default"
	}
	if projectName = strings.Trim(projectName, " "); projectName == "" {
		projectName = "default"
	}

	serve := &Serve{
		projectName: projectName,
		moduleName:  moduleName,
		funcs:       make(map[string]funcDesc),
		// Callers that are accepted out of the box.
		accept: map[string]map[string]bool{
			"system": {
				"gateway":  true,
				"security": true,
				"settings": true,
				"telegram": true,
			},
		},
		timeoutCallback: 30 * time.Second,
	}
	if natsServ == nil {
		serve.NatsAddr = addr
	} else {
		serve.nc = natsServ
	}

	if queueNameLog == nil {
		return serve
	}
	var level ytlogger.Priority = ytlogger.LOG_INFO
	if priority != nil {
		level = *priority
	}
	serve.logger = ytlogger.NewYTLogger(level, 100000, serve.sendLog)
	serve.queueNameLog = queueNameLog
	return serve
}
// sendLog publishes a log package to the remote log queue. A package that
// cannot be encoded is silently skipped.
func (serve *Serve) sendLog(pkg *log.Pkg) {
	if encoded, err := proto.Marshal(pkg); err == nil {
		serve.nc.Publish(*serve.queueNameLog, encoded)
	}
}
// AddAccept installs an allowing rule for the given caller; it is shorthand
// for AddAcceptBool(ruleName, true).
func (serve *Serve) AddAccept(ruleName string) bool {
	const allow = true
	added := serve.AddAcceptBool(ruleName, allow)
	return added
}
// AddAcceptBool installs a caller rule and reports whether it was stored.
//
// Accepted forms of ruleName (spaces around the name and around each part
// are ignored):
//   - "*"              replaces every existing rule with a single global rule;
//   - "project"        replaces the project's rules with a project-wide rule;
//   - "project.*"      sets the project-wide rule, keeping module rules;
//   - "project.module" sets the rule for one module.
//
// allow is the value stored for the rule. Empty names, an empty or "*"
// project in the two-part form, an empty module, and names with more than
// two parts are rejected with false and leave the rules untouched.
func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool {
	ruleName = strings.Trim(ruleName, " ")
	if ruleName == "" {
		return false
	}
	if ruleName == "*" {
		serve.accept = map[string]map[string]bool{"*": {"*": allow}}
		return true
	}

	rules := strings.Split(ruleName, ".")
	for i := range rules {
		rules[i] = strings.Trim(rules[i], " ")
	}
	if serve.accept == nil {
		// Writing to a nil map panics; tolerate a Serve built without NewServe.
		serve.accept = make(map[string]map[string]bool)
	}

	switch len(rules) {
	case 1:
		serve.accept[rules[0]] = map[string]bool{"*": allow}
	case 2:
		project, module := rules[0], rules[1]
		if project == "*" || project == "" || module == "" {
			return false
		}
		if _, ok := serve.accept[project]; !ok {
			serve.accept[project] = make(map[string]bool)
		}
		// module may be "*" here, which sets the project-wide rule.
		serve.accept[project][module] = allow
	default:
		// Deeper names cannot be represented; do not report success for a
		// rule that was never stored.
		return false
	}
	return true
}
// natsErrHandler is installed as the NATS asynchronous error handler by Run.
// It prints every error and, for a slow consumer, also prints how many
// messages are pending on the affected subscription.
func (serve *Serve) natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) {
	fmt.Printf("error: %v\n", natsErr)
	if natsErr != nats.ErrSlowConsumer {
		// Other errors get no extra diagnostics.
		return
	}

	pending, _, pendingErr := sub.Pending()
	if pendingErr != nil {
		fmt.Printf("couldn't get pending messages: %v", pendingErr)
		return
	}
	fmt.Printf("Falling behind with %d pending messages on subject %q.\n", pending, sub.Subject)
}
// Run starts the server instance.
//
// It connects to NatsAddr unless a connection was supplied to NewServe,
// subscribes to a freshly generated callback subject
// ("project.module.<random hex>") and starts the worker that dispatches
// replies to pending callbacks.
//
// The worker is started only after the subscription succeeded. On failure a
// connection opened by this call is closed and forgotten again, so a later
// retry does not mistake it for an externally supplied one.
func (serve *Serve) Run() (err error) {
	ownConn := false
	if serve.nc == nil {
		serve.nc, err = nats.Connect(serve.NatsAddr, nats.ErrorHandler(serve.natsErrHandler))
		if err != nil {
			return err
		}
		ownConn = true
	} else {
		serve.hasNats = true
	}
	defer func() {
		if err != nil && ownConn {
			serve.nc.Close()
			serve.nc = nil
		}
	}()

	rnd := make([]byte, 16)
	if _, err = rand.Read(rnd); err != nil {
		return err
	}

	serve.callbackFuncs = make(map[string]*CallbackFunc)
	serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd)
	serve.chanQueueCallback = make(chan *nats.Msg)
	serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback)
	if err != nil {
		return err
	}

	go serve.worker()
	return nil
}
// cleanTimeout removes every pending callback that has waited longer than
// timeoutCallback and notifies it, in its own goroutine, with a timeout error.
// Entries registered without a callback function are removed silently.
func (serve *Serve) cleanTimeout() {
	serve.callbackFuncsMutex.Lock()
	defer serve.callbackFuncsMutex.Unlock()

	for id, cb := range serve.callbackFuncs {
		if cb != nil && time.Since(cb.TimeCall) <= serve.timeoutCallback {
			continue
		}
		delete(serve.callbackFuncs, id)
		if cb == nil || cb.F == nil {
			// Calling a nil function would crash the whole process.
			continue
		}
		// The callback runs in its own goroutine so user code never executes
		// while the mutex is held.
		go cb.F(fmt.Errorf("call function is timed out"), nil, nil)
	}
}
// worker dispatches replies arriving on chanQueueCallback to the callbacks
// registered for their CallID and periodically expires callbacks that never
// received a reply. It returns when chanQueueCallback is closed, which also
// stops the expiry goroutine.
//
// Messages that cannot be decoded, carry no metadata, or match no pending
// callback are ignored.
func (serve *Serve) worker() {
	done := make(chan struct{})
	defer close(done)

	go func() {
		for {
			select {
			case <-done:
				return
			case <-time.After(serve.timeoutCallback):
				serve.cleanTimeout()
			}
		}
	}()

	for msg := range serve.chanQueueCallback {
		var payload network.Function
		if err := proto.Unmarshal(msg.Data, &payload); err != nil {
			continue
		}
		if payload.Metadata == nil {
			continue
		}

		// Look up and remove under the lock: CallAsync and cleanTimeout
		// modify the map from other goroutines.
		callID := payload.Metadata.CallID
		serve.callbackFuncsMutex.Lock()
		cb, ok := serve.callbackFuncs[callID]
		if ok {
			delete(serve.callbackFuncs, callID)
		}
		serve.callbackFuncsMutex.Unlock()

		if !ok || cb == nil || cb.F == nil {
			continue
		}

		var err error
		if payload.ErrNo != 0 {
			// The remote text is data, not a format string.
			err = fmt.Errorf("%s", payload.Error)
		}
		cb.F(err, payload.Header, payload.Payload)
	}
}
// Shutdown stops the server instance: it unsubscribes every registered
// function and the callback queue, closes their channels (which ends the
// workers) and closes the NATS connection unless it was supplied from outside.
//
// Teardown is best effort: an unsubscribe failure (for example on a
// connection that is already closed) is printed and the remaining steps
// still run, instead of panicking half-way through.
func (serve *Serve) Shutdown() {
	for name, fn := range serve.funcs {
		if err := fn.subj.Unsubscribe(); err != nil {
			fmt.Printf("error: unsubscribe %s: %v\n", name, err)
		}
		close(fn.inputChan)
	}

	if err := serve.subj.Unsubscribe(); err != nil {
		fmt.Printf("error: unsubscribe %s: %v\n", serve.queueCallback, err)
	}
	// The channel only exists once Run has been called; closing a nil
	// channel panics.
	if serve.chanQueueCallback != nil {
		close(serve.chanQueueCallback)
	}

	if !serve.hasNats && serve.nc != nil {
		serve.nc.Close()
	}
}
// RegisterFunction registers a function in the current instance with the
// default timeout of 30 seconds.
func (serve *Serve) RegisterFunction(funcName string, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
	const defaultTimeout = 30 * time.Second
	err = serve.RegisterFunctionWithTimeout(funcName, defaultTimeout, f)
	return err
}
// RegisterFunctionWithTimeout registers a function in the current instance
// with the given execution timeout.
//
// The function is subscribed on the subject "project.module.funcName" in the
// queue group "project.module" and served by its own worker goroutine.
//
// It returns an error when f is nil, when a function with the same name is
// already registered, or when the subscription fails; in all of these cases
// nothing is registered and no worker is started.
func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Duration, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) {
	// One key for both the duplicate check and the registration.
	name := fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName)
	if _, ok := serve.funcs[name]; ok {
		return fmt.Errorf(`restrict duplicate function name`)
	}
	if f == nil {
		// A nil handler would crash the worker on the first call.
		return fmt.Errorf("function %q has no handler", name)
	}

	fDesc := funcDesc{
		Name:       name,
		F:          f,
		inputChan:  make(chan *nats.Msg),
		conn:       serve.nc,
		timeout:    timeout,
		acceptFunc: serve.acceptVerify,
		logger:     serve.logger,
	}
	fDesc.subj, err = serve.nc.ChanQueueSubscribe(name, serve.projectName+"."+serve.moduleName, fDesc.inputChan)
	if err != nil {
		return err
	}

	go fDesc.worker()
	serve.funcs[name] = fDesc
	return nil
}
// HasFunction reports whether a function with the given short name is
// registered in the current instance.
func (serve *Serve) HasFunction(name string) bool {
	key := fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, name)
	if _, found := serve.funcs[key]; found {
		return true
	}
	return false
}
// acceptVerify reports whether the caller identified by accept
// ("project.module", or just "project") may call functions of this instance.
//
// Rules are evaluated in this order:
//  1. a global "*" rule stored as true allows everyone;
//  2. an empty caller name or an unknown project is denied;
//  3. a project-wide "*" rule stored as true allows the whole project;
//  4. a caller that names only the project is allowed when the project is
//     known and has no project-wide rule stored as false;
//  5. otherwise the value stored for the module decides; modules without a
//     rule are denied.
//
// Wildcard rules stored as false never grant access: only the stored value,
// not the mere presence of the rule, allows a call.
func (serve *Serve) acceptVerify(accept string) bool {
	// Indexing a missing (nil) inner map yields false, so this is true only
	// for an explicit global allow.
	if serve.accept["*"]["*"] {
		return true
	}

	accept = strings.Trim(accept, " ")
	if accept == "" {
		return false
	}
	rules := strings.Split(accept, ".")
	for i := range rules {
		rules[i] = strings.Trim(rules[i], " ")
	}

	modules, ok := serve.accept[rules[0]]
	if !ok {
		return false
	}
	allowAll, hasWildcard := modules["*"]
	if allowAll {
		return true
	}
	if len(rules) == 1 {
		// Project-only caller: known project, unless explicitly denied.
		return !hasWildcard
	}
	return modules[rules[1]]
}
// CallAsync calls the function published under name without waiting for the
// result; f, when not nil, is invoked later with the outcome.
//
// header may be nil. Otherwise it is modified in place: the previous CallID
// moves to CallPrevID, a fresh CallID is assigned and Project is set to this
// instance's project.
//
// f is invoked exactly once:
//   - synchronously, with the original payload, when the request cannot be
//     encoded;
//   - in a new goroutine, with the original payload, when the request cannot
//     be published;
//   - by the reply worker when the reply arrives, or with a timeout error
//     when none arrives in time.
//
// With a nil f the request is just published and nothing is registered.
func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, header *network.Header, result []byte)) {
	metadata := &network.Metadata{
		FuncName:      name,
		QueueCallback: serve.queueCallback,
		CallID:        uuid.NewV4().String(),
		CallFrom:      fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName),
	}
	if header == nil {
		header = &network.Header{}
	}
	header.CallPrevID = header.CallID
	header.CallID = metadata.CallID
	header.Project = serve.projectName

	pkg := network.Function{
		Header:   header,
		Payload:  payload,
		Metadata: metadata,
	}
	buf, err := proto.Marshal(&pkg)
	if err != nil {
		if f != nil {
			f(err, header, payload)
		}
		return
	}

	if f == nil {
		// Nobody to notify: registering an empty callback would only give
		// the reply worker a nil function to call.
		_ = serve.nc.Publish(name, buf)
		return
	}

	// Register before publishing so a fast reply always finds its callback,
	// but do not hold the lock across the network call.
	serve.callbackFuncsMutex.Lock()
	serve.callbackFuncs[metadata.CallID] = &CallbackFunc{
		F:        f,
		TimeCall: time.Now(),
	}
	serve.callbackFuncsMutex.Unlock()

	if err := serve.nc.Publish(name, buf); err != nil {
		serve.callbackFuncsMutex.Lock()
		_, pending := serve.callbackFuncs[metadata.CallID]
		delete(serve.callbackFuncs, metadata.CallID)
		serve.callbackFuncsMutex.Unlock()
		if pending {
			// Report the failure right away instead of letting the caller
			// wait for the timeout. Delivered from a separate goroutine, like
			// timeout notifications, so a callback that blocks until the
			// caller is ready cannot stall this call.
			go f(err, header, payload)
		}
	}
}
// Call calls the function published under name and blocks until its result,
// an error or the callback timeout is delivered.
//
// It returns the result payload, the header received with it and the error
// of the call.
func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, head *network.Header, err error) {
	// Buffered: CallAsync may invoke the callback synchronously (for example
	// when the request cannot be encoded), i.e. before this goroutine reaches
	// the receive below. An unbuffered channel would deadlock in that case.
	chanRes := make(chan *ChanRes, 1)
	serve.CallAsync(name, header, payload, func(err error, header *network.Header, result []byte) {
		chanRes <- &ChanRes{
			res:    result,
			header: header,
			err:    err,
		}
	})
	res := <-chanRes
	return res.res, res.header, res.err
}
// CallStream calls the function published under name as a stream: the
// request is published and no result is awaited.
//
// header may be nil; otherwise its Project field is set to this instance's
// project. The returned error is the encoding or publishing error, if any.
func (serve *Serve) CallStream(name string, header *network.Header, payload []byte) (err error) {
	metadata := &network.Metadata{
		FuncName:      name,
		QueueCallback: serve.queueCallback,
		CallID:        uuid.NewV4().String(),
		CallFrom:      fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName),
	}
	if header == nil {
		header = &network.Header{}
	}
	header.Project = serve.projectName

	pkg := network.Function{
		Header:   header,
		Payload:  payload,
		Metadata: metadata,
	}
	buf, err := proto.Marshal(&pkg)
	if err != nil {
		return err
	}
	// The publish error is the only feedback a stream call can give.
	return serve.nc.Publish(name, buf)
}