summaryrefslogtreecommitdiff
path: root/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'worker.go')
-rw-r--r--worker.go241
1 files changed, 142 insertions, 99 deletions
diff --git a/worker.go b/worker.go
index 54a3757..aa18b62 100644
--- a/worker.go
+++ b/worker.go
@@ -21,15 +21,8 @@ type HTTPResponse struct {
Body string
}
-type Worker struct {
- lua *Lua
- routes map[string]LuaRef
- started bool
- mu sync.Mutex
-}
-
func HandleHTTPRequest(
- queue chan any,
+ queue chan *HTTPRequest,
route string,
req *http.Request,
) chan *HTTPResponse {
@@ -42,6 +35,13 @@ func HandleHTTPRequest(
return res
}
+type Worker struct {
+ lua *Lua
+ routes map[string]LuaRef
+ started bool
+ mu sync.Mutex
+}
+
// NewWorker creates a new instance of Worker type.
func NewWorker() *Worker {
return &Worker {
@@ -97,8 +97,8 @@ func (w *Worker) Start(argv []string, module map[string]any) error {
return nil
}
-// Listen starts a goroutine listening/handling HTTP requests from the queue.
-func (w *Worker) Listen(queue chan any) {
+// Listen starts handling HTTP requests from the queue.
+func (w *Worker) Listen(queue chan *HTTPRequest) {
stringListToAny := func(slice []string) []any {
res := []any{}
for _, v := range slice {
@@ -106,109 +106,127 @@ func (w *Worker) Listen(queue chan any) {
}
return res
}
- handle := func() {
- defer w.lua.RestoreStackFunc()()
- r := <- queue
-
- switch r.(type) {
- case *HTTPRequest:
- r := r.(*HTTPRequest)
- if _, ok := w.routes[r.route]; !ok {
- r.result <- &HTTPResponse {
- Code: 404,
- Headers: make(map[string]string),
- Body: "not found",
- }
- log.Println("no corresponding route")
- return
- }
+ handle := func(r *HTTPRequest, yield func(), resume func() bool) {
+ l := w.lua.NewThread(yield, resume)
+ // Save a thread to a reference so it's not garbage collected
+ // before we are done with it.
+ ref := w.lua.PopToRef()
+ defer w.lua.Unref(ref)
- w.lua.PushFromRef(w.routes[r.route])
- res := make(map[string]any)
- res["method"] = r.request.Method
- res["path"] = r.request.URL.Path
-
- fh := make(map[string]any)
- for k := range r.request.Header {
- fh[k] = r.request.Header.Get(k)
+ if _, ok := w.routes[r.route]; !ok {
+ r.result <- &HTTPResponse {
+ Code: 404,
+ Headers: make(map[string]string),
+ Body: "not found",
}
- res["headers"] = fh
+ log.Println("no corresponding route")
+ return
+ }
- flatQr := make(map[string]any)
- qr := r.request.URL.Query()
- for k := range qr {
- flatQr[k] = stringListToAny(qr[k])
- }
- res["query"] = flatQr
-
- body, err := io.ReadAll(r.request.Body)
- if err != nil {
- r.result <- &HTTPResponse {
- Code: 500,
- Headers: make(map[string]string),
- Body: "server error",
- }
- log.Println("could not read a request body:", err)
- return
- }
- res["body"] = string(body)
-
- err = w.lua.PushObject(res)
- if err != nil {
- r.result <- &HTTPResponse {
- Code: 500,
- Headers: make(map[string]string),
- Body: "server error",
- }
- log.Println("could not form a request to lua:", err)
- return
- }
+ l.PushFromRef(w.routes[r.route])
+ res := make(map[string]any)
+ res["method"] = r.request.Method
+ res["path"] = r.request.URL.Path
- err = w.lua.PCall(1, 3)
+ fh := make(map[string]any)
+ for k := range r.request.Header {
+ fh[k] = r.request.Header.Get(k)
+ }
+ res["headers"] = fh
- if err != nil {
- r.result <- &HTTPResponse {
- Code: 500,
- Headers: make(map[string]string),
- Body: "server error",
- }
- log.Println("could not read a request body:", err)
- return
+ flatQr := make(map[string]any)
+ qr := r.request.URL.Query()
+ for k := range qr {
+ flatQr[k] = stringListToAny(qr[k])
+ }
+ res["query"] = flatQr
+
+ body, err := io.ReadAll(r.request.Body)
+ if err != nil {
+ r.result <- &HTTPResponse{
+ Code: 500,
+ Headers: make(map[string]string),
+ Body: "server error",
}
+ log.Println("could not read request body:", err)
+ return
+ }
+ res["body"] = string(body)
- code := w.lua.ToInt(-3)
- rbody := w.lua.ToString(-1)
-
- // Parse headers.
- headers := make(map[string]string)
- w.lua.Pop(1)
- w.lua.PushNil()
- for w.lua.Next() {
- if !w.lua.IsString(-2) || !w.lua.IsString(-2) {
- w.lua.Pop(1)
- continue
- }
- v := w.lua.ToString(-1)
- w.lua.Pop(1)
- // We must not pop the item key from the stack
- // because otherwise C.lua_next won't work
- // properly.
- k := w.lua.ToString(-1)
- headers[k] = v
+ err = l.PushObject(res)
+ if err != nil {
+ r.result <- &HTTPResponse{
+ Code: 500,
+ Headers: make(map[string]string),
+ Body: "server error",
}
- r.result <- &HTTPResponse {
- Code: int(code),
- Headers: headers,
- Body: rbody,
+ log.Println("could not form request to lua:", err)
+ return
+ }
+
+ err = l.PCall(1, 3)
+ if err != nil {
+ r.result <- &HTTPResponse{
+ Code: 500,
+ Headers: make(map[string]string),
+ Body: "server error",
}
+ log.Println("could not process request:", err)
+ // TODO: print lua stack as well?
+ return
+ }
- default:
- log.Fatal("unknown request")
+ // TODO: probably it would be better to just use l.Scan()
+ // here but i'm not really sure if we want to have that
+ // overhead here.
+ code := l.ToInt(-3)
+ rbody := l.ToString(-1)
+ // Parse headers.
+ headers := make(map[string]string)
+ l.Pop(1)
+ l.PushNil()
+ for l.Next() {
+ if !l.IsString(-2) || !l.IsString(-2) {
+ l.Pop(1)
+ continue
+ }
+ v := l.ToString(-1)
+ l.Pop(1)
+ // We must not pop the item key from the stack
+ // because otherwise C.lua_next won't work
+ // properly.
+ k := l.ToString(-1)
+ headers[k] = v
+ }
+ r.result <- &HTTPResponse{
+ Code: int(code),
+ Headers: headers,
+ Body: rbody,
}
}
+ resCh := make(chan func() bool, 4096)
+outer:
for {
- handle()
+ select {
+ case r, ok := <- queue:
+ // accept new requests
+ if !ok {
+ break outer
+ }
+ resCh <- NewCoroutine(func(yield func(), resume func() bool) {
+ handle(r, yield, func () bool {
+ resCh <- resume
+ return true
+ })
+ })
+ case resume, ok := <-resCh:
+ // coroutine executor
+ if !ok {
+ break outer
+ }
+ resume()
+ }
}
}
@@ -237,3 +255,28 @@ func (w *Worker) Stop() {
func (w *Worker) HasSameLua(l *Lua) bool {
return w.lua == l
}
+
+func NewCoroutine(f func (yield func(), resume func() bool)) (resume func() bool) {
+ cin := make(chan bool)
+ cout := make(chan bool)
+ running := true
+ resume = func() bool {
+ if !running {
+ return false
+ }
+ cin <- true
+ <-cout
+ return true
+ }
+ yield := func() {
+ cout <- true
+ <-cin
+ }
+ go func() {
+ <-cin
+ f(yield, resume)
+ running = false
+ cout <- true
+ }()
+ return resume
+}