commit 33ad822fdd59344f15e01c4af35c257516eae2f9 Author: ymnuk Date: Sun Oct 23 20:12:23 2022 +0300 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..438f73b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +__debug \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..a1b4469 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + // Используйте IntelliSense, чтобы узнать о возможных атрибутах. + // Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов. + // Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Package", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${fileDirname}" + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..fe9d5c5 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "go.testEnvVars": { + "LOG_LEVEL": "debug", + "NATS_HOST": "localhost", + "NATS_PORT": "4222" + } +} diff --git a/README.md b/README.md new file mode 100644 index 0000000..145cf6f --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# Набор инструментов для функций + +# Пакет + +Сообщение является бинарным пакетом. Для работы с ним его для начала необходимо десериализовать. Для этого используется protocol buffer + +После десериализации присутствует 4 поля: + +1. Параметры заголовка, которые передаются с пакетом (например http-заголовки). Формат заголовка типа key:string=valye:string +2. Название очереди-функции обратного вызова. Это название очереди, в которую необходимо вернуть результат +3. Бинарные данные, которые, которые передаются в функцию +4. Метаданные. В этом поле передаются служебные данные для самого SDK. diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..e0f9d44 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,13 @@ +version: "3.9" +services: + nats: + image: nats:2.9.3-scratch + ports: + - "4222:4222" + - "8222:8222" + #command: "--http_port 8222 " + networks: ["nats"] + +networks: + nats: + name: nats \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a981091 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module yt-function-sdk-go + +go 1.19 + +require ( + github.com/nats-io/nats.go v1.18.0 // indirect + github.com/nats-io/nkeys v0.3.0 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e991af8 --- /dev/null +++ b/go.sum @@ -0,0 +1,13 @@ +github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0= +github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= +github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as= +golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/ytfunction.go b/ytfunction.go new file mode 100644 index 0000000..3cd70cb --- /dev/null +++ b/ytfunction.go @@ -0,0 +1,101 @@ +package ytfunction + +import ( + "crypto/rand" + "fmt" + "log" + + "github.com/nats-io/nats.go" +) + +type funcDesc struct { + Name string + F func(header map[string]string, payload []byte) (result []byte, err error) + QueueCallback string + inputChan chan *nats.Msg + subj *nats.Subscription +} + +func (fDesс *funcDesc) worker() { + for { + data, ok := <-fDesс.inputChan + if !ok { + break + } + result, err := fDesс.F(nil, data.Data) + if err != nil { + log.Println(err) + continue + } + fmt.Println(result) + + } +} + +type Serve struct { + NatsHost string + NatsPort string + nc *nats.Conn + moduleName string + funcs map[string]funcDesc +} + +func NewServe(host string, port string, moduleName string) *Serve { + if len(moduleName) == 0 { + return nil + } + + serve := &Serve{} + serve.NatsHost = host + serve.NatsPort = port + serve.moduleName = moduleName + serve.funcs = make(map[string]funcDesc) + + return serve +} + +func (serve *Serve) Run() (err error) { + serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort)) + if err != nil { + return + } + return +} + +func (serve *Serve) Shutdown() { + for _, value := range serve.funcs { + value.subj.Unsubscribe() + close(value.inputChan) + } + serve.nc.Close() + +} + +func (serve *Serve) RegisterFunction(funcName string, f func(header map[string]string, paylod []byte) (result []byte, err error)) (err error) { + if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok { + return fmt.Errorf(`Restrict duplicate function name`) + } + id := make([]byte, 16) + + _, err = rand.Read(id) + if err != nil { + return + } + + fDesc := funcDesc{ + Name: fmt.Sprintf("%s.%s", serve.moduleName, funcName), + QueueCallback: fmt.Sprintf("%s.%s_%x", serve.moduleName, funcName, id), + F: f, + } + fDesc.inputChan = make(chan *nats.Msg) + fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s", serve.moduleName, funcName), fDesc.inputChan) + go fDesc.worker() + + serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)] = fDesc + return +} + +func (serve *Serve) HasFunction(name string) bool { + _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, name)] + return ok +} diff --git a/ytfunction_test.go b/ytfunction_test.go new file mode 100644 index 0000000..2b0b2e8 --- /dev/null +++ b/ytfunction_test.go @@ -0,0 +1,48 @@ +package ytfunction + +import ( + "fmt" + "os" + "testing" + "time" +) + +var serve *Serve + +func setup() { + // TODO + host, _ := os.LookupEnv("NATS_HOST") + port, _ := os.LookupEnv("NATS_PORT") + + serve = NewServe(host, port, "test") + if serve == nil { + panic(fmt.Errorf(`serve is not created`)) + } + serve.Run() +} + +func shutdown() { + // TODO + serve.Shutdown() +} + +func TestMain(m *testing.M) { + fmt.Println("Main") + setup() + code := m.Run() + shutdown() + os.Exit(code) +} + +func TestRegisterFunction(t *testing.T) { + err := serve.RegisterFunction("testFunc", func(header map[string]string, paylod []byte) (result []byte, err error) { + fmt.Println(`Hello world`) + return []byte("Hello world"), nil + }) + if err != nil { + t.Error("Has error: ", err) + } else if !serve.HasFunction("testFunc") { + t.Error(`Function "test" not found`) + } + time.Sleep(time.Second * 10) +}