From 7f62c1fd66ffeb7e46127e5972d814abb9299848 Mon Sep 17 00:00:00 2001 From: unwox Date: Fri, 25 Oct 2024 17:33:56 +0600 Subject: implement non-blocking IO operations --- worker.go | 241 ++++++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 142 insertions(+), 99 deletions(-) (limited to 'worker.go') diff --git a/worker.go b/worker.go index 54a3757..aa18b62 100644 --- a/worker.go +++ b/worker.go @@ -21,15 +21,8 @@ type HTTPResponse struct { Body string } -type Worker struct { - lua *Lua - routes map[string]LuaRef - started bool - mu sync.Mutex -} - func HandleHTTPRequest( - queue chan any, + queue chan *HTTPRequest, route string, req *http.Request, ) chan *HTTPResponse { @@ -42,6 +35,13 @@ func HandleHTTPRequest( return res } +type Worker struct { + lua *Lua + routes map[string]LuaRef + started bool + mu sync.Mutex +} + // NewWorker creates a new instance of Worker type. func NewWorker() *Worker { return &Worker { @@ -97,8 +97,8 @@ func (w *Worker) Start(argv []string, module map[string]any) error { return nil } -// Listen starts a goroutine listening/handling HTTP requests from the queue. -func (w *Worker) Listen(queue chan any) { +// Listen starts handling HTTP requests from the queue. +func (w *Worker) Listen(queue chan *HTTPRequest) { stringListToAny := func(slice []string) []any { res := []any{} for _, v := range slice { @@ -106,109 +106,127 @@ func (w *Worker) Listen(queue chan any) { } return res } - handle := func() { - defer w.lua.RestoreStackFunc()() - r := <- queue - - switch r.(type) { - case *HTTPRequest: - r := r.(*HTTPRequest) - if _, ok := w.routes[r.route]; !ok { - r.result <- &HTTPResponse { - Code: 404, - Headers: make(map[string]string), - Body: "not found", - } - log.Println("no corresponding route") - return - } + handle := func(r *HTTPRequest, yield func(), resume func() bool) { + l := w.lua.NewThread(yield, resume) + // Save a thread to a reference so it's not garbage collected + // before we are done with it. + ref := w.lua.PopToRef() + defer w.lua.Unref(ref) - w.lua.PushFromRef(w.routes[r.route]) - res := make(map[string]any) - res["method"] = r.request.Method - res["path"] = r.request.URL.Path - - fh := make(map[string]any) - for k := range r.request.Header { - fh[k] = r.request.Header.Get(k) + if _, ok := w.routes[r.route]; !ok { + r.result <- &HTTPResponse { + Code: 404, + Headers: make(map[string]string), + Body: "not found", } - res["headers"] = fh + log.Println("no corresponding route") + return + } - flatQr := make(map[string]any) - qr := r.request.URL.Query() - for k := range qr { - flatQr[k] = stringListToAny(qr[k]) - } - res["query"] = flatQr - - body, err := io.ReadAll(r.request.Body) - if err != nil { - r.result <- &HTTPResponse { - Code: 500, - Headers: make(map[string]string), - Body: "server error", - } - log.Println("could not read a request body:", err) - return - } - res["body"] = string(body) - - err = w.lua.PushObject(res) - if err != nil { - r.result <- &HTTPResponse { - Code: 500, - Headers: make(map[string]string), - Body: "server error", - } - log.Println("could not form a request to lua:", err) - return - } + l.PushFromRef(w.routes[r.route]) + res := make(map[string]any) + res["method"] = r.request.Method + res["path"] = r.request.URL.Path - err = w.lua.PCall(1, 3) + fh := make(map[string]any) + for k := range r.request.Header { + fh[k] = r.request.Header.Get(k) + } + res["headers"] = fh - if err != nil { - r.result <- &HTTPResponse { - Code: 500, - Headers: make(map[string]string), - Body: "server error", - } - log.Println("could not read a request body:", err) - return + flatQr := make(map[string]any) + qr := r.request.URL.Query() + for k := range qr { + flatQr[k] = stringListToAny(qr[k]) + } + res["query"] = flatQr + + body, err := io.ReadAll(r.request.Body) + if err != nil { + r.result <- &HTTPResponse{ + Code: 500, + Headers: make(map[string]string), + Body: "server error", } + log.Println("could not read request body:", err) + return + } + res["body"] = string(body) - code := w.lua.ToInt(-3) - rbody := w.lua.ToString(-1) - - // Parse headers. - headers := make(map[string]string) - w.lua.Pop(1) - w.lua.PushNil() - for w.lua.Next() { - if !w.lua.IsString(-2) || !w.lua.IsString(-2) { - w.lua.Pop(1) - continue - } - v := w.lua.ToString(-1) - w.lua.Pop(1) - // We must not pop the item key from the stack - // because otherwise C.lua_next won't work - // properly. - k := w.lua.ToString(-1) - headers[k] = v + err = l.PushObject(res) + if err != nil { + r.result <- &HTTPResponse{ + Code: 500, + Headers: make(map[string]string), + Body: "server error", } - r.result <- &HTTPResponse { - Code: int(code), - Headers: headers, - Body: rbody, + log.Println("could not form request to lua:", err) + return + } + + err = l.PCall(1, 3) + if err != nil { + r.result <- &HTTPResponse{ + Code: 500, + Headers: make(map[string]string), + Body: "server error", } + log.Println("could not process request:", err) + // TODO: print lua stack as well? + return + } - default: - log.Fatal("unknown request") + // TODO: probably it would be better to just use l.Scan() + // here but i'm not really sure if we want to have that + // overhead here. + code := l.ToInt(-3) + rbody := l.ToString(-1) + // Parse headers. + headers := make(map[string]string) + l.Pop(1) + l.PushNil() + for l.Next() { + if !l.IsString(-2) || !l.IsString(-2) { + l.Pop(1) + continue + } + v := l.ToString(-1) + l.Pop(1) + // We must not pop the item key from the stack + // because otherwise C.lua_next won't work + // properly. + k := l.ToString(-1) + headers[k] = v + } + r.result <- &HTTPResponse{ + Code: int(code), + Headers: headers, + Body: rbody, } } + resCh := make(chan func() bool, 4096) +outer: for { - handle() + select { + case r, ok := <- queue: + // accept new requests + if !ok { + break outer + } + resCh <- NewCoroutine(func(yield func(), resume func() bool) { + handle(r, yield, func () bool { + resCh <- resume + return true + }) + }) + case resume, ok := <-resCh: + // coroutine executor + if !ok { + break outer + } + resume() + } } } @@ -237,3 +255,28 @@ func (w *Worker) Stop() { func (w *Worker) HasSameLua(l *Lua) bool { return w.lua == l } + +func NewCoroutine(f func (yield func(), resume func() bool)) (resume func() bool) { + cin := make(chan bool) + cout := make(chan bool) + running := true + resume = func() bool { + if !running { + return false + } + cin <- true + <-cout + return true + } + yield := func() { + cout <- true + <-cin + } + go func() { + <-cin + f(yield, resume) + running = false + cout <- true + }() + return resume +} -- cgit v1.2.3