From 2e88bf3fa561d62477d7f9c3b0298e3f0eb3d48c Mon Sep 17 00:00:00 2001 From: ymnuk Date: Tue, 25 Oct 2022 12:59:22 +0300 Subject: [PATCH] Prepare function --- .gitignore | 3 +- README.md | 7 +- go.mod | 2 + go.sum | 8 +++ protobuf | 2 +- ytfunction.go | 174 +++++++++++++++++++++++++++++++++++++-------- ytfunction_test.go | 40 +++++++++-- 7 files changed, 193 insertions(+), 43 deletions(-) diff --git a/.gitignore b/.gitignore index 438f73b..7e5c6aa 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -__debug \ No newline at end of file +__debug +/network \ No newline at end of file diff --git a/README.md b/README.md index e4b2757..673dbba 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,8 @@ После десериализации присутствует 4 поля: 1. Параметры заголовка, которые передаются с пакетом (например http-заголовки). Формат заголовка типа key:string=valye:string -2. Название очереди-функции обратного вызова. Это название очереди, в которую необходимо вернуть результат +2. Метаданные. В этом поле передаются служебные данные для самого SDK. 3. Бинарные данные, которые, которые передаются в функцию -4. Метаданные. В этом поле передаются служебные данные для самого SDK. # Сборка @@ -25,7 +24,3 @@ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest ``` protoc --go_out=. ./protobuf/* ``` - -``` - -``` diff --git a/go.mod b/go.mod index a981091..735f057 100644 --- a/go.mod +++ b/go.mod @@ -6,5 +6,7 @@ require ( github.com/nats-io/nats.go v1.18.0 // indirect github.com/nats-io/nkeys v0.3.0 // indirect github.com/nats-io/nuid v1.0.1 // indirect + github.com/satori/go.uuid v1.2.0 // indirect golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect + google.golang.org/protobuf v1.28.1 // indirect ) diff --git a/go.sum b/go.sum index e991af8..0878401 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,13 @@ +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0= github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -11,3 +15,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/protobuf b/protobuf index 5cfc91a..7bdc3de 160000 --- a/protobuf +++ b/protobuf @@ -1 +1 @@ -Subproject commit 5cfc91ad53d29f10769170576282a2df0885777e +Subproject commit 7bdc3de09aae8a5b4c258c369e57c97121e9cbca diff --git a/ytfunction.go b/ytfunction.go index 3cd70cb..c08e825 100644 --- a/ytfunction.go +++ b/ytfunction.go @@ -3,51 +3,109 @@ package ytfunction import ( "crypto/rand" "fmt" - "log" + "strings" + "time" + "yt-function-sdk-go/network" "github.com/nats-io/nats.go" + uuid "github.com/satori/go.uuid" + "google.golang.org/protobuf/proto" ) type funcDesc struct { - Name string - F func(header map[string]string, payload []byte) (result []byte, err error) - QueueCallback string - inputChan chan *nats.Msg - subj *nats.Subscription + Name string + F func(header *network.Header, payload []byte) (result []byte, err error) + inputChan chan *nats.Msg + subj *nats.Subscription + conn *nats.Conn + timeout time.Duration } -func (fDesс *funcDesc) worker() { +func (fDesc *funcDesc) worker() { + chanErr := make(chan error) + chanResult := make(chan []byte) for { - data, ok := <-fDesс.inputChan + data, ok := <-fDesc.inputChan if !ok { break } - result, err := fDesс.F(nil, data.Data) - if err != nil { - log.Println(err) - continue - } - fmt.Println(result) + var err error + var pkg network.Function + err = proto.Unmarshal(data.Data, &pkg) + if err != nil { + panic(err) + } + go func() { + result, err := fDesc.F(pkg.Header, data.Data) + if err != nil { + chanErr <- err + } else { + chanResult <- result + } + }() + //var result []byte + select { + case result := <-chanResult: + pkg.Payload = result + pkg.Metadata.CallResponseID = uuid.NewV4().String() + buff, err := proto.Marshal(&pkg) + if err != nil { + panic(err) + } + fDesc.conn.Publish(pkg.Metadata.QueueCallback, buff) + continue + case err = <-chanErr: + if err != nil { + pkg.ErrNo = 500 + pkg.Error = err.Error() + pkg.Metadata.CallResponseID = uuid.NewV4().String() + fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte(err.Error())) + continue + } + case <-time.After(fDesc.timeout): + if err != nil { + pkg.ErrNo = 408 + pkg.Error = err.Error() + pkg.Metadata.CallResponseID = uuid.NewV4().String() + fDesc.conn.Publish(pkg.Metadata.QueueCallback, []byte("function call timeout")) + continue + } + } } } type Serve struct { - NatsHost string - NatsPort string - nc *nats.Conn - moduleName string - funcs map[string]funcDesc + NatsHost string + NatsPort string + nc *nats.Conn + projectName string + moduleName string + funcs map[string]funcDesc + timeoutCallback time.Duration + queueCallback string + chanQueueCallback chan *nats.Msg + subj *nats.Subscription + callbackFuncs map[string]func(err error, result []byte) } -func NewServe(host string, port string, moduleName string) *Serve { - if len(moduleName) == 0 { - return nil +func NewServe(host string, port string, projectName string, moduleName string) *Serve { + if strings.Trim(moduleName, " ") == "" { + moduleName = "default" + } else { + moduleName = strings.Trim(moduleName, " ") + } + + if strings.Trim(projectName, " ") == "" { + projectName = "default" + } else { + projectName = strings.Trim(projectName, " ") } serve := &Serve{} serve.NatsHost = host serve.NatsPort = port + serve.projectName = projectName serve.moduleName = moduleName serve.funcs = make(map[string]funcDesc) @@ -59,19 +117,56 @@ func (serve *Serve) Run() (err error) { if err != nil { return } + rnd := make([]byte, 16) + _, err = rand.Read(rnd) + if err != nil { + return + } + serve.callbackFuncs = make(map[string]func(err error, result []byte)) + serve.queueCallback = fmt.Sprintf("%s.%s.%x", serve.projectName, serve.moduleName, rnd) + serve.chanQueueCallback = make(chan *nats.Msg) + serve.subj, err = serve.nc.ChanSubscribe(serve.queueCallback, serve.chanQueueCallback) + go serve.worker() return } +func (serve *Serve) worker() { + for { + if msg, ok := <-serve.chanQueueCallback; !ok { + break + } else { + var err error + var payload network.Function + err = proto.Unmarshal(msg.Data, &payload) + if err != nil { + panic(err) + } + if f, ok := serve.callbackFuncs[payload.Metadata.CallID]; ok { + if payload.ErrNo != 0 { + err = fmt.Errorf(payload.Error) + } + f(err, payload.Payload) + delete(serve.callbackFuncs, payload.Metadata.CallID) + } + } + } +} + func (serve *Serve) Shutdown() { for _, value := range serve.funcs { value.subj.Unsubscribe() close(value.inputChan) } + close(serve.chanQueueCallback) serve.nc.Close() } -func (serve *Serve) RegisterFunction(funcName string, f func(header map[string]string, paylod []byte) (result []byte, err error)) (err error) { +func (serve *Serve) RegisterFunction(funcName string, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) { + return serve.RegisterFunctionWithTimeout(funcName, time.Second*10, f) +} + +func (serve *Serve) RegisterFunctionWithTimeout(funcName string, timeout time.Duration, f func(header *network.Header, paylod []byte) (result []byte, err error)) (err error) { if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok { return fmt.Errorf(`Restrict duplicate function name`) } @@ -83,19 +178,40 @@ func (serve *Serve) RegisterFunction(funcName string, f func(header map[string]s } fDesc := funcDesc{ - Name: fmt.Sprintf("%s.%s", serve.moduleName, funcName), - QueueCallback: fmt.Sprintf("%s.%s_%x", serve.moduleName, funcName, id), - F: f, + Name: fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), + F: f, } fDesc.inputChan = make(chan *nats.Msg) - fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s", serve.moduleName, funcName), fDesc.inputChan) + fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName), fDesc.inputChan) + fDesc.conn = serve.nc + fDesc.timeout = timeout go fDesc.worker() - serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)] = fDesc + serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, funcName)] = fDesc return } func (serve *Serve) HasFunction(name string) bool { - _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, name)] + _, ok := serve.funcs[fmt.Sprintf("%s.%s.%s", serve.projectName, serve.moduleName, name)] return ok } + +func (serve *Serve) CallAsync(name string, header *network.Header, payload []byte, f func(err error, result []byte)) { + metadata := &network.Metadata{ + PackageType: network.Metadata_TYPE_REQUEST, + FuncName: name, + QueueCallback: serve.queueCallback, + CallID: uuid.NewV4().String(), + } + pkg := network.Function{ + Header: header, + Payload: payload, + Metadata: metadata, + } + buf, err := proto.Marshal(&pkg) + if err != nil { + panic(err) + } + serve.callbackFuncs[metadata.CallID] = f + serve.nc.Publish(name, buf) +} diff --git a/ytfunction_test.go b/ytfunction_test.go index 2b0b2e8..01a924c 100644 --- a/ytfunction_test.go +++ b/ytfunction_test.go @@ -4,17 +4,16 @@ import ( "fmt" "os" "testing" - "time" + "yt-function-sdk-go/network" ) var serve *Serve func setup() { - // TODO host, _ := os.LookupEnv("NATS_HOST") port, _ := os.LookupEnv("NATS_PORT") - serve = NewServe(host, port, "test") + serve = NewServe(host, port, "", "test") if serve == nil { panic(fmt.Errorf(`serve is not created`)) } @@ -35,14 +34,43 @@ func TestMain(m *testing.M) { } func TestRegisterFunction(t *testing.T) { - err := serve.RegisterFunction("testFunc", func(header map[string]string, paylod []byte) (result []byte, err error) { + err := serve.RegisterFunction("testFunc", func(header *network.Header, paylod []byte) (result []byte, err error) { fmt.Println(`Hello world`) return []byte("Hello world"), nil }) if err != nil { t.Error("Has error: ", err) } else if !serve.HasFunction("testFunc") { - t.Error(`Function "test" not found`) + t.Error(`Function "testFunc" not found`) + } +} + +func TestCallFunctionAsync(t *testing.T) { + var err error + err = serve.RegisterFunction("testFuncSimpleAsync", func(header *network.Header, paylod []byte) (result []byte, err error) { + return []byte("world"), nil + }) + if err != nil { + t.Error("Has error: ", err) + return + } + var result []byte + chanErr := make(chan error) + chanResult := make(chan []byte) + serve.CallAsync("default.test.testFuncSimpleAsync", nil, []byte("Hello"), func(err error, result []byte) { + chanErr <- err + chanResult <- result + }) + select { + case err = <-chanErr: + if err != nil { + t.Error(err) + return + } + case result = <-chanResult: + if string(result) != "world" { + t.Error("Function not returned \"world\"") + return + } } - time.Sleep(time.Second * 10) }