first commit
This commit is contained in:
commit
33ad822fdd
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
__debug
|
15
.vscode/launch.json
vendored
Normal file
15
.vscode/launch.json
vendored
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
{
|
||||||
|
// Используйте IntelliSense, чтобы узнать о возможных атрибутах.
|
||||||
|
// Наведите указатель мыши, чтобы просмотреть описания существующих атрибутов.
|
||||||
|
// Для получения дополнительной информации посетите: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"name": "Launch Package",
|
||||||
|
"type": "go",
|
||||||
|
"request": "launch",
|
||||||
|
"mode": "auto",
|
||||||
|
"program": "${fileDirname}"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
7
.vscode/settings.json
vendored
Normal file
7
.vscode/settings.json
vendored
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
{
|
||||||
|
"go.testEnvVars": {
|
||||||
|
"LOG_LEVEL": "debug",
|
||||||
|
"NATS_HOST": "localhost",
|
||||||
|
"NATS_PORT": "4222"
|
||||||
|
}
|
||||||
|
}
|
12
README.md
Normal file
12
README.md
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
# Набор инструментов для функций
|
||||||
|
|
||||||
|
# Пакет
|
||||||
|
|
||||||
|
Сообщение является бинарным пакетом. Для работы с ним его для начала необходимо десериализовать. Для этого используется protocol buffer
|
||||||
|
|
||||||
|
После десериализации присутствует 4 поля:
|
||||||
|
|
||||||
|
1. Параметры заголовка, которые передаются с пакетом (например http-заголовки). Формат заголовка типа key:string=valye:string
|
||||||
|
2. Название очереди-функции обратного вызова. Это название очереди, в которую необходимо вернуть результат
|
||||||
|
3. Бинарные данные, которые, которые передаются в функцию
|
||||||
|
4. Метаданные. В этом поле передаются служебные данные для самого SDK.
|
13
docker-compose.yml
Normal file
13
docker-compose.yml
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
version: "3.9"
|
||||||
|
services:
|
||||||
|
nats:
|
||||||
|
image: nats:2.9.3-scratch
|
||||||
|
ports:
|
||||||
|
- "4222:4222"
|
||||||
|
- "8222:8222"
|
||||||
|
#command: "--http_port 8222 "
|
||||||
|
networks: ["nats"]
|
||||||
|
|
||||||
|
networks:
|
||||||
|
nats:
|
||||||
|
name: nats
|
10
go.mod
Normal file
10
go.mod
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
module yt-function-sdk-go
|
||||||
|
|
||||||
|
go 1.19
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/nats-io/nats.go v1.18.0 // indirect
|
||||||
|
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||||
|
github.com/nats-io/nuid v1.0.1 // indirect
|
||||||
|
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b // indirect
|
||||||
|
)
|
13
go.sum
Normal file
13
go.sum
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
github.com/nats-io/nats.go v1.18.0 h1:o480Ao6kuSSFyJO75rGTXCEPj7LGkY84C1Ye+Uhm4c0=
|
||||||
|
github.com/nats-io/nats.go v1.18.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
|
||||||
|
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
|
||||||
|
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||||
|
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||||
|
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||||
|
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b h1:wSOdpTq0/eI46Ez/LkDwIsAKA71YP2SRKBODiRWM0as=
|
||||||
|
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
|
||||||
|
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||||
|
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||||
|
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||||
|
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||||
|
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
101
ytfunction.go
Normal file
101
ytfunction.go
Normal file
|
@ -0,0 +1,101 @@
|
||||||
|
package ytfunction
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/rand"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/nats-io/nats.go"
|
||||||
|
)
|
||||||
|
|
||||||
|
type funcDesc struct {
|
||||||
|
Name string
|
||||||
|
F func(header map[string]string, payload []byte) (result []byte, err error)
|
||||||
|
QueueCallback string
|
||||||
|
inputChan chan *nats.Msg
|
||||||
|
subj *nats.Subscription
|
||||||
|
}
|
||||||
|
|
||||||
|
func (fDesс *funcDesc) worker() {
|
||||||
|
for {
|
||||||
|
data, ok := <-fDesс.inputChan
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
result, err := fDesс.F(nil, data.Data)
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
fmt.Println(result)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Serve struct {
|
||||||
|
NatsHost string
|
||||||
|
NatsPort string
|
||||||
|
nc *nats.Conn
|
||||||
|
moduleName string
|
||||||
|
funcs map[string]funcDesc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewServe(host string, port string, moduleName string) *Serve {
|
||||||
|
if len(moduleName) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
serve := &Serve{}
|
||||||
|
serve.NatsHost = host
|
||||||
|
serve.NatsPort = port
|
||||||
|
serve.moduleName = moduleName
|
||||||
|
serve.funcs = make(map[string]funcDesc)
|
||||||
|
|
||||||
|
return serve
|
||||||
|
}
|
||||||
|
|
||||||
|
func (serve *Serve) Run() (err error) {
|
||||||
|
serve.nc, err = nats.Connect(fmt.Sprintf("nats://%s:%s", serve.NatsHost, serve.NatsPort))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (serve *Serve) Shutdown() {
|
||||||
|
for _, value := range serve.funcs {
|
||||||
|
value.subj.Unsubscribe()
|
||||||
|
close(value.inputChan)
|
||||||
|
}
|
||||||
|
serve.nc.Close()
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (serve *Serve) RegisterFunction(funcName string, f func(header map[string]string, paylod []byte) (result []byte, err error)) (err error) {
|
||||||
|
if _, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)]; ok {
|
||||||
|
return fmt.Errorf(`Restrict duplicate function name`)
|
||||||
|
}
|
||||||
|
id := make([]byte, 16)
|
||||||
|
|
||||||
|
_, err = rand.Read(id)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fDesc := funcDesc{
|
||||||
|
Name: fmt.Sprintf("%s.%s", serve.moduleName, funcName),
|
||||||
|
QueueCallback: fmt.Sprintf("%s.%s_%x", serve.moduleName, funcName, id),
|
||||||
|
F: f,
|
||||||
|
}
|
||||||
|
fDesc.inputChan = make(chan *nats.Msg)
|
||||||
|
fDesc.subj, err = serve.nc.ChanSubscribe(fmt.Sprintf("%s.%s", serve.moduleName, funcName), fDesc.inputChan)
|
||||||
|
go fDesc.worker()
|
||||||
|
|
||||||
|
serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, funcName)] = fDesc
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (serve *Serve) HasFunction(name string) bool {
|
||||||
|
_, ok := serve.funcs[fmt.Sprintf("%s.%s", serve.moduleName, name)]
|
||||||
|
return ok
|
||||||
|
}
|
48
ytfunction_test.go
Normal file
48
ytfunction_test.go
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
package ytfunction
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var serve *Serve
|
||||||
|
|
||||||
|
func setup() {
|
||||||
|
// TODO
|
||||||
|
host, _ := os.LookupEnv("NATS_HOST")
|
||||||
|
port, _ := os.LookupEnv("NATS_PORT")
|
||||||
|
|
||||||
|
serve = NewServe(host, port, "test")
|
||||||
|
if serve == nil {
|
||||||
|
panic(fmt.Errorf(`serve is not created`))
|
||||||
|
}
|
||||||
|
serve.Run()
|
||||||
|
}
|
||||||
|
|
||||||
|
func shutdown() {
|
||||||
|
// TODO
|
||||||
|
serve.Shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMain(m *testing.M) {
|
||||||
|
fmt.Println("Main")
|
||||||
|
setup()
|
||||||
|
code := m.Run()
|
||||||
|
shutdown()
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestRegisterFunction(t *testing.T) {
|
||||||
|
err := serve.RegisterFunction("testFunc", func(header map[string]string, paylod []byte) (result []byte, err error) {
|
||||||
|
fmt.Println(`Hello world`)
|
||||||
|
return []byte("Hello world"), nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Error("Has error: ", err)
|
||||||
|
} else if !serve.HasFunction("testFunc") {
|
||||||
|
t.Error(`Function "test" not found`)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second * 10)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user