Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 341 additions & 0 deletions background_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
package frankenphp

// #include <stdint.h>
// #include "frankenphp.h"
import "C"
import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
"unsafe"
)

// BackgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files.
type BackgroundWorkerLookup struct {
byName map[string]*BackgroundWorkerRegistry
catchAll *BackgroundWorkerRegistry
}

func NewBackgroundWorkerLookup() *BackgroundWorkerLookup {
return &BackgroundWorkerLookup{
byName: make(map[string]*BackgroundWorkerRegistry),
}
}

// NewBackgroundWorkerLookupWithCatchAll is a convenience constructor for tests.
func NewBackgroundWorkerLookupWithCatchAll(entrypoint string) *BackgroundWorkerLookup {
l := NewBackgroundWorkerLookup()
l.catchAll = NewBackgroundWorkerRegistry(entrypoint)
return l
}

func (l *BackgroundWorkerLookup) AddNamed(name string, registry *BackgroundWorkerRegistry) {
l.byName[name] = registry
}

func (l *BackgroundWorkerLookup) SetCatchAll(registry *BackgroundWorkerRegistry) {
l.catchAll = registry
}

// Resolve returns the registry for the given name, falling back to catch-all.
func (l *BackgroundWorkerLookup) Resolve(name string) *BackgroundWorkerRegistry {
if r, ok := l.byName[name]; ok {
return r
}
return l.catchAll
}

// StartAutoWorkers iterates unique registries and calls startAutoWorkers on each.
// Deprecated: auto-start is now handled via initWorkers. Kept for test compatibility.
func (l *BackgroundWorkerLookup) StartAutoWorkers() error {
return nil
}

type backgroundWorkerState struct {
varsPtr unsafe.Pointer // *C.HashTable, persistent, managed by C
mu sync.RWMutex
varsVersion atomic.Uint64 // incremented on each set_vars call
ready chan struct{}
readyOnce sync.Once
}

type BackgroundWorkerRegistry struct {
entrypoint string
num int // threads per background worker (0 = lazy-start with 1 thread)
maxWorkers int // max lazy-started instances (0 = unlimited)
autoStartNames []string // names to start at boot when num >= 1
mu sync.Mutex
workers map[string]*backgroundWorkerState
}

func NewBackgroundWorkerRegistry(entrypoint string) *BackgroundWorkerRegistry {
return &BackgroundWorkerRegistry{
entrypoint: entrypoint,
workers: make(map[string]*backgroundWorkerState),
}
}

func (registry *BackgroundWorkerRegistry) Entrypoint() string {
return registry.entrypoint
}

func (registry *BackgroundWorkerRegistry) Num() int {
if registry.num <= 0 {
return 0
}
return registry.num
}

func (registry *BackgroundWorkerRegistry) MaxThreads() int {
if registry.num > 0 {
return registry.num
}
return 1
}

func (registry *BackgroundWorkerRegistry) SetNum(num int) {
registry.num = num
}

func (registry *BackgroundWorkerRegistry) AddAutoStartNames(names ...string) {
registry.autoStartNames = append(registry.autoStartNames, names...)
}

func (registry *BackgroundWorkerRegistry) SetMaxWorkers(max int) {
registry.maxWorkers = max
}

func (registry *BackgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) {
registry.mu.Lock()
defer registry.mu.Unlock()

if bgw := registry.workers[name]; bgw != nil {
return bgw, true, nil
}

if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers {
return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers)
}

bgw := &backgroundWorkerState{
ready: make(chan struct{}),
}
registry.workers[name] = bgw

return bgw, false, nil
}

func (registry *BackgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) {
registry.mu.Lock()
defer registry.mu.Unlock()

if registry.workers[name] == bgw {
delete(registry.workers, name)
}
}

func startBackgroundWorker(thread *phpThread, bgWorkerName string) error {
if bgWorkerName == "" {
return fmt.Errorf("background worker name must not be empty")
}

lookup := getLookup(thread)
if lookup == nil {
return fmt.Errorf("no background worker configured in this php_server")
}

registry := lookup.Resolve(bgWorkerName)
if registry == nil || registry.entrypoint == "" {
return fmt.Errorf("no background worker configured in this php_server")
}

return startBackgroundWorkerWithRegistry(registry, bgWorkerName)
}

