package ytfunction import ( "crypto/rand" "fmt" "strings" "sync" "time" "git.ymnuktech.ru/ymnuk/yt-function-sdk-go/network" ytlogger "git.ymnuktech.ru/ymnuk/yt-logger-go" "git.ymnuktech.ru/ymnuk/yt-logger-go/log" "github.com/nats-io/nats.go" uuid "github.com/satori/go.uuid" "google.golang.org/protobuf/proto" ) type ChanRes struct { res []byte header *network.Header err error } type funcDesc struct { Name string F func(header *network.Header, payload []byte) (result []byte, err error) inputChan chan *nats.Msg subj *nats.Subscription conn *nats.Conn timeout time.Duration acceptFunc func(access string) bool logger *ytlogger.YTLogger } func (fDesc *funcDesc) worker() { for { data, ok := <-fDesc.inputChan if !ok { break } var err error pkg := &network.Function{} err = proto.Unmarshal(data.Data, pkg) if err != nil { panic(err) } timeStart := time.Now() // Проверка может ли вызывающая сторона вызвать функцию if !fDesc.acceptFunc(pkg.Metadata.CallFrom) { if pkg.Metadata.QueueCallback != "" { pkg.ErrNo = 403 pkg.Error = "forbidden" pkg.Metadata.CallResponseID = uuid.NewV4().String() buff, err := proto.Marshal(pkg) if err != nil { panic(err) } if fDesc.logger != nil { fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil) } if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil { continue } } else { if fDesc.logger != nil { fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil) } } continue } // Вызов функции chanRes := make(chan *ChanRes) go func() { if pkg.Header.ResponseHeaders == nil { pkg.Header.ResponseHeaders = make(map[string]string) } result, err := fDesc.F(pkg.Header, pkg.Payload) chanRes <- &ChanRes{ res: result, err: err, } }() select { case res := <-chanRes: if res.err != nil { if pkg.Metadata.QueueCallback != "" { pkg.ErrNo = 500 //if err != nil { pkg.Error = res.err.Error() /*} else { pkg.Error = "" }*/ pkg.Metadata.CallResponseID = uuid.NewV4().String() if fDesc.logger != nil { fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil) } buff, err := proto.Marshal(pkg) if err != nil { //panic(err) continue } //fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte(pkg.Error)) fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff) } continue } if pkg.Metadata.QueueCallback != "" { pkg.Payload = res.res pkg.Metadata.CallResponseID = uuid.NewV4().String() if fDesc.logger != nil { fDesc.logger.Info(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, &[]int64{int64(len(pkg.Payload))}[0], nil) } buff, err := proto.Marshal(pkg) if err != nil { //panic(err) continue } if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil { continue } } continue case <-time.After(fDesc.timeout): //if err != nil { if pkg.Metadata.QueueCallback != "" { pkg.ErrNo = 408 //pkg.Error = err.Error() pkg.Error = "function call timeout" pkg.Metadata.CallResponseID = uuid.NewV4().String() if fDesc.logger != nil { fDesc.logger.Err(&pkg.Metadata.FuncName, &pkg.Metadata.CallID, &pkg.Header.CallPrevID, &pkg.Metadata.CallFrom, &pkg.Metadata.CallResponseID, &timeStart, &[]time.Time{time.Now()}[0], nil, &pkg.ErrNo, &pkg.Error, &pkg.ErrNo, nil, nil) } buff, err := proto.Marshal(pkg) if err != nil { continue } //fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte("function call timeout")) if err = fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff); err != nil { continue } } continue //} } } } type CallbackFunc struct { F func(err error, header *network.Header, result []byte) TimeCall time.Time //Metadata *network.Metadata } // Сервер инстанса type Serve struct { //NatsHost string //NatsPort string NatsAddr string nc *nats.Conn hasNats bool projectName string moduleName string funcs map[string]funcDesc timeoutCallback time.Duration queueCallback string chanQueueCallback chan *nats.Msg subj *nats.Subscription //callbackFuncs map[string]func(err error, header *network.Header, result []byte) callbackFuncs map[string]*CallbackFunc callbackFuncsMutex sync.Mutex accept map[string]map[string]bool logger *ytlogger.YTLogger queueNameLog *string } // Создание нового инстанса сервера func NewServe(addr string, projectName string, moduleName string, natsServ *nats.Conn, priority *ytlogger.Priority, queueNameLog *string) *Serve { if strings.Trim(moduleName, " ") == "" { moduleName = "default" } else { moduleName = strings.Trim(moduleName, " ") } if strings.Trim(projectName, " ") == "" { projectName = "default" } else { projectName = strings.Trim(projectName, " ") } serve := &Serve{} if natsServ != nil { serve.nc = natsServ } else { /*serve.NatsHost = host serve.NatsPort = port*/ serve.NatsAddr = addr } serve.projectName = projectName serve.moduleName = moduleName serve.funcs = make(map[string]funcDesc) serve.accept = make(map[string]map[string]bool) serve.accept["system"] = make(map[string]bool) serve.accept["system"]["gateway"] = true serve.accept["system"]["security"] = true serve.accept["system"]["settings"] = true serve.accept["system"]["telegram"] = true //serve.accept[projectName][moduleName] = true serve.timeoutCallback = time.Second * 30 if queueNameLog != nil { if priority == nil { priority = &[]ytlogger.Priority{ytlogger.LOG_INFO}[0] } serve.logger = ytlogger.NewYTLogger(*priority, 100000, serve.sendLog) serve.queueNameLog = queueNameLog } return serve } // Функция отправки логов на удаленный сервер func (serve *Serve) sendLog(pkg *log.Pkg) { var b []byte var err error b, err = proto.Marshal(pkg) if err != nil { return } serve.nc.Publish(*serve.queueNameLog, b) } // Установка разрешающих правил выполнения функций func (serve *Serve) AddAccept(ruleName string) bool { return serve.AddAcceptBool(ruleName, true) } // Установка разрешающих правил выполнения функций func (serve *Serve) AddAcceptBool(ruleName string, allow bool) bool { ruleName = strings.Trim(ruleName, " ") if ruleName == "" { return false } if ruleName == "*" { serve.accept = make(map[string]map[string]bool) serve.accept["*"] = make(map[string]bool) serve.accept["*"]["*"] = allow } else { rules := strings.Split(ruleName, ".") for i := range rules { rules[i] = strings.Trim(rules[i], " ") } switch len(rules) { case 0: return false case 1: serve.accept[rules[0]] = make(map[string]bool) serve.accept[rules[0]]["*"] = allow case 2: if rules[0] == "*" || rules[0] == "" { return false } if rules[1] == "" { return false } if _, ok := serve.accept[rules[0]]; !ok { serve.accept[rules[0]] = make(map[string]bool) } if rules[1] == "*" { serve.accept[rules[0]]["*"] = allow } else { serve.accept[rules[0]][rules[1]] = allow } } } return true } func (serve *Serve) natsErrHandler(nc *nats.Conn, sub *nats.Subscription, natsErr error) { fmt.Printf("error: %v\n", natsErr) if natsErr == nats.ErrSlowConsumer { pendingMsgs, _, err := sub.Pending() if err != nil { fmt.Printf("couldn't get pending messages: %v", err) return } fmt.Printf("Falling behind with %d pending messages on subject %q.\n", pendingMsgs, sub.Subject) // Log error, notify operations... } // check for other errors } // Запуск текущего инстанса сервера func (serve *Serve) Run() (err error) { if serve.nc == nil { serve.nc, err = nats.Connect(serve.NatsAddr, nats.ErrorHandler(serve.natsErrHandler)) if err != nil { return } } else { serve.hasNats = true } rnd := make([]byte, 16) _, err = rand.Read(rnd) if err != nil { return } //serve.callbackFuncs = make(map[string]func(err error, header *network.Header, result []byte)) serve.callbackFuncs = make(map[string]*CallbackFunc) serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd) serve.chanQueueCallback = make(chan *nats.Msg) serve.subj, err = serve.nc.ChanQueueSubscribe(serve.queueCallback, serve.projectName+"."+serve.moduleName, serve.chanQueueCallback) go serve.worker() return } func (serve *Serve) cleanTimeout() { serve.callbackFuncsMutex.Lock() defer serve.callbackFuncsMutex.Unlock() for k, v := range serve.callbackFuncs { if time.Since(v.TimeCall) > serve.timeoutCallback { delete(serve.callbackFuncs, k) err := fmt.Errorf("call function is timed out") go v.F(err, nil, nil) } } } func (serve *Serve) worker() { go func() { for { <-time.After(serve.timeoutCallback) serve.cleanTimeout() } }() for { if msg, ok := <-serve.chanQueueCallback; !ok { break } else { var err error var payload network.Function err = proto.Unmarshal(msg.Data, &payload) if err != nil { //panic(err) continue } if payload.Metadata != nil { if f, ok := serve.callbackFuncs[payload.Metadata.CallID]; ok { serve.callbackFuncsMutex.Lock() delete(serve.callbackFuncs, payload.Metadata.CallID) serve.callbackFuncsMutex.Unlock() if payload.ErrNo != 0 { err = fmt.Errorf(payload.Error) } if f != nil { //go f.F(err, payload.Header, payload.Payload) f.F(err, payload.Header, payload.Payload) } //f(err, payload.Header, payload.Payload) } } } } } // Завершение текущего инстанса сервера func (serve *Serve) Shutdown() { for _, value := range serve.funcs { value.subj.Unsubscribe() close(value.inputChan) } err := serve.subj.Unsubscribe() if err != nil { panic(err) } close(serve.chanQueueCallback) if !serve.hasNats { serve.nc.Close() } } // Регистрация функции в текущем инстансе func (serve *Serve) RegisterFunction(funcName string, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) { return serve.RegisterFunctionWithTimeout(funcName, time.Second*30, f) } // Регистрация функции с установленным таймаутом в текущем инстансе func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Duration, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) { if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok { return fmt.Errorf(`restrict duplicate function name`) } id := make([]byte, 16) _, err = rand.Read(id) if err != nil { return } fDesc := funcDesc{ Name: fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), F: f, logger: serve.logger, } fDesc.inputChan = make(chan *nats.Msg) fDesc.subj, err = serve.nc.ChanQueueSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), serve.projectName+"."+serve.moduleName, fDesc.inputChan) fDesc.conn = serve.nc fDesc.timeout = timeout fDesc.acceptFunc = serve.acceptVerify go fDesc.worker() serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName)] = fDesc return } // Проверка есть ли такая функция в текущем инстансе func (serve *Serve) HasFunction(name string) bool { _, ok := serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, name)] return ok } // Проверка возможности выполнения функции на основе установленных вызывающих сторон func (serve *Serve) acceptVerify(accept string) bool { if _, ok := serve.accept["*"]; ok { return true } else { accept = strings.Trim(accept, " ") if accept == "" { return false } rules := strings.Split(accept, ".") for i := range rules { rules[i] = strings.Trim(rules[i], " ") } if len(rules) == 0 { return false } if val, ok := serve.accept[rules[0]]; !ok { return false } else { if _, ok := val["*"]; ok { return true } else { if len(rules) == 1 { return true } if v, ok := val[rules[1]]; !ok { return false } else { return v } } } } } // Асинхронный вызов функции func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, header *network.Header, result []byte)) { metadata := &network.Metadata{ //PackageType: network.Metadata_TYPE_REQUEST, FuncName: name, QueueCallback: serve.queueCallback, CallID: uuid.NewV4().String(), CallFrom: fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName), } if header == nil { header = &network.Header{} } header.CallPrevID = header.CallID header.CallID = metadata.CallID header.Project = serve.projectName pkg := network.Function{ Header: header, Payload: payload, Metadata: metadata, } buf, err := proto.Marshal(&pkg) if err != nil { if f != nil { f(err, header, payload) } return } callback := &CallbackFunc{ F: f, TimeCall: time.Now(), //Metadata: pkg.Metadata, } serve.callbackFuncsMutex.Lock() defer serve.callbackFuncsMutex.Unlock() serve.callbackFuncs[metadata.CallID] = callback //serve.callbackFuncs[metadata.CallID] = f serve.nc.Publish(name, buf) } // Синхронный вызов функции func (serve *Serve) Call(name string, header *network.Header, payload []byte) (result []byte, head *network.Header, err error) { chanRes := make(chan *ChanRes) serve.CallAsync(name, header, payload, func(err error, header *network.Header, result []byte) { chanRes <- &ChanRes{ res: result, header: header, err: err, } }) res := <-chanRes err = res.err head = res.header result = res.res return } // Вызов функции как поток без ожидания возврата результата func (serve *Serve) CallStream(name string, header *network.Header, payload []byte) (err error) { metadata := &network.Metadata{ //PackageType: network.Metadata_TYPE_STREAM, FuncName: name, QueueCallback: serve.queueCallback, CallID: uuid.NewV4().String(), CallFrom: fmt.Sprintf("%s.%s", serve.projectName, serve.moduleName), } if header == nil { header = &network.Header{} } header.Project = serve.projectName pkg := network.Function{ Header: header, Payload: payload, Metadata: metadata, } var buf []byte buf, err = proto.Marshal(&pkg) if err != nil { return } serve.nc.Publish(name, buf) return }