diff options
| author | unwox <me@unwox.com> | 2025-10-30 17:22:26 +0600 |
|---|---|---|
| committer | unwox <me@unwox.com> | 2025-10-30 17:23:04 +0600 |
| commit | 8328e4d78455eaa3627455469c7517f6805c6da3 (patch) | |
| tree | 96991c3f0fce5c0b7ebf6a3a6cfa20fac9780781 /worker.go | |
| parent | 06d94fe7f16e8985878a24332978731439599c4e (diff) | |
Diffstat (limited to 'worker.go')
| -rw-r--r-- | worker.go | 102 |
1 files changed, 74 insertions, 28 deletions
@@ -3,12 +3,12 @@ package main import ( _ "embed" "errors" + "fmt" "io" "log" "net/http" "os" "strings" - "sync" ) type HTTPRequest struct { @@ -45,16 +45,15 @@ type Worker struct { lua *Lua routes map[string]LuaRef started bool - mu sync.Mutex + argv []string + module map[string]any + stop chan bool evalFn *LuaRef } // NewWorker creates a new instance of Worker type. func NewWorker() *Worker { - return &Worker { - routes: make(map[string]LuaRef), - lua: &Lua{}, - } + return &Worker { routes: make(map[string]LuaRef) } } // Start starts the worker: @@ -66,13 +65,15 @@ func (w *Worker) Start(argv []string, module map[string]any) error { if len(argv) == 0 { return errors.New("argv must at least contain lua file name") } - w.mu.Lock() - defer w.mu.Unlock() - if w.started { return errors.New("already started") } - w.lua.Start() + + w.lua = &Lua{} + err := w.lua.Start() + if err != nil { + return fmt.Errorf("lua error: %w", err) + } defer w.lua.RestoreStackFunc()() // emulate passing arguments to the loaded chunk @@ -80,7 +81,7 @@ func (w *Worker) Start(argv []string, module map[string]any) error { for _, arg := range argv { args = append(args, arg) } - err := w.lua.PushArray(args) + err = w.lua.PushArray(args) if err != nil { return err } @@ -123,11 +124,51 @@ func (w *Worker) Start(argv []string, module map[string]any) error { } w.started = true + w.argv = argv + w.module = module + return nil +} + +// Stop stops the worker closing the Lua context. +func (w *Worker) Stop() error { + if !w.started { + return errors.New("not started yet") + } + if w.stop != nil { + w.stop <- true + } + w.started = false + w.evalFn = nil + w.routes = make(map[string]LuaRef) + w.lua.Close() + return nil +} + +// Restart restarts worker with previous arguments. +func (w *Worker) Restart() error { + if !w.started { + return errors.New("not started yet") + } + oldLua := w.lua + w.started = false + w.routes = make(map[string]LuaRef) + err := w.Start(w.argv, w.module) + if err != nil { + w.lua.Close() + w.lua = oldLua + w.started = true + return err + } + if w.stop != nil { + w.stop <- true + } + oldLua.Close() return nil } // Listen starts handling HTTP requests from the queue. func (w *Worker) Listen(queue chan *HTTPRequest) { + w.stop = make(chan bool) stringListToAny := func(slice []string) []any { res := []any{} for _, v := range slice { @@ -169,6 +210,12 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { flatQr[k] = stringListToAny(qr[k]) } res["query"] = flatQr + + cookies := make(map[string]any) + for _, cookie := range req.Cookies() { + cookies[cookie.Name] = cookie.Value + } + res["cookies"] = cookies // if request body is a multipart form: automatically parse it, // save the files and form values and put them in to the @@ -278,7 +325,7 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { return } - // TODO: probably it would be better to just use l.Scan() + // probably it would be better to just use l.Scan() // here but i'm not really sure if we want to have that // overhead here. code := l.ToInt(-3) @@ -307,24 +354,26 @@ func (w *Worker) Listen(queue chan *HTTPRequest) { } } - resCh := make(chan func() bool, 4096) + corQueue := make(chan func() bool, 4096) outer: for { select { - case r, ok := <- queue: + case <- w.stop: + break outer + case req, ok := <- queue: // accept new requests if !ok { break outer } - resCh <- NewCoroutine( + corQueue <- NewCoroutine( func(yield func(), resume func() bool) { - handle(r, yield, func () bool { - resCh <- resume + handle(req, yield, func () bool { + corQueue <- resume return true }) }, ) - case resume, ok := <-resCh: + case resume, ok := <-corQueue: // coroutine executor if !ok { break outer @@ -332,14 +381,19 @@ outer: resume() } } + + // exhaust queue with coroutines so every request that started + // processing will be answered + for i := 0; i < len(corQueue); i++ { + resume := <- corQueue + resume() + } } // Eval evaluates the code in the Lua context. Not safe for execution when // there are requests in the processing queue, only meant for development // purposes. func (w *Worker) Eval(code string) error { - w.mu.Lock() - defer w.mu.Unlock() if w.evalFn != nil { // FIXME: does this branch pollute stack? w.lua.PushFromRef(*w.evalFn) @@ -355,14 +409,6 @@ func (w *Worker) SetRoute(route string, handler LuaRef) { w.routes[route] = handler } -// Stop stops the worker closing the Lua context. TODO: stop Listen goroutine -// as well. -func (w *Worker) Stop() { - w.mu.Lock() - defer w.mu.Unlock() - w.lua.Close() -} - // HasSameLua checks if the Lua context belongs to the worker. func (w *Worker) HasSameLua(l *Lua) bool { return w.lua == l |
