summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorunwox <me@unwox.com>2025-01-27 16:29:14 +0600
committerunwox <me@unwox.com>2025-01-27 16:41:01 +0600
commitc71a1b5ab5d7fbbf4613f0e130a205134875092f (patch)
treed6d3ceb44c1b5b818c95d83c61034c7f07756ec7
parent34cc55a0fe7e10e12cabac4654898762095b6043 (diff)
implement DB connections pooling to mitigate blocking nature of mattn/sqlite3
-rw-r--r--main.go95
-rw-r--r--pool.go42
2 files changed, 91 insertions, 46 deletions
diff --git a/main.go b/main.go
index 93c46c3..d78c95c 100644
--- a/main.go
+++ b/main.go
@@ -45,17 +45,11 @@ func main() {
wrks := []*Worker{}
// track routes for mux to avoid registering the same route twice
routes := make(map[string]bool)
- // track open dbs to close them on exit
- dbs := []*sql.DB{}
+ // since mattn/sqlite3 library doesn't do connection pooling we have
+ // to do it ourselves. otherwise sqlite becomes the bottleneck when
+ // there are many parallel requests to the same DB.
+ dbs := make(map[string]*Pool[sql.DB])
mu := sync.Mutex{}
- defer func() {
- for _, db := range dbs {
- if db == nil {
- continue
- }
- db.Close()
- }
- }()
// define luna.router module
routeModule := make(map[string]any)
@@ -187,23 +181,36 @@ func main() {
if err != nil {
return luaErr(l, err)
}
- db, err := sql.Open("sqlite3", file)
+ mu.Lock()
+ pool, ok := dbs[file]
+ if !ok {
+ pool = &Pool[sql.DB]{}
+ pool.New = func () (*sql.DB, error) {
+ return sql.Open("sqlite3", file)
+ }
+ dbs[file] = pool
+ }
+ // prepopulate pool with at least one connection so we know
+ // if there is an error
+ db, err := dbs[file].New()
if err != nil {
return luaErr(l, err)
}
- mu.Lock()
- dbs = append(dbs, db)
+ dbs[file].Put(db)
mu.Unlock()
- h := cgo.NewHandle(db)
- return luaOk(l, int(h))
+ return luaOk(l, file)
}
dbModule["begin"] = func (l *Lua) int {
- var handle cgo.Handle
- err := l.Scan(&handle)
+ var file string
+ err := l.Scan(&file)
+ if err != nil {
+ return luaErr(l, err)
+ }
+ db, err := dbs[file].Get()
if err != nil {
return luaErr(l, err)
}
- db := handle.Value().(*sql.DB)
+ defer dbs[file].Put(db)
tx, err := db.Begin()
if err != nil {
return luaErr(l, err)
@@ -255,14 +262,17 @@ func main() {
return luaOk(l, nil)
}
dbModule["exec"] = func (l *Lua) int {
- var handle cgo.Handle
- var query string
+ var file, query string
var params []any
- err := l.Scan(&handle, &query, &params)
+ err := l.Scan(&file, &query, &params)
if err != nil {
return luaErr(l, err)
}
- db := handle.Value().(*sql.DB)
+ db, err := dbs[file].Get()
+ if err != nil {
+ return luaErr(l, err)
+ }
+ defer dbs[file].Put(db)
_, err = db.Exec(query, params...)
if err != nil {
return luaErr(l, err)
@@ -270,15 +280,18 @@ func main() {
return luaOk(l, nil)
}
dbModule["query"] = func (l *Lua) int {
- var handle cgo.Handle
- var query string
+ var file, query string
var params []any
- err := l.Scan(&handle, &query, &params)
+ err := l.Scan(&file, &query, &params)
if err != nil {
return luaErr(l, err)
}
+ db, err := dbs[file].Get()
+ if err != nil {
+ return luaErr(l, err)
+ }
+ defer dbs[file].Put(db)
ares := []any{}
- db := handle.Value().(*sql.DB)
rows, err := db.Query(query, params...)
if err != nil {
return luaErr(l, err)
@@ -307,14 +320,17 @@ func main() {
return luaOk(l, ares)
}
dbModule["query*"] = func (l *Lua) int {
- var handle cgo.Handle
- var query string
+ var file, query string
var params []any
- err := l.Scan(&handle, &query, &params)
+ err := l.Scan(&file, &query, &params)
if err != nil {
return luaErr(l, err)
}
- db := handle.Value().(*sql.DB)
+ db, err := dbs[file].Get()
+ if err != nil {
+ return luaErr(l, err)
+ }
+ defer dbs[file].Put(db)
rows, err := db.Query(query, params...)
if err != nil {
return luaErr(l, err)
@@ -344,25 +360,12 @@ func main() {
return luaOk(l, ares)
}
dbModule["close"] = func (l *Lua) int {
- var handle cgo.Handle
- err := l.Scan(&handle)
- if err != nil {
- return luaErr(l, err)
- }
- db := handle.Value().(*sql.DB)
- err = db.Close()
+ var file string
+ err := l.Scan(&file)
if err != nil {
return luaErr(l, err)
}
- mu.Lock()
- for k, _db := range dbs {
- if db == _db {
- dbs[k] = nil
- break
- }
- }
- mu.Unlock()
- handle.Delete()
+ // FIXME: noop for now
return luaOk(l, nil)
}
diff --git a/pool.go b/pool.go
new file mode 100644
index 0000000..06e355c
--- /dev/null
+++ b/pool.go
@@ -0,0 +1,42 @@
+package main
+
+import (
+ "sync"
+)
+
+const MAX_POOL_SIZE = 50
+
+// Pool is a simple pool implementation which grows (up to MAX_POOL_SIZE) when
+// needed.
+type Pool[T any] struct {
+ New func() (*T, error)
+ pool [MAX_POOL_SIZE]*T
+ size int8
+ mu sync.Mutex
+}
+
+// Get returns an instance of T from the pool if it's has one. When it doesn't
+// - a new instance is created and returned.
+func (p *Pool[T]) Get() (*T, error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ last := p.size - 1
+ if last < 0 {
+ return p.New()
+ }
+ res := p.pool[last]
+ p.pool[last] = nil
+ p.size = last
+ return res, nil
+}
+
+// Put returns an instance of T to the pool. If the pool is full - does nothing.
+func (p *Pool[T]) Put(item *T) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.size + 1 > MAX_POOL_SIZE {
+ return
+ }
+ p.pool[p.size] = item
+ p.size = p.size + 1
+}