func startBackgroundWorkerWithRegistry(registry *BackgroundWorkerRegistry, bgWorkerName string) error {
bgw, exists, err := registry.reserve(bgWorkerName)
if err != nil {
return err
}
if exists {
return nil
}

numThreads := registry.MaxThreads()

worker, err := newWorker(workerOpt{
name: bgWorkerName,
fileName: registry.entrypoint,
num: numThreads,
env: PrepareEnv(nil),
watch: []string{},
maxConsecutiveFailures: -1,
})
if err != nil {
registry.remove(bgWorkerName, bgw)

return fmt.Errorf("failed to create background worker: %w", err)
}

worker.isBackgroundWorker = true
worker.backgroundWorker = bgw
worker.backgroundRegistry = registry

for i := 0; i < numThreads; i++ {
bgWorkerThread := getInactivePHPThread()
if bgWorkerThread == nil {
if i == 0 {
registry.remove(bgWorkerName, bgw)
}

return fmt.Errorf("no available PHP thread for background worker (increase max_threads)")
}

scalingMu.Lock()
workers = append(workers, worker)
scalingMu.Unlock()

convertToBackgroundWorkerThread(bgWorkerThread, worker)
}

if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads))
}

return nil
}

func getLookup(thread *phpThread) *BackgroundWorkerLookup {
if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil {
return handler.worker.backgroundLookup
}
if fc, ok := fromContext(thread.context()); ok {
return fc.backgroundLookup
}

return nil
}

// go_frankenphp_worker_get_vars starts background workers if needed, waits for them
// to be ready, takes read locks, copies vars via C helper, and releases locks.
// All locking/unlocking happens within this single Go call.
//
// callerVersions/outVersions: if callerVersions is non-nil and all versions match,
// the copy is skipped entirely (returns 1). outVersions receives current versions.
//
//export go_frankenphp_worker_get_vars
func go_frankenphp_worker_get_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char {
thread := phpThreads[threadIndex]
lookup := getLookup(thread)
if lookup == nil {
return C.CString("no background worker configured in this php_server")
}

n := int(nameCount)
nameSlice := unsafe.Slice(names, n)
nameLenSlice := unsafe.Slice(nameLens, n)

sks := make([]*backgroundWorkerState, n)
goNames := make([]string, n)
for i := 0; i < n; i++ {
goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i]))

// Start background worker if not already running
if err := startBackgroundWorker(thread, goNames[i]); err != nil {
return C.CString(err.Error())
}

registry := lookup.Resolve(goNames[i])
if registry == nil {
return C.CString("background worker not found: " + goNames[i])
}
registry.mu.Lock()
sks[i] = registry.workers[goNames[i]]
registry.mu.Unlock()
if sks[i] == nil {
return C.CString("background worker not found: " + goNames[i])
}
}

// Wait for all workers to be ready
timeout := time.Duration(timeoutMs) * time.Millisecond
timer := time.NewTimer(timeout)
defer timer.Stop()
for i, sk := range sks {
select {
case <-sk.ready:
// background worker has called set_vars
case <-timer.C:
return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i]))
}
}

// Fast path: if all caller versions match, skip lock + copy entirely.
// Read each version once and write to outVersions for the C side to compare.
if callerVersions != nil && outVersions != nil {
callerVSlice := unsafe.Slice(callerVersions, n)
outVSlice := unsafe.Slice(outVersions, n)
allMatch := true
for i, sk := range sks {
v := sk.varsVersion.Load()
outVSlice[i] = C.uint64_t(v)
if uint64(callerVSlice[i]) != v {
allMatch = false
}
}
if allMatch {
return nil // C side sees out == caller, uses cached value
}
}

// Take all read locks, collect pointers, copy via C helper, then release
ptrs := make([]unsafe.Pointer, n)
for i, sk := range sks {
sk.mu.RLock()
ptrs[i] = sk.varsPtr
}

C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0])))

// Write versions while locks are still held
if outVersions != nil {
outVSlice := unsafe.Slice(outVersions, n)
for i, sk := range sks {
outVSlice[i] = C.uint64_t(sk.varsVersion.Load())
}
}

for _, sk := range sks {
sk.mu.RUnlock()
}

return nil
}

//export go_frankenphp_worker_set_vars
func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char {
thread := phpThreads[threadIndex]

bgHandler, ok := thread.handler.(*backgroundWorkerThread)
if !ok || bgHandler.worker.backgroundWorker == nil {
return C.CString("frankenphp_worker_set_vars() can only be called from a background worker")
}
handler := &bgHandler.workerThread

sk := handler.worker.backgroundWorker

sk.mu.Lock()
*oldPtr = sk.varsPtr
sk.varsPtr = varsPtr
sk.varsVersion.Add(1)
sk.mu.Unlock()

sk.readyOnce.Do(func() {
handler.markBackgroundReady()
close(sk.ready)
})

return nil
}
Loading
Loading