diff options
| author | unwox <me@unwox.com> | 2025-10-30 17:22:26 +0600 |
|---|---|---|
| committer | unwox <me@unwox.com> | 2025-10-30 17:23:04 +0600 |
| commit | 8328e4d78455eaa3627455469c7517f6805c6da3 (patch) | |
| tree | 96991c3f0fce5c0b7ebf6a3a6cfa20fac9780781 | |
| parent | 06d94fe7f16e8985878a24332978731439599c4e (diff) | |
| -rw-r--r-- | lua.go | 21 | ||||
| -rw-r--r-- | lua_common.go | 6 | ||||
| -rw-r--r-- | luajit.go | 21 | ||||
| -rw-r--r-- | main.go | 36 | ||||
| -rw-r--r-- | worker.go | 102 |
5 files changed, 141 insertions, 45 deletions
@@ -36,6 +36,7 @@ type Lua struct { running **Lua yield func() resume func() bool + deferred func() } //export luna_run_go_func @@ -46,19 +47,26 @@ func luna_run_go_func(f C.uintptr_t) C.int { } // Start opens the Lua context with all built-in libraries. -func (l *Lua) Start() { +func (l *Lua) Start() error { + if l.l != nil { + return errors.New("already started") + } l.l = C.luaL_newstate() l.running = &l + l.yield = nil + l.resume = nil C.luaL_openlibs(l.l) // Put traceback function at the start of the stack so it's always // possible to refer to it via stack index 1. l.PushGoFunction(TracebackHandler) + return nil } // Close closes the Lua context. func (l *Lua) Close() { C.lua_close(l.l) + l.l = nil } // PCall calls a function with the stack index (-nargs-1) expecting nresults @@ -71,8 +79,13 @@ func (l *Lua) PCall(nargs int, nresults int, errfunc int) error { if res != C.LUA_OK { errMsg := l.ToString(-1) l.Pop(1) + l.deferred = nil return errors.New(errMsg) } + if l.deferred != nil { + l.deferred() + l.deferred = nil + } return nil } @@ -271,11 +284,7 @@ func (l *Lua) LoadAndCall(code string) error { l.Pop(1) return errors.New(errMsg) } - err := l.PCall(0, C.LUA_MULTRET, 1) - if err != nil { - return err - } - return nil + return l.PCall(0, C.LUA_MULTRET, 1) } // SetGlobal sets a global value at the -1 stack index with the name. diff --git a/lua_common.go b/lua_common.go index 29fdab9..616beea 100644 --- a/lua_common.go +++ b/lua_common.go @@ -42,12 +42,12 @@ func (l *Lua) PushAny(v any) error { switch v.(type) { case nil: l.PushNil() - case string: - v, _ := v.(string) - l.PushString(v) case func (l *Lua) int: v, _ := v.(func (l *Lua) int) l.PushGoFunction(v) + case string: + v, _ := v.(string) + l.PushString(v) case int: v, _ := v.(int) l.PushNumber(v) @@ -36,6 +36,7 @@ type Lua struct { running **Lua yield func() resume func() bool + deferred func() } //export luna_run_go_func @@ -46,19 +47,26 @@ func luna_run_go_func(f C.uintptr_t) C.int { } // Start opens the Lua context with all built-in libraries. -func (l *Lua) Start() { +func (l *Lua) Start() error { + if l.l != nil { + return errors.New("already started") + } l.l = C.luaL_newstate() l.running = &l + l.yield = nil + l.resume = nil C.luaL_openlibs(l.l) // Put traceback function at the start of the stack so it's always // possible to refer to it via stack index 1. l.PushGoFunction(TracebackHandler) + return nil } // Close closes the Lua context. func (l *Lua) Close() { C.lua_close(l.l) + l.l = nil } // PCall calls a function with the stack index (-nargs-1) expecting nresults @@ -69,8 +77,13 @@ func (l *Lua) PCall(nargs int, nresults int, errfunc int) error { if res != C.LUA_OK { errMsg := l.ToString(-1) l.Pop(1) + l.deferred = nil return errors.New(errMsg) } + if l.deferred != nil { + l.deferred() + l.deferred = nil + } return nil } @@ -268,11 +281,7 @@ func (l *Lua) LoadAndCall(code string) error { l.Pop(1) return errors.New(errMsg) } - err := l.PCall(0, C.LUA_MULTRET, 1) - if err != nil { - return err - } - return nil + return l.PCall(0, C.LUA_MULTRET, 1) } // SetGlobal sets a global value at the -1 stack index with the name. @@ -613,6 +613,34 @@ func main() { module["fs"] = fsModule module["image"] = imageModule module["crypto"] = cryptoModule + module["restart"] = func (l *Lua) int { + running := false + l.deferred = func () { + if running { + return + } + for i, wrk := range wrks { + if wrk.HasSameLua(l) == false { + continue + } + running = true + start := time.Now() + err := wrk.Restart() + if err != nil { + log.Printf("error restarting:\n%s\n", err.Error()) + return + } + log.Printf( + "worker %d restarted in %s\n", + i, + time.Now().Sub(start), + ) + go wrk.Listen(msgs) + } + } + + return luaOk(l, nil) + } module["on-eval"] = func (l *Lua) int { var evalFn LuaRef err := l.Scan(&evalFn) @@ -638,12 +666,16 @@ func main() { wrk := NewWorker() wrks = append(wrks, wrk) go func () { + start := time.Now() err := wrk.Start(luaArgv, module) if err != nil { log.Fatal(err) } - wrks = append(wrks, wrk) - log.Printf("worker %d started\n", i) + log.Printf( + "worker %d started in %s\n", + i, + time.Now().Sub(start), + ) wg.Add(-1) wrk.Listen(msgs) }() @@ -3,12 +3,12 @@ package main import ( _ "embed" "errors" + "fmt" "io" "log" "net/http" "os" "strings" - "sync" ) type HTTPRequest struct { @@ -45,16 +45,15 @@ type Worker struct { lua *Lua routes map[string]LuaRef started bool - mu sync.Mutex + argv []string + module map[string]any + stop chan bool evalFn *LuaRef } // NewWorker creates a new instance of Worker type. func NewWorker() *Worker { - return &Worker { - routes: make(map[string]LuaRef), - lua: &Lua{}, - } + return &Worker { routes: make(map[string]LuaRef) } } // Start starts the worker: @@ -66,13 +65,15 @@ func (w *Worker) Start(argv []string, module map[string]any) error { if len(argv) == 0 { return errors.New("argv must at least contain lua file name") } - w.mu.Lock() - defer w.mu.Unlock() - if w.started { return errors.New("already started") } - w.lua.Start() + + w.lua = &Lua{} + err := w.lua.Start() + if err != nil { + return fmt.Errorf("lua error: %w", err) + } defer w.lua.RestoreStackFunc()() // emulate passing arguments to the loaded chunk @@ -80,7 +81,7 @@ func (w *Worker) Start(argv []string, module map[string]any) error { for _, arg := range argv { args = append(args, arg) } - err := w.lua.PushArray(args) + err = w.lua.PushArray(args) if err != nil { return err } @@ -123,11 +124,51 @@ func (w *Worker) Start(argv []string, module map[string]any) error { } w.started = true + w.argv = argv + w.module = module + return nil +} + +// Stop stops the worker closing the Lua context. +func (w *Worker) Stop() error { + if !w.started { + return errors.New("not started yet") + } + if w.stop != nil { + w.stop <- true + } + w.started = false + w.evalFn = nil + w.routes = make(map[string]LuaRef) + w.lua.Close() + return nil +} + +// Restart restarts worker with previous arguments. +func (w *Worker) Restart() error { + if !w.started { + return errors.New("not started yet") + } + oldLua := w.lua + w.started = false + w.routes = make(map[string]LuaRef) + err := w.Start(w.argv, w.module) + if err != nil { + w.lua.Close() + w.lua = oldLua + w.started = true + return err + } + if w.stop != nil { + w.stop <- true + } + oldLua.Close() return nil } // Listen starts handling HTTP requests from the queue. func (w *Worker) Listen(queue chan *HTTPRequest) { + w.stop = make(chan bool) stringListToAny := func(slice []string) []any { res := []any{} for _, v := range slice { @@ -169,6 +210,12 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { flatQr[k] = stringListToAny(qr[k]) } res["query"] = flatQr + + cookies := make(map[string]any) + for _, cookie := range req.Cookies() { + cookies[cookie.Name] = cookie.Value + } + res["cookies"] = cookies // if request body is a multipart form: automatically parse it, // save the files and form values and put them in to the @@ -278,7 +325,7 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { return } - // TODO: probably it would be better to just use l.Scan() + // probably it would be better to just use l.Scan() // here but i'm not really sure if we want to have that // overhead here. code := l.ToInt(-3) @@ -307,24 +354,26 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { } } - resCh := make(chan func() bool, 4096) + corQueue := make(chan func() bool, 4096) outer: for { select { - case r, ok := <- queue: + case <- w.stop: + break outer + case req, ok := <- queue: // accept new requests if !ok { break outer } - resCh <- NewCoroutine( + corQueue <- NewCoroutine( func(yield func(), resume func() bool) { - handle(r, yield, func () bool { - resCh <- resume + handle(req, yield, func () bool { + corQueue <- resume return true }) }, ) - case resume, ok := <-resCh: + case resume, ok := <-corQueue: // coroutine executor if !ok { break outer @@ -332,14 +381,19 @@ outer: resume() } } + + // exhaust queue with coroutines so every request that started + // processing will be answered + for i := 0; i < len(corQueue); i++ { + resume := <- corQueue + resume() + } } // Eval evaluates the code in the Lua context. Not safe for execution when // there are requests in the processing queue, only meant for development // purposes. func (w *Worker) Eval(code string) error { - w.mu.Lock() - defer w.mu.Unlock() if w.evalFn != nil { // FIXME: does this branch pollute stack? w.lua.PushFromRef(*w.evalFn) @@ -355,14 +409,6 @@ func (w *Worker) SetRoute(route string, handler LuaRef) { w.routes[route] = handler } -// Stop stops the worker closing the Lua context. TODO: stop Listen goroutine -// as well. -func (w *Worker) Stop() { - w.mu.Lock() - defer w.mu.Unlock() - w.lua.Close() -} - // HasSameLua checks if the Lua context belongs to the worker. func (w *Worker) HasSameLua(l *Lua) bool { return w.lua == l |
