From a281cbf97f8ba94c19edbca51be1e7ad1ea648eb Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Fri, 20 Mar 2026 21:07:37 +0100 Subject: [PATCH 1/2] feat: background workers = non-HTTP workers with shared state refactor: address review feedback on background workers - Use `name` instead of `match` for background worker identification - Combine start + wait + lock + copy + unlock into single CGo call (go_frankenphp_worker_get_vars replaces three separate exports) - Remove lockedVarsStacks, InitLockedVarsStacks, and varsVersion - Set FRANKENPHP_WORKER_NAME for all workers (HTTP and background) - Split worker_bg_name into worker_name + is_background_worker flag - Rename httpEnabled to isBackgroundWorker on Go side - Remove name validation regex (same rules as HTTP workers) - Keep $_SERVER['argv'] for background workers (bin/console compat) Add generational cache back Review by henderkes --- background_worker.go | 405 +++++++++ background_worker_test.go | 124 +++ caddy/app.go | 5 + caddy/module.go | 4 + caddy/workerconfig.go | 32 +- context.go | 15 +- docs/background-workers.md | 225 +++++ frankenphp.c | 772 +++++++++++++++++- frankenphp.go | 24 + frankenphp.h | 14 +- frankenphp.stub.php | 17 +- frankenphp_arginfo.h | 18 + frankenphp_test.go | 296 ++++++- options.go | 29 + phpmainthread.go | 4 + phpthread.go | 2 + requestoptions.go | 8 + scaling.go | 6 +- .../background-worker-binary-entrypoint.php | 12 + testdata/background-worker-binary-safe.php | 23 + testdata/background-worker-crash-starter.php | 10 + testdata/background-worker-crash.php | 18 + testdata/background-worker-dedup.php | 10 + ...kground-worker-enum-missing-entrypoint.php | 13 + testdata/background-worker-enum-missing.php | 14 + testdata/background-worker-helper.php | 14 + testdata/background-worker-identity.php | 11 + .../background-worker-multi-entrypoint-a.php | 8 + .../background-worker-multi-entrypoint-b.php | 8 + .../background-worker-multi-entrypoint.php | 10 + testdata/background-worker-multi-file.php | 8 + testdata/background-worker-multi.php | 17 + testdata/background-worker-no-entrypoint.php | 10 + .../background-worker-restart-entrypoint.php | 19 + testdata/background-worker-restart.php | 7 + ...round-worker-set-server-var-validation.php | 23 + testdata/background-worker-start-twice.php | 10 + testdata/background-worker-start.php | 10 + .../background-worker-stop-fd-entrypoint.php | 18 + ...d-worker-stop-fd-non-background-worker.php | 10 + testdata/background-worker-stop-fd.php | 6 + ...ound-worker-type-validation-entrypoint.php | 63 ++ .../background-worker-type-validation.php | 23 + testdata/background-worker-with-argv.php | 12 + threadbackgroundworker.go | 110 +++ threadinactive.go | 3 + threadregular.go | 3 + threadtasks_test.go | 3 + threadworker.go | 20 +- worker.go | 121 ++- 50 files changed, 2617 insertions(+), 30 deletions(-) create mode 100644 background_worker.go create mode 100644 background_worker_test.go create mode 100644 docs/background-workers.md create mode 100644 testdata/background-worker-binary-entrypoint.php create mode 100644 testdata/background-worker-binary-safe.php create mode 100644 testdata/background-worker-crash-starter.php create mode 100644 testdata/background-worker-crash.php create mode 100644 testdata/background-worker-dedup.php create mode 100644 testdata/background-worker-enum-missing-entrypoint.php create mode 100644 testdata/background-worker-enum-missing.php create mode 100644 testdata/background-worker-helper.php create mode 100644 testdata/background-worker-identity.php create mode 100644 testdata/background-worker-multi-entrypoint-a.php create mode 100644 testdata/background-worker-multi-entrypoint-b.php create mode 100644 testdata/background-worker-multi-entrypoint.php create mode 100644 testdata/background-worker-multi-file.php create mode 100644 testdata/background-worker-multi.php create mode 100644 testdata/background-worker-no-entrypoint.php create mode 100644 testdata/background-worker-restart-entrypoint.php create mode 100644 testdata/background-worker-restart.php create mode 100644 testdata/background-worker-set-server-var-validation.php create mode 100644 testdata/background-worker-start-twice.php create mode 100644 testdata/background-worker-start.php create mode 100644 testdata/background-worker-stop-fd-entrypoint.php create mode 100644 testdata/background-worker-stop-fd-non-background-worker.php create mode 100644 testdata/background-worker-stop-fd.php create mode 100644 testdata/background-worker-type-validation-entrypoint.php create mode 100644 testdata/background-worker-type-validation.php create mode 100644 testdata/background-worker-with-argv.php create mode 100644 threadbackgroundworker.go diff --git a/background_worker.go b/background_worker.go new file mode 100644 index 0000000000..df6154698e --- /dev/null +++ b/background_worker.go @@ -0,0 +1,405 @@ +package frankenphp + +// #include +// #include "frankenphp.h" +import "C" +import ( + "fmt" + "log/slog" + "sync" + "sync/atomic" + "time" + "unsafe" +) + +// defaultMaxBackgroundWorkers is the default safety cap for catch-all background workers. +const defaultMaxBackgroundWorkers = 16 + +// globalBackgroundLookup is set during initWorkers and provides access +// to background workers from any thread (including non-worker requests). +var globalBackgroundLookup *backgroundWorkerLookup + +// backgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files. +type backgroundWorkerLookup struct { + byName map[string]*backgroundWorkerRegistry + catchAll *backgroundWorkerRegistry +} + +func newBackgroundWorkerLookup() *backgroundWorkerLookup { + return &backgroundWorkerLookup{ + byName: make(map[string]*backgroundWorkerRegistry), + } +} + +// newBackgroundWorkerLookupWithCatchAll is a convenience constructor for tests. +func newBackgroundWorkerLookupWithCatchAll(entrypoint string) *backgroundWorkerLookup { + l := newBackgroundWorkerLookup() + l.catchAll = newBackgroundWorkerRegistry(entrypoint) + return l +} + +func (l *backgroundWorkerLookup) AddNamed(name string, registry *backgroundWorkerRegistry) { + l.byName[name] = registry +} + +func (l *backgroundWorkerLookup) SetCatchAll(registry *backgroundWorkerRegistry) { + l.catchAll = registry +} + +// Resolve returns the registry for the given name, falling back to catch-all. +func (l *backgroundWorkerLookup) Resolve(name string) *backgroundWorkerRegistry { + if r, ok := l.byName[name]; ok { + return r + } + return l.catchAll +} + +// StartAutoWorkers iterates unique registries and calls startAutoWorkers on each. +// Deprecated: auto-start is now handled via initWorkers. Kept for test compatibility. +func (l *backgroundWorkerLookup) StartAutoWorkers() error { + return nil +} + +type backgroundWorkerState struct { + varsPtr unsafe.Pointer // *C.HashTable, persistent, managed by C + mu sync.RWMutex + varsVersion atomic.Uint64 // incremented on each set_vars call + ready chan struct{} + readyOnce sync.Once +} + +type backgroundWorkerRegistry struct { + entrypoint string + num int // threads per background worker (0 = lazy-start with 1 thread) + maxWorkers int // max lazy-started instances (0 = unlimited) + autoStartNames []string // names to start at boot when num >= 1 + mu sync.Mutex + workers map[string]*backgroundWorkerState +} + +func newBackgroundWorkerRegistry(entrypoint string) *backgroundWorkerRegistry { + return &backgroundWorkerRegistry{ + entrypoint: entrypoint, + workers: make(map[string]*backgroundWorkerState), + } +} + +func (registry *backgroundWorkerRegistry) Entrypoint() string { + return registry.entrypoint +} + +func (registry *backgroundWorkerRegistry) Num() int { + if registry.num <= 0 { + return 0 + } + return registry.num +} + +func (registry *backgroundWorkerRegistry) MaxThreads() int { + if registry.num > 0 { + return registry.num + } + return 1 +} + +func (registry *backgroundWorkerRegistry) SetNum(num int) { + registry.num = num +} + +func (registry *backgroundWorkerRegistry) AddAutoStartNames(names ...string) { + registry.autoStartNames = append(registry.autoStartNames, names...) +} + +func (registry *backgroundWorkerRegistry) SetMaxWorkers(max int) { + registry.maxWorkers = max +} + +// buildBackgroundWorkerLookup constructs a backgroundWorkerLookup from worker +// options. Called during initWorkers — the lookup is then assigned to all workers. +func buildBackgroundWorkerLookup(workers []*worker, opts []workerOpt) *backgroundWorkerLookup { + var lookup *backgroundWorkerLookup + registries := make(map[string]*backgroundWorkerRegistry) + + for i, o := range opts { + if o.backgroundEntrypoint == "" { + continue + } + + if lookup == nil { + lookup = newBackgroundWorkerLookup() + } + + entrypoint := o.backgroundEntrypoint + registry, ok := registries[entrypoint] + if !ok { + registry = newBackgroundWorkerRegistry(entrypoint) + registries[entrypoint] = registry + } + + if !o.isBackgroundWorker { + // Non-background worker declaring a background entrypoint (catch-all) + maxW := o.backgroundMaxWorkers + if maxW <= 0 { + maxW = defaultMaxBackgroundWorkers + } + registry.SetMaxWorkers(maxW) + lookup.SetCatchAll(registry) + continue + } + + w := workers[i] + if w.name != "" && w.name != w.fileName { + // Named background worker + if o.num > 0 { + registry.AddAutoStartNames(w.name) + registry.SetNum(o.num) + } + lookup.AddNamed(w.name, registry) + } else { + // Catch-all background worker + maxW := o.backgroundMaxWorkers + if maxW <= 0 { + maxW = defaultMaxBackgroundWorkers + } + registry.SetMaxWorkers(maxW) + lookup.SetCatchAll(registry) + } + + w.backgroundRegistry = registry + } + + return lookup +} + +func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if bgw := registry.workers[name]; bgw != nil { + return bgw, true, nil + } + + if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers { + return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers) + } + + bgw := &backgroundWorkerState{ + ready: make(chan struct{}), + } + registry.workers[name] = bgw + + return bgw, false, nil +} + +func (registry *backgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) { + registry.mu.Lock() + defer registry.mu.Unlock() + + if registry.workers[name] == bgw { + delete(registry.workers, name) + } +} + +func startBackgroundWorker(thread *phpThread, bgWorkerName string) error { + if bgWorkerName == "" { + return fmt.Errorf("background worker name must not be empty") + } + + lookup := getLookup(thread) + if lookup == nil { + return fmt.Errorf("no background worker configured in this php_server") + } + + registry := lookup.Resolve(bgWorkerName) + if registry == nil || registry.entrypoint == "" { + return fmt.Errorf("no background worker configured in this php_server") + } + + return startBackgroundWorkerWithRegistry(registry, bgWorkerName) +} + +func startBackgroundWorkerWithRegistry(registry *backgroundWorkerRegistry, bgWorkerName string) error { + bgw, exists, err := registry.reserve(bgWorkerName) + if err != nil { + return err + } + if exists { + return nil + } + + numThreads := registry.MaxThreads() + + worker, err := newWorker(workerOpt{ + name: bgWorkerName, + fileName: registry.entrypoint, + num: numThreads, + env: PrepareEnv(nil), + watch: []string{}, + maxConsecutiveFailures: -1, + }) + if err != nil { + registry.remove(bgWorkerName, bgw) + + return fmt.Errorf("failed to create background worker: %w", err) + } + + worker.isBackgroundWorker = true + worker.backgroundWorker = bgw + worker.backgroundRegistry = registry + + for i := 0; i < numThreads; i++ { + bgWorkerThread := getInactivePHPThread() + if bgWorkerThread == nil { + if i == 0 { + registry.remove(bgWorkerName, bgw) + } + + return fmt.Errorf("no available PHP thread for background worker (increase max_threads)") + } + + scalingMu.Lock() + workers = append(workers, worker) + scalingMu.Unlock() + + convertToBackgroundWorkerThread(bgWorkerThread, worker) + } + + if globalLogger.Enabled(globalCtx, slog.LevelInfo) { + globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads)) + } + + return nil +} + +func getLookup(thread *phpThread) *backgroundWorkerLookup { + if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil { + return handler.worker.backgroundLookup + } + if handler, ok := thread.handler.(*backgroundWorkerThread); ok && handler.worker.backgroundLookup != nil { + return handler.worker.backgroundLookup + } + + return globalBackgroundLookup +} + +// go_frankenphp_worker_get_vars starts background workers if needed, waits for them +// to be ready, takes read locks, copies vars via C helper, and releases locks. +// All locking/unlocking happens within this single Go call. +// +// callerVersions/outVersions: if callerVersions is non-nil and all versions match, +// the copy is skipped entirely (returns 1). outVersions receives current versions. +// +//export go_frankenphp_worker_get_vars +func go_frankenphp_worker_get_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char { + thread := phpThreads[threadIndex] + lookup := getLookup(thread) + if lookup == nil { + return C.CString("no background worker configured in this php_server") + } + + n := int(nameCount) + nameSlice := unsafe.Slice(names, n) + nameLenSlice := unsafe.Slice(nameLens, n) + + sks := make([]*backgroundWorkerState, n) + goNames := make([]string, n) + for i := 0; i < n; i++ { + goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i])) + + // Start background worker if not already running + if err := startBackgroundWorker(thread, goNames[i]); err != nil { + return C.CString(err.Error()) + } + + registry := lookup.Resolve(goNames[i]) + if registry == nil { + return C.CString("background worker not found: " + goNames[i]) + } + registry.mu.Lock() + sks[i] = registry.workers[goNames[i]] + registry.mu.Unlock() + if sks[i] == nil { + return C.CString("background worker not found: " + goNames[i]) + } + } + + // Wait for all workers to be ready + timeout := time.Duration(timeoutMs) * time.Millisecond + timer := time.NewTimer(timeout) + defer timer.Stop() + for i, sk := range sks { + select { + case <-sk.ready: + // background worker has called set_vars + case <-timer.C: + return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i])) + } + } + + // Fast path: if all caller versions match, skip lock + copy entirely. + // Read each version once and write to outVersions for the C side to compare. + if callerVersions != nil && outVersions != nil { + callerVSlice := unsafe.Slice(callerVersions, n) + outVSlice := unsafe.Slice(outVersions, n) + allMatch := true + for i, sk := range sks { + v := sk.varsVersion.Load() + outVSlice[i] = C.uint64_t(v) + if uint64(callerVSlice[i]) != v { + allMatch = false + } + } + if allMatch { + return nil // C side sees out == caller, uses cached value + } + } + + // Take all read locks, collect pointers, copy via C helper, then release + ptrs := make([]unsafe.Pointer, n) + for i, sk := range sks { + sk.mu.RLock() + ptrs[i] = sk.varsPtr + } + + C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0]))) + + // Write versions while locks are still held + if outVersions != nil { + outVSlice := unsafe.Slice(outVersions, n) + for i, sk := range sks { + outVSlice[i] = C.uint64_t(sk.varsVersion.Load()) + } + } + + for _, sk := range sks { + sk.mu.RUnlock() + } + + return nil +} + +//export go_frankenphp_worker_set_vars +func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char { + thread := phpThreads[threadIndex] + + bgHandler, ok := thread.handler.(*backgroundWorkerThread) + if !ok || bgHandler.worker.backgroundWorker == nil { + return C.CString("frankenphp_worker_set_vars() can only be called from a background worker") + } + handler := &bgHandler.workerThread + + sk := handler.worker.backgroundWorker + + sk.mu.Lock() + *oldPtr = sk.varsPtr + sk.varsPtr = varsPtr + sk.varsVersion.Add(1) + sk.mu.Unlock() + + sk.readyOnce.Do(func() { + handler.markBackgroundReady() + close(sk.ready) + }) + + return nil +} diff --git a/background_worker_test.go b/background_worker_test.go new file mode 100644 index 0000000000..5e226919cf --- /dev/null +++ b/background_worker_test.go @@ -0,0 +1,124 @@ +package frankenphp + +import ( + "testing" + "time" + + "github.com/dunglas/frankenphp/internal/state" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type backgroundWorkerTestMetrics struct { + readyCalls int + stopCalls []StopReason +} + +func (m *backgroundWorkerTestMetrics) StartWorker(string) {} + +func (m *backgroundWorkerTestMetrics) ReadyWorker(string) { + m.readyCalls++ +} + +func (m *backgroundWorkerTestMetrics) StopWorker(_ string, reason StopReason) { + m.stopCalls = append(m.stopCalls, reason) +} + +func (m *backgroundWorkerTestMetrics) TotalWorkers(string, int) {} + +func (m *backgroundWorkerTestMetrics) TotalThreads(int) {} + +func (m *backgroundWorkerTestMetrics) StartRequest() {} + +func (m *backgroundWorkerTestMetrics) StopRequest() {} + +func (m *backgroundWorkerTestMetrics) StopWorkerRequest(string, time.Duration) {} + +func (m *backgroundWorkerTestMetrics) StartWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) Shutdown() {} + +func (m *backgroundWorkerTestMetrics) QueuedWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) DequeuedWorkerRequest(string) {} + +func (m *backgroundWorkerTestMetrics) QueuedRequest() {} + +func (m *backgroundWorkerTestMetrics) DequeuedRequest() {} + +func TestStartBackgroundWorkerFailureIsRetryable(t *testing.T) { + lookup := newBackgroundWorkerLookupWithCatchAll(testDataPath + "/background-worker-with-argv.php") + globalBackgroundLookup = lookup + thread := newPHPThread(0) + thread.state.Set(state.Ready) + thread.handler = &workerThread{ + thread: thread, + worker: &worker{backgroundLookup: lookup}, + } + phpThreads = []*phpThread{thread} + t.Cleanup(func() { + phpThreads = nil + }) + + registry := lookup.Resolve("retryable-background-worker") + + err := startBackgroundWorker(thread, "retryable-background-worker") + require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)") + assert.Empty(t, registry.workers) + + err = startBackgroundWorker(thread, "retryable-background-worker") + require.EqualError(t, err, "no available PHP thread for background worker (increase max_threads)") + assert.Empty(t, registry.workers) +} + +func TestBackgroundWorkerSetVarsMarksWorkerReady(t *testing.T) { + originalMetrics := metrics + testMetrics := &backgroundWorkerTestMetrics{} + metrics = testMetrics + t.Cleanup(func() { + metrics = originalMetrics + }) + + handler := &workerThread{ + thread: newPHPThread(0), + worker: &worker{name: "background-worker", fileName: "background-worker.php", maxConsecutiveFailures: -1}, + isBootingScript: true, + } + + handler.markBackgroundReady() + handler.markBackgroundReady() + + assert.False(t, handler.isBootingScript) + assert.Equal(t, 0, handler.failureCount) + assert.Equal(t, 1, testMetrics.readyCalls) +} + +func TestBackgroundWorkerBootFailureStaysBootFailureUntilReady(t *testing.T) { + originalMetrics := metrics + testMetrics := &backgroundWorkerTestMetrics{} + metrics = testMetrics + t.Cleanup(func() { + metrics = originalMetrics + }) + + handler := &workerThread{ + thread: newPHPThread(0), + worker: &worker{ + name: "background-worker", + fileName: "background-worker.php", + maxConsecutiveFailures: -1, + }, + isBootingScript: true, + } + + tearDownWorkerScript(handler, 1) + require.Len(t, testMetrics.stopCalls, 1) + assert.Equal(t, StopReason(StopReasonBootFailure), testMetrics.stopCalls[0]) + + testMetrics.stopCalls = nil + handler.isBootingScript = true + handler.markBackgroundReady() + tearDownWorkerScript(handler, 1) + require.Len(t, testMetrics.stopCalls, 1) + assert.Equal(t, StopReason(StopReasonCrash), testMetrics.stopCalls[0]) +} diff --git a/caddy/app.go b/caddy/app.go index 9242d870c6..50c1e3aa4a 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -20,6 +20,7 @@ import ( "github.com/dunglas/frankenphp/internal/fastabs" ) + var ( options []frankenphp.Option optionsMU sync.RWMutex @@ -164,6 +165,10 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithWorkerRequestOptions(w.requestOptions...), ) + if w.Background { + w.options = append(w.options, frankenphp.WithWorkerBackground(w.FileName, w.MaxThreads)) + } + f.opts = append(f.opts, frankenphp.WithWorkers(w.Name, repl.ReplaceKnown(w.FileName, ""), w.Num, w.options...)) } diff --git a/caddy/module.go b/caddy/module.go index 2241e216e2..1f5c617957 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -317,8 +317,12 @@ func (f *FrankenPHPModule) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } // Check if a worker with this filename already exists in this module + // Background workers are excluded (they share filenames by design) fileNames := make(map[string]struct{}, len(f.Workers)) for _, w := range f.Workers { + if w.Background { + continue + } if _, ok := fileNames[w.FileName]; ok { return fmt.Errorf(`workers in a single "php" or "php_server" block must not have duplicate filenames: %q`, w.FileName) } diff --git a/caddy/workerconfig.go b/caddy/workerconfig.go index c50f0d0688..d1c6aff637 100644 --- a/caddy/workerconfig.go +++ b/caddy/workerconfig.go @@ -41,7 +41,8 @@ type workerConfig struct { MatchPath []string `json:"match_path,omitempty"` // MaxConsecutiveFailures sets the maximum number of consecutive failures before panicking (defaults to 6, set to -1 to never panick) MaxConsecutiveFailures int `json:"max_consecutive_failures,omitempty"` - + // Background marks this worker as a background worker (non-HTTP) + Background bool `json:"background,omitempty"` options []frankenphp.WorkerOption requestOptions []frankenphp.RequestOption absFileName string @@ -123,9 +124,13 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { wc.Watch = append(wc.Watch, patterns...) } case "match": - // provision the path so it's identical to Caddy match rules + if wc.Background { + return wc, d.Err(`"match" is not supported for background workers, use "name" instead`) + } + args := d.RemainingArgs() + // For HTTP workers, provision as Caddy path matchers // see: https://github.com/caddyserver/caddy/blob/master/modules/caddyhttp/matchers.go - caddyMatchPath := (caddyhttp.MatchPath)(d.RemainingArgs()) + caddyMatchPath := (caddyhttp.MatchPath)(args) if err := caddyMatchPath.Provision(caddy.Context{}); err != nil { return wc, d.WrapErr(err) } @@ -145,8 +150,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { } wc.MaxConsecutiveFailures = v + case "background": + wc.Background = true default: - return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads", v) + return wc, wrongSubDirectiveError("worker", "name, file, num, env, watch, match, max_consecutive_failures, max_threads, background", v) } } @@ -154,6 +161,18 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) { return wc, d.Err(`the "file" argument must be specified`) } + if wc.Background { + // Validate background worker constraints + if wc.Num > 1 { + return wc, d.Err(`"num" > 1 is not yet supported for background workers`) + } + if wc.MaxThreads > 1 { + return wc, d.Err(`"max_threads" > 1 is not yet supported for background workers`) + } + // MaxConsecutiveFailures defaults to 6 (same as HTTP workers) + // via defaultMaxConsecutiveFailures in options.go + } + if frankenphp.EmbeddedAppPath != "" && filepath.IsLocal(wc.FileName) { wc.FileName = filepath.Join(frankenphp.EmbeddedAppPath, wc.FileName) } @@ -174,6 +193,11 @@ func (wc *workerConfig) inheritEnv(env map[string]string) { } func (wc *workerConfig) matchesPath(r *http.Request, documentRoot string) bool { + // background workers don't handle HTTP requests + if wc.Background { + return false + } + // try to match against a pattern if one is assigned if len(wc.MatchPath) != 0 { return (caddyhttp.MatchPath)(wc.MatchPath).Match(r) diff --git a/context.go b/context.go index 92f3b7471c..95bcf64954 100644 --- a/context.go +++ b/context.go @@ -16,13 +16,14 @@ import ( type frankenPHPContext struct { mercureContext - documentRoot string - splitPath []string - env PreparedEnv - logger *slog.Logger - request *http.Request - originalRequest *http.Request - worker *worker + documentRoot string + splitPath []string + env PreparedEnv + logger *slog.Logger + request *http.Request + originalRequest *http.Request + worker *worker + backgroundLookup *backgroundWorkerLookup docURI string pathInfo string diff --git a/docs/background-workers.md b/docs/background-workers.md new file mode 100644 index 0000000000..f445c53e9c --- /dev/null +++ b/docs/background-workers.md @@ -0,0 +1,225 @@ +# Background Workers + +Background workers are long-running PHP scripts that run outside the HTTP request cycle. +They observe their environment and publish configuration that HTTP [workers](worker.md) can read in real time. + +## How It Works + +1. A background worker runs its own event loop (subscribe to Redis, watch files, poll an API...) +2. It calls `frankenphp_worker_set_vars()` to publish a snapshot of key-value pairs +3. HTTP workers call `frankenphp_worker_get_vars()` to read the latest snapshot +4. The first `get_vars()` call blocks until the background worker has published - no startup race condition + +## Configuration + +Add `worker` directives with `background` to your [`php_server` or `php` block](config.md#caddyfile-config): + +```caddyfile +example.com { + php_server { + # Named background workers - lazy-started on first get_vars() + worker /app/bin/console { background; name config-watcher } + worker /app/bin/console { background; name feature-flags } + + # Catch-all - handles any unlisted name via get_vars() + worker /app/bin/console { background } + } +} +``` + +### Named vs catch-all + +- **Named** (with `name`): the worker name is a known identifier. Lazy-started on first `get_vars()` call. Multiple named workers can share the same entrypoint file. +- **Catch-all** (no `name`): also lazy-started on first `get_vars()`. Use `max_threads` to set a safety cap on how many can be created (defaults to 16). Not declaring a catch-all forbids unlisted names. + +### Config constraints + +- `num` and `max_threads` are accepted but capped at 1 for now (pooling is a future feature). Values > 1 are rejected with a clear error. +- `max_threads` on catch-all workers sets a safety cap for lazy-started instances (defaults to 16). +- `max_consecutive_failures` defaults to 6 (same as HTTP workers). +- `env` and `watch` work the same as HTTP workers. + +### Thread reservation + +Background workers get dedicated thread slots outside the global `max_threads` budget. +They don't compete with HTTP auto-scaling. For catch-all workers, `max_threads` determines +the reservation (default 16). Named workers with `num 0` (default) are lazy-started but +still reserve 1 thread (`max_threads` defaults to `max(num, 1)`). + +Each `php_server` block has its own isolated background worker scope. + +## PHP API + +### `frankenphp_worker_get_vars(string|array $name, float $timeout = 30.0): array` + +Starts a background worker (at-most-once) and returns its published variables. + +- First call blocks until the background worker calls `set_vars()` or the timeout expires +- Subsequent calls return the latest snapshot immediately +- When `$name` is an array, all background workers start in parallel and vars are returned keyed by name: + +```php +$redis = frankenphp_worker_get_vars('redis-watcher'); +// ['MASTER_HOST' => '10.0.0.1', 'MASTER_PORT' => '6379'] + +$all = frankenphp_worker_get_vars(['redis-watcher', 'feature-flags']); +// ['redis-watcher' => [...], 'feature-flags' => [...]] +``` + +- `$name` is available as `$_SERVER['FRANKENPHP_WORKER_NAME']` (set for all workers, HTTP and background) and `$_SERVER['argv'][1]` in background workers (for CLI compatibility with `bin/console`) +- Throws `RuntimeException` on timeout, missing entrypoint, or background worker crash +- Throws `LogicException` if the vars contain an enum class or case that cannot be resolved +- Works in both worker and non-worker mode +- **Per-request cache**: within a single HTTP request, repeated calls with the same name return the same array instance if the data hasn't changed. This means `===` comparisons are O(1) (pointer equality), and no lock or copy is needed on cache hit + +### `frankenphp_worker_set_vars(array $vars): void` + +Publishes a snapshot of key-value pairs from inside a background worker. +Each call **replaces** the entire snapshot atomically. + +Supported value types: `null`, `bool`, `int`, `float`, `string`, `array` (nested), and **enums**. +Objects, resources, and references are rejected. + +- Throws `RuntimeException` if not called from a background worker context +- Throws `ValueError` if values contain objects, resources, or references + +### `frankenphp_worker_get_signaling_stream(): resource` + +Returns a readable stream used for receiving signals from FrankenPHP. +Signals are newline-terminated strings: `"stop\n"` signals shutdown or restart. +Read with `fgets()` to identify the signal type. + +Use `stream_select()` instead of `sleep()` or `usleep()` to wait between iterations: + +```php +function background_worker_should_stop(float $timeout = 0): bool +{ + static $signalingStream; + $signalingStream ??= frankenphp_worker_get_signaling_stream(); + $s = (int) $timeout; + + return match (@stream_select(...[[$signalingStream], [], [], $s, (int) (($timeout - $s) * 1e6)])) { + 0 => false, // timeout + false => true, // error (pipe closed) = stop + default => "stop\n" === fgets($signalingStream), + }; +} + +do { + // ... do work, call set_vars() ... +} while (!background_worker_should_stop(5)); +``` + +> [!WARNING] +> Avoid using `sleep()` or `usleep()` in background workers. They block at the C level and cannot be interrupted. +> A background worker using `sleep(60)` would delay shutdown or worker restart by up to 60 seconds. +> Use `stream_select()` with the signaling stream instead - it wakes up immediately when FrankenPHP needs the thread to stop. + +- Throws `RuntimeException` if not called from a background worker context + +## Example + +### Background Worker Entrypoint + +```php + run_redis_watcher(), + default => throw new \RuntimeException("Unknown background worker: $command"), +}; + +function run_redis_watcher(): void +{ + $signalingStream = frankenphp_worker_get_signaling_stream(); + $sentinel = Amp\Redis\createRedisClient('tcp://sentinel-host:26379'); + + $subscription = $sentinel->subscribe('+switch-master'); + + Amp\async(function () use ($subscription) { + foreach ($subscription as $message) { + [$name, $oldIp, $oldPort, $newIp, $newPort] = explode(' ', $message); + frankenphp_worker_set_vars([ + 'MASTER_HOST' => $newIp, + 'MASTER_PORT' => (int) $newPort, + ]); + } + }); + + $master = $sentinel->rawCommand('SENTINEL', 'get-master-addr-by-name', 'mymaster'); + frankenphp_worker_set_vars([ + 'MASTER_HOST' => $master[0], + 'MASTER_PORT' => (int) $master[1], + ]); + + Amp\EventLoop::onReadable($signalingStream, function ($id) use ($signalingStream) { + if ("stop\n" === fgets($signalingStream)) { + Amp\EventLoop::cancel($id); + } + }); + Amp\EventLoop::run(); +} +``` + +### HTTP Worker + +```php +boot(); + +while (frankenphp_handle_request(function () use ($app) { + $redis = frankenphp_worker_get_vars('redis-watcher'); + + $app->handle($_GET, $_POST, $_COOKIE, $_FILES, $_SERVER + $redis); +})) { + gc_collect_cycles(); +} +``` + +### Graceful Degradation + +```php +if (function_exists('frankenphp_worker_get_vars')) { + $config = frankenphp_worker_get_vars('config-watcher'); +} else { + $config = ['MASTER_HOST' => getenv('REDIS_HOST') ?: '127.0.0.1']; +} +``` + +## Runtime Behavior + +- Background workers get their own dedicated thread from the reserved pool; they don't reduce HTTP capacity +- Execution timeout is automatically disabled +- Shebangs (`#!/usr/bin/env php`) are silently skipped +- `SCRIPT_FILENAME` is set to the entrypoint's full path +- `$_SERVER['FRANKENPHP_WORKER_NAME']` contains the worker name (set for all workers, HTTP and background) +- `$_SERVER['FRANKENPHP_WORKER_BACKGROUND']` is `true` for background workers, `false` for HTTP workers +- Background workers also get `$_SERVER['argv']` = `[entrypoint, name]` for CLI compatibility +- Crash recovery: automatic restart with exponential backoff +- Graceful shutdown via `frankenphp_worker_get_signaling_stream()` and `stream_select()` +- Grace period: on restart/shutdown, background workers receive `"stop\n"` on the signaling stream and have 5 seconds to exit gracefully. Workers still blocked after 5 seconds are force-killed (Linux ZTS, Windows) or abandoned (macOS). +- Worker restarts stop and immediately restart background workers (same as HTTP workers) +- Use `error_log()` or `frankenphp_log()` for logging - avoid `echo` + +For advanced use cases (amphp, ReactPHP), the signaling stream can be registered directly +in the event loop - see `frankenphp_worker_get_signaling_stream()`. + +## Performance + +`get_vars` is designed to be called on every HTTP request with minimal overhead: + +- **Per-request cache**: repeated calls within the same request return the cached result. No Go call, no lock, no copy. `===` comparisons between cached results are O(1) (same `HashTable` pointer). +- **Generational check**: on cache miss, an atomic version counter is checked before taking any lock. If the data hasn't changed since the last request, the lock and copy are skipped entirely. +- **Immutable array zero-copy**: when opcache produces an `IS_ARRAY_IMMUTABLE` array (e.g. from a `const` or literal), both `set_vars` and `get_vars` skip allocation entirely - the pointer is shared directly. +- **Interned string optimization**: string keys and enum names that live in shared memory (`ZSTR_IS_INTERNED`) skip copy and free in all code paths. +- **Precomputed hashes**: when copying persistent arrays to request memory, string key hashes are preserved from the persistent copy. diff --git a/frankenphp.c b/frankenphp.c index c25a3505f3..b04170ca04 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -1,6 +1,7 @@ #include "frankenphp.h" #include #include +#include #include #include #include @@ -13,6 +14,7 @@ #include #ifdef PHP_WIN32 #include +#include #else #include #endif @@ -85,8 +87,153 @@ HashTable *main_thread_env = NULL; __thread uintptr_t thread_index; __thread bool is_worker_thread = false; +__thread char *worker_name = NULL; +__thread bool is_background_worker = false; +__thread int worker_stop_fds[2] = {-1, -1}; +__thread php_stream *worker_signaling_stream = NULL; __thread HashTable *sandboxed_env = NULL; +/* Best-effort force-kill for stuck background workers after grace period. + * - Linux ZTS: arm PHP's per-thread timer -> "max execution time" fatal + * - Windows: CancelSynchronousIo + QueueUserAPC -> interrupts I/O and sleeps + * - macOS/other: no-op (threads abandoned, exit when blocking call returns) */ +static int force_kill_num_threads = 0; +#ifdef ZEND_MAX_EXECUTION_TIMERS +static timer_t *thread_php_timers = NULL; +static bool *thread_php_timer_saved = NULL; +#elif defined(PHP_WIN32) +static HANDLE *thread_handles = NULL; +static bool *thread_handle_saved = NULL; +static void CALLBACK frankenphp_noop_apc(ULONG_PTR param) { (void)param; } +#endif + +void frankenphp_init_force_kill(int num_threads) { + force_kill_num_threads = num_threads; +#ifdef ZEND_MAX_EXECUTION_TIMERS + thread_php_timers = calloc(num_threads, sizeof(timer_t)); + thread_php_timer_saved = calloc(num_threads, sizeof(bool)); +#elif defined(PHP_WIN32) + thread_handles = calloc(num_threads, sizeof(HANDLE)); + thread_handle_saved = calloc(num_threads, sizeof(bool)); +#endif +} + +void frankenphp_save_php_timer(uintptr_t idx) { + if (idx >= (uintptr_t)force_kill_num_threads) { + return; + } +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers && EG(pid)) { + thread_php_timers[idx] = EG(max_execution_timer_timer); + thread_php_timer_saved[idx] = true; + } +#elif defined(PHP_WIN32) + if (thread_handles) { + DuplicateHandle(GetCurrentProcess(), GetCurrentThread(), + GetCurrentProcess(), &thread_handles[idx], 0, FALSE, + DUPLICATE_SAME_ACCESS); + thread_handle_saved[idx] = true; + } +#endif + (void)idx; +} + +void frankenphp_force_kill_thread(uintptr_t idx) { + if (idx >= (uintptr_t)force_kill_num_threads) { + return; + } +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers && thread_php_timer_saved[idx]) { + struct itimerspec its; + its.it_value.tv_sec = 0; + its.it_value.tv_nsec = 1; + its.it_interval.tv_sec = 0; + its.it_interval.tv_nsec = 0; + timer_settime(thread_php_timers[idx], 0, &its, NULL); + } +#elif defined(PHP_WIN32) + if (thread_handles && thread_handle_saved[idx]) { + CancelSynchronousIo(thread_handles[idx]); + QueueUserAPC((PAPCFUNC)frankenphp_noop_apc, thread_handles[idx], 0); + } +#endif + (void)idx; +} + +void frankenphp_destroy_force_kill(void) { +#ifdef ZEND_MAX_EXECUTION_TIMERS + if (thread_php_timers) { + free(thread_php_timers); + thread_php_timers = NULL; + } + if (thread_php_timer_saved) { + free(thread_php_timer_saved); + thread_php_timer_saved = NULL; + } +#elif defined(PHP_WIN32) + if (thread_handles) { + for (int i = 0; i < force_kill_num_threads; i++) { + if (thread_handle_saved && thread_handle_saved[i]) { + CloseHandle(thread_handles[i]); + } + } + free(thread_handles); + thread_handles = NULL; + } + if (thread_handle_saved) { + free(thread_handle_saved); + thread_handle_saved = NULL; + } +#endif + force_kill_num_threads = 0; +} + +/* Per-thread cache for get_vars results. + * Maps worker name (string) -> {version, cached_zval}. + * When the version matches, the cached zval is returned with a refcount bump, + * giving the same HashTable pointer -> === comparisons are O(1). */ +typedef struct { + uint64_t version; + zval value; +} bg_worker_vars_cache_entry; +__thread HashTable *worker_vars_cache = NULL; + +static void frankenphp_worker_close_stop_fds(void) { + if (worker_stop_fds[0] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[0]); +#else + close(worker_stop_fds[0]); +#endif + worker_stop_fds[0] = -1; + } + + if (worker_stop_fds[1] >= 0) { +#ifdef PHP_WIN32 + _close(worker_stop_fds[1]); +#else + close(worker_stop_fds[1]); +#endif + worker_stop_fds[1] = -1; + } +} + +static int frankenphp_worker_open_stop_pipe(void) { +#ifdef PHP_WIN32 + return _pipe(worker_stop_fds, 4096, _O_BINARY); +#else + return pipe(worker_stop_fds); +#endif +} + +static int frankenphp_worker_dup_fd(int fd) { +#ifdef PHP_WIN32 + return _dup(fd); +#else + return dup(fd); +#endif +} + void frankenphp_update_local_thread_context(bool is_worker) { is_worker_thread = is_worker; @@ -94,6 +241,76 @@ void frankenphp_update_local_thread_context(bool is_worker) { PG(ignore_user_abort) = is_worker ? 1 : original_user_abort_setting; } +static void bg_worker_vars_cache_dtor(zval *zv) { + bg_worker_vars_cache_entry *entry = Z_PTR_P(zv); + zval_ptr_dtor(&entry->value); + free(entry); +} + +static void bg_worker_vars_cache_reset(void) { + if (worker_vars_cache) { + zend_hash_destroy(worker_vars_cache); + free(worker_vars_cache); + worker_vars_cache = NULL; + } +} + +void frankenphp_set_worker_name(char *name, bool background) { + free(worker_name); + if (name) { + size_t len = strlen(name) + 1; + worker_name = malloc(len); + memcpy(worker_name, name, len); + } else { + worker_name = NULL; + } + is_background_worker = background; + if (!background) { + return; + } + worker_signaling_stream = NULL; + zend_unset_timeout(); + + /* Create a pipe for stop signaling */ + frankenphp_worker_close_stop_fds(); + if (frankenphp_worker_open_stop_pipe() != 0) { + worker_stop_fds[0] = -1; + worker_stop_fds[1] = -1; + } +} + +int frankenphp_worker_get_stop_fd_write(void) { return worker_stop_fds[1]; } + +static int bg_worker_pipe_write_impl(int fd, const char *buf, int len) { + if (fd < 0) { + return -1; + } + +#ifdef PHP_WIN32 + return _write(fd, buf, len); +#else + return (int)write(fd, buf, len); +#endif +} + +int frankenphp_worker_write_stop_fd(int fd) { + return bg_worker_pipe_write_impl(fd, "stop\n", 5); +} + +int frankenphp_worker_write_task_signal(int fd) { + return bg_worker_pipe_write_impl(fd, "task\n", 5); +} + +void frankenphp_worker_close_fd(int fd) { + if (fd >= 0) { +#ifdef PHP_WIN32 + _close(fd); +#else + close(fd); +#endif + } +} + static void frankenphp_update_request_context() { /* the server context is stored on the go side, still SG(server_context) needs * to not be NULL */ @@ -534,11 +751,14 @@ PHP_FUNCTION(frankenphp_handle_request) { Z_PARAM_FUNC(fci, fcc) ZEND_PARSE_PARAMETERS_END(); - if (!is_worker_thread) { - /* not a worker, throw an error */ + if (!is_worker_thread || is_background_worker) { zend_throw_exception( spl_ce_RuntimeException, - "frankenphp_handle_request() called while not in worker mode", 0); + is_background_worker + ? "frankenphp_handle_request() cannot be called from a background " + "worker" + : "frankenphp_handle_request() called while not in worker mode", + 0); RETURN_THROWS(); } @@ -596,6 +816,7 @@ PHP_FUNCTION(frankenphp_handle_request) { } } + bg_worker_vars_cache_reset(); frankenphp_worker_request_shutdown(); go_frankenphp_finish_worker_request(thread_index, callback_ret); if (result.r1 != NULL) { @@ -608,6 +829,506 @@ PHP_FUNCTION(frankenphp_handle_request) { RETURN_TRUE; } +/* Persistent enum storage */ +typedef struct { + zend_string *class_name; + zend_string *case_name; +} bg_worker_enum_t; + +/* Forward declarations */ +static void bg_worker_free_persistent_zval(zval *z); +static void bg_worker_request_copy_zval(zval *dst, zval *src); + +/* Check if a HashTable is an opcache immutable array - safe to share + * across threads without copying. */ +static bool bg_worker_is_immutable(HashTable *ht) { + return (GC_FLAGS(ht) & IS_ARRAY_IMMUTABLE) != 0; +} + +/* Free a stored vars pointer only if it's a persistent copy (not immutable). */ +static void bg_worker_free_stored_vars(void *ptr) { + if (ptr != NULL) { + HashTable *ht = (HashTable *)ptr; + if (!bg_worker_is_immutable(ht)) { + zval z; + ZVAL_ARR(&z, ht); + bg_worker_free_persistent_zval(&z); + } + } +} + +/* Copy or reference a stored vars pointer to request memory. + * Immutable arrays are returned as zero-copy references. */ +static void bg_worker_read_stored_vars(zval *dst, void *ptr) { + HashTable *ht = (HashTable *)ptr; + if (bg_worker_is_immutable(ht)) { + ZVAL_ARR(dst, ht); /* zero-copy: immutable = safe to share */ + } else { + zval src; + ZVAL_ARR(&src, ht); + bg_worker_request_copy_zval(dst, &src); + } +} + +/* Validate that a zval tree contains only scalars, arrays, and enums */ +static bool bg_worker_validate_zval(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + case IS_LONG: + case IS_DOUBLE: + case IS_STRING: + return true; + case IS_OBJECT: + return (Z_OBJCE_P(z)->ce_flags & ZEND_ACC_ENUM) != 0; + case IS_ARRAY: { + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) { + if (!bg_worker_validate_zval(val)) + return false; + } + ZEND_HASH_FOREACH_END(); + return true; + } + default: + return false; + } +} + +/* Deep-copy a zval into persistent memory */ +static void bg_worker_persist_zval(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: { + zend_string *s = Z_STR_P(src); + if (ZSTR_IS_INTERNED(s)) { + ZVAL_STR(dst, s); /* interned = shared memory, no copy needed */ + } else { + ZVAL_NEW_STR(dst, zend_string_init(ZSTR_VAL(s), ZSTR_LEN(s), 1)); + } + break; + } + case IS_OBJECT: { + /* Must be an enum (validated earlier) */ + zend_class_entry *ce = Z_OBJCE_P(src); + bg_worker_enum_t *e = pemalloc(sizeof(bg_worker_enum_t), 1); + e->class_name = + ZSTR_IS_INTERNED(ce->name) + ? ce->name + : zend_string_init(ZSTR_VAL(ce->name), ZSTR_LEN(ce->name), 1); + zval *case_name_zval = zend_enum_fetch_case_name(Z_OBJ_P(src)); + zend_string *case_str = Z_STR_P(case_name_zval); + e->case_name = + ZSTR_IS_INTERNED(case_str) + ? case_str + : zend_string_init(ZSTR_VAL(case_str), ZSTR_LEN(case_str), 1); + ZVAL_PTR(dst, e); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + HashTable *dst_ht = pemalloc(sizeof(HashTable), 1); + zend_hash_init(dst_ht, zend_hash_num_elements(src_ht), NULL, NULL, 1); + ZVAL_ARR(dst, dst_ht); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval pval; + bg_worker_persist_zval(&pval, val); + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &pval); + } else { + zend_string *pkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 1); + zend_hash_add_new(dst_ht, pkey, &pval); + zend_string_release(pkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &pval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + ZVAL_NULL(dst); + break; + } +} + +/* Deep-free a persistent zval tree */ +static void bg_worker_free_persistent_zval(zval *z) { + switch (Z_TYPE_P(z)) { + case IS_STRING: + if (!ZSTR_IS_INTERNED(Z_STR_P(z))) { + zend_string_free(Z_STR_P(z)); + } + break; + case IS_PTR: { + bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(z); + if (!ZSTR_IS_INTERNED(e->class_name)) + zend_string_free(e->class_name); + if (!ZSTR_IS_INTERNED(e->case_name)) + zend_string_free(e->case_name); + pefree(e, 1); + break; + } + case IS_ARRAY: { + zval *val; + ZEND_HASH_FOREACH_VAL(Z_ARRVAL_P(z), val) { + bg_worker_free_persistent_zval(val); + } + ZEND_HASH_FOREACH_END(); + zend_hash_destroy(Z_ARRVAL_P(z)); + pefree(Z_ARRVAL_P(z), 1); + break; + } + default: + break; + } +} + +/* Go-callable wrapper to free a persistent HashTable */ +void frankenphp_worker_free_persistent_ht(void *ptr) { + if (ptr && !bg_worker_is_immutable((HashTable *)ptr)) { + zval z; + ZVAL_ARR(&z, (HashTable *)ptr); + bg_worker_free_persistent_zval(&z); + } +} + +/* Deep-copy a persistent zval tree into request memory */ +static void bg_worker_request_copy_zval(zval *dst, zval *src) { + switch (Z_TYPE_P(src)) { + case IS_NULL: + case IS_FALSE: + case IS_TRUE: + ZVAL_COPY_VALUE(dst, src); + break; + case IS_LONG: + ZVAL_LONG(dst, Z_LVAL_P(src)); + break; + case IS_DOUBLE: + ZVAL_DOUBLE(dst, Z_DVAL_P(src)); + break; + case IS_STRING: + if (ZSTR_IS_INTERNED(Z_STR_P(src))) { + ZVAL_STR(dst, Z_STR_P(src)); + } else { + ZVAL_STRINGL(dst, Z_STRVAL_P(src), Z_STRLEN_P(src)); + } + break; + case IS_PTR: { + bg_worker_enum_t *e = (bg_worker_enum_t *)Z_PTR_P(src); + zend_class_entry *ce = zend_lookup_class(e->class_name); + if (!ce || !(ce->ce_flags & ZEND_ACC_ENUM)) { + zend_throw_exception_ex(spl_ce_LogicException, 0, + "Background worker enum class \"%s\" not found", + ZSTR_VAL(e->class_name)); + ZVAL_NULL(dst); + break; + } + zend_object *enum_obj = zend_enum_get_case_cstr(ce, ZSTR_VAL(e->case_name)); + if (!enum_obj) { + zend_throw_exception_ex( + spl_ce_LogicException, 0, + "Background worker enum case \"%s::%s\" not found", + ZSTR_VAL(e->class_name), ZSTR_VAL(e->case_name)); + ZVAL_NULL(dst); + break; + } + ZVAL_OBJ_COPY(dst, enum_obj); + break; + } + case IS_ARRAY: { + HashTable *src_ht = Z_ARRVAL_P(src); + array_init_size(dst, zend_hash_num_elements(src_ht)); + HashTable *dst_ht = Z_ARRVAL_P(dst); + + zend_string *key; + zend_ulong idx; + zval *val; + ZEND_HASH_FOREACH_KEY_VAL(src_ht, idx, key, val) { + zval rval; + bg_worker_request_copy_zval(&rval, val); + if (EG(exception)) { + zval_ptr_dtor(&rval); + break; + } + if (key) { + if (ZSTR_IS_INTERNED(key)) { + zend_hash_add_new(dst_ht, key, &rval); + } else { + zend_string *rkey = zend_string_init(ZSTR_VAL(key), ZSTR_LEN(key), 0); + ZSTR_H(rkey) = ZSTR_H(key); + zend_hash_add_new(dst_ht, rkey, &rval); + zend_string_release(rkey); + } + } else { + zend_hash_index_add_new(dst_ht, idx, &rval); + } + } + ZEND_HASH_FOREACH_END(); + break; + } + default: + ZVAL_NULL(dst); + break; + } +} + +PHP_FUNCTION(frankenphp_worker_set_vars) { + zval *vars_array = NULL; + + ZEND_PARSE_PARAMETERS_START(1, 1); + Z_PARAM_ARRAY(vars_array); + ZEND_PARSE_PARAMETERS_END(); + + HashTable *ht = Z_ARRVAL_P(vars_array); + + if (bg_worker_is_immutable(ht)) { + /* Fast path: immutable arrays are already in shared memory. + * No validation needed (immutable arrays contain only safe types). + * No deep-copy needed. Store the pointer directly. */ + void *old = NULL; + char *error = go_frankenphp_worker_set_vars(thread_index, ht, &old); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + bg_worker_free_stored_vars(old); + } else { + /* Slow path: validate, deep-copy to persistent memory */ + zval *val; + ZEND_HASH_FOREACH_VAL(ht, val) { + if (!bg_worker_validate_zval(val)) { + zend_value_error("Values must be null, scalars, arrays, or enums; " + "objects and resources are not allowed"); + RETURN_THROWS(); + } + } + ZEND_HASH_FOREACH_END(); + + zval persistent; + bg_worker_persist_zval(&persistent, vars_array); + + void *old = NULL; + char *error = + go_frankenphp_worker_set_vars(thread_index, Z_ARRVAL(persistent), &old); + if (error) { + bg_worker_free_persistent_zval(&persistent); + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + bg_worker_free_stored_vars(old); + } +} + +/* Copy vars from persistent storage into a PHP zval. + * For count == 1: copies directly into dst. + * For count > 1: creates a keyed array in dst. + * Returns true on success, false if an exception occurred. */ +bool frankenphp_worker_copy_vars(zval *dst, int count, char **names, + size_t *name_lens, void **ptrs) { + if (count == 1) { + if (ptrs[0]) { + bg_worker_read_stored_vars(dst, ptrs[0]); + } else { + array_init(dst); + } + return !EG(exception); + } + + array_init(dst); + for (int i = 0; i < count; i++) { + zval worker_vars; + if (ptrs[i]) { + bg_worker_read_stored_vars(&worker_vars, ptrs[i]); + if (EG(exception)) { + zval_ptr_dtor(&worker_vars); + return false; + } + } else { + array_init(&worker_vars); + } + add_assoc_zval_ex(dst, names[i], name_lens[i], &worker_vars); + } + return true; +} + +PHP_FUNCTION(frankenphp_worker_get_vars) { + zval *names = NULL; + double timeout = 30.0; + + ZEND_PARSE_PARAMETERS_START(1, 2); + Z_PARAM_ZVAL(names); + Z_PARAM_OPTIONAL + Z_PARAM_DOUBLE(timeout); + ZEND_PARSE_PARAMETERS_END(); + + if (timeout < 0) { + zend_value_error("Timeout must not be negative"); + RETURN_THROWS(); + } + int timeout_ms = (int)(timeout * 1000); + + if (Z_TYPE_P(names) == IS_STRING) { + if (Z_STRLEN_P(names) == 0) { + zend_value_error("Background worker name must not be empty"); + RETURN_THROWS(); + } + + char *name_ptr = Z_STRVAL_P(names); + size_t name_len_val = Z_STRLEN_P(names); + + /* Check per-request cache */ + uint64_t caller_version = 0; + uint64_t out_version = 0; + bg_worker_vars_cache_entry *cached = NULL; + if (worker_vars_cache) { + zval *entry_zv = + zend_hash_str_find(worker_vars_cache, name_ptr, name_len_val); + if (entry_zv) { + cached = Z_PTR_P(entry_zv); + caller_version = cached->version; + } + } + + char *error = go_frankenphp_worker_get_vars( + thread_index, &name_ptr, &name_len_val, 1, timeout_ms, return_value, + cached ? &caller_version : NULL, &out_version); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } + if (EG(exception)) { + RETURN_THROWS(); + } + + /* Cache hit: Go skipped the copy because version matched */ + if (cached && out_version == caller_version) { + ZVAL_COPY(return_value, &cached->value); + return; + } + + /* Cache miss: store the new result */ + if (!worker_vars_cache) { + worker_vars_cache = malloc(sizeof(HashTable)); + zend_hash_init(worker_vars_cache, 4, NULL, bg_worker_vars_cache_dtor, 0); + } + bg_worker_vars_cache_entry *entry = malloc(sizeof(*entry)); + entry->version = out_version; + ZVAL_COPY(&entry->value, return_value); + zval entry_zv; + ZVAL_PTR(&entry_zv, entry); + zend_hash_str_update(worker_vars_cache, name_ptr, name_len_val, &entry_zv); + + return; + } + + if (Z_TYPE_P(names) != IS_ARRAY) { + zend_type_error("Argument #1 ($name) must be of type string|array, %s " + "given", + zend_zval_type_name(names)); + RETURN_THROWS(); + } + + HashTable *ht = Z_ARRVAL_P(names); + zval *val; + + ZEND_HASH_FOREACH_VAL(ht, val) { + if (Z_TYPE_P(val) != IS_STRING || Z_STRLEN_P(val) == 0) { + zend_value_error("All background worker names must be non-empty strings"); + RETURN_THROWS(); + } + } + ZEND_HASH_FOREACH_END(); + + int name_count = zend_hash_num_elements(ht); + char **name_ptrs = malloc(sizeof(char *) * name_count); + size_t *name_lens_arr = malloc(sizeof(size_t) * name_count); + int idx = 0; + ZEND_HASH_FOREACH_VAL(ht, val) { + name_ptrs[idx] = Z_STRVAL_P(val); + name_lens_arr[idx] = Z_STRLEN_P(val); + idx++; + } + ZEND_HASH_FOREACH_END(); + + char *error = go_frankenphp_worker_get_vars( + thread_index, name_ptrs, name_lens_arr, name_count, timeout_ms, + return_value, NULL, NULL); + free(name_ptrs); + free(name_lens_arr); + if (error) { + zend_throw_exception(spl_ce_RuntimeException, error, 0); + free(error); + RETURN_THROWS(); + } +} + +PHP_FUNCTION(frankenphp_worker_get_signaling_stream) { + ZEND_PARSE_PARAMETERS_NONE(); + + if (!is_background_worker) { + zend_throw_exception(spl_ce_RuntimeException, + "frankenphp_worker_get_signaling_stream() can only " + "be called from a background worker", + 0); + RETURN_THROWS(); + } + + /* Return the cached stream if already created */ + if (worker_signaling_stream != NULL) { + php_stream_to_zval(worker_signaling_stream, return_value); + GC_ADDREF(Z_COUNTED_P(return_value)); + return; + } + + if (worker_stop_fds[0] < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to create background worker stop pipe", 0); + RETURN_THROWS(); + } + + int fd = frankenphp_worker_dup_fd(worker_stop_fds[0]); + if (fd < 0) { + zend_throw_exception(spl_ce_RuntimeException, + "failed to dup background worker stop fd", 0); + RETURN_THROWS(); + } + + php_stream *stream = php_stream_fopen_from_fd(fd, "rb", NULL); + if (!stream) { + frankenphp_worker_close_fd(fd); + zend_throw_exception(spl_ce_RuntimeException, + "failed to create stream from stop fd", 0); + RETURN_THROWS(); + } + + worker_signaling_stream = stream; + php_stream_to_zval(stream, return_value); + + /* Keep an extra ref so PHP can't destroy the stream while TLS caches it */ + GC_ADDREF(Z_COUNTED_P(return_value)); +} + PHP_FUNCTION(headers_send) { zend_long response_code = 200; @@ -1047,6 +1768,9 @@ static void *php_thread(void *arg) { #endif #endif + /* Save PHP's timer handle for best-effort force-kill after grace period */ + frankenphp_save_php_timer(thread_index); + // loop until Go signals to stop char *scriptName = NULL; while ((scriptName = go_frankenphp_before_script_execution(thread_index))) { @@ -1058,6 +1782,7 @@ static void *php_thread(void *arg) { ts_free_thread(); #endif + frankenphp_worker_close_stop_fds(); go_frankenphp_on_thread_shutdown(thread_index); return NULL; @@ -1217,6 +1942,46 @@ int frankenphp_execute_script(char *file_name) { file_handle.primary_script = 1; + if (worker_name != NULL) { + zend_is_auto_global_str("_SERVER", sizeof("_SERVER") - 1); + zval *server = &PG(http_globals)[TRACK_VARS_SERVER]; + if (server && Z_TYPE_P(server) == IS_ARRAY) { + zval name_zval; + ZVAL_STRING(&name_zval, worker_name); + zend_hash_str_update(Z_ARRVAL_P(server), "FRANKENPHP_WORKER_NAME", + sizeof("FRANKENPHP_WORKER_NAME") - 1, &name_zval); + + zval bg_zval; + ZVAL_BOOL(&bg_zval, is_background_worker); + zend_hash_str_update(Z_ARRVAL_P(server), "FRANKENPHP_WORKER_BACKGROUND", + sizeof("FRANKENPHP_WORKER_BACKGROUND") - 1, + &bg_zval); + } + } + + if (is_background_worker) { + CG(skip_shebang) = 1; + + /* Background workers run indefinitely - disable max_execution_time */ + zend_set_timeout(0, 0); + + zval *server = &PG(http_globals)[TRACK_VARS_SERVER]; + if (server && Z_TYPE_P(server) == IS_ARRAY) { + zval argv_array; + array_init(&argv_array); + add_next_index_string(&argv_array, file_name); + add_next_index_string(&argv_array, worker_name); + + zval argc_zval; + ZVAL_LONG(&argc_zval, 2); + + zend_hash_str_update(Z_ARRVAL_P(server), "argv", sizeof("argv") - 1, + &argv_array); + zend_hash_str_update(Z_ARRVAL_P(server), "argc", sizeof("argc") - 1, + &argc_zval); + } + } + zend_first_try { EG(exit_status) = 0; php_execute_script(&file_handle); @@ -1233,6 +1998,7 @@ int frankenphp_execute_script(char *file_name) { sandboxed_env = NULL; } + bg_worker_vars_cache_reset(); php_request_shutdown((void *)0); frankenphp_free_request_context(); diff --git a/frankenphp.go b/frankenphp.go index d2aaa3c7d9..f13d937b58 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -48,6 +48,7 @@ var ( ErrMainThreadCreation = errors.New("error creating the main thread") ErrScriptExecution = errors.New("error during PHP script execution") ErrNotRunning = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") + ErrNotHTTPWorker = errors.New("worker is not an HTTP worker") ErrInvalidRequestPath = ErrRejected{"invalid request path", http.StatusBadRequest} ErrInvalidContentLengthHeader = ErrRejected{"invalid Content-Length header", http.StatusBadRequest} @@ -156,8 +157,20 @@ func Config() PHPConfig { func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { maxProcs := runtime.GOMAXPROCS(0) * 2 maxThreadsFromWorkers := 0 + reservedThreads := 0 for i, w := range opt.workers { + // Background workers contribute to thread budget + // but not to numWorkers (so num_threads stays the same for HTTP). + if w.isBackgroundWorker { + extra := w.num + if extra < 1 { + extra = 1 + } + reservedThreads += extra + continue + } + if w.num <= 0 { // https://github.com/php/frankenphp/issues/126 opt.workers[i].num = maxProcs @@ -199,6 +212,8 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("num_threads (%d) must be greater than the number of worker threads (%d)", opt.numThreads, numWorkers) } + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads return numWorkers, nil } @@ -208,6 +223,8 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than the number of worker threads (%d)", opt.maxThreads, numWorkers) } + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads return numWorkers, nil } @@ -220,6 +237,8 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { } opt.maxThreads = opt.numThreads + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads return numWorkers, nil } @@ -232,6 +251,8 @@ func calculateMaxThreads(opt *opt) (numWorkers int, _ error) { return 0, fmt.Errorf("max_threads (%d) must be greater than or equal to num_threads (%d)", opt.maxThreads, opt.numThreads) } + opt.numThreads += reservedThreads + opt.maxThreads += reservedThreads return numWorkers, nil } @@ -414,6 +435,9 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error // Detect if a worker is available to handle this request if fc.worker != nil { + if fc.worker.isBackgroundWorker { + return ErrNotHTTPWorker + } return fc.worker.handleRequest(ch) } diff --git a/frankenphp.h b/frankenphp.h index f25cb85128..7742f3d672 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -171,7 +171,14 @@ bool frankenphp_new_php_thread(uintptr_t thread_index); bool frankenphp_shutdown_dummy_request(void); int frankenphp_execute_script(char *file_name); void frankenphp_update_local_thread_context(bool is_worker); - +void frankenphp_set_worker_name(char *name, bool background); +int frankenphp_worker_get_stop_fd_write(void); +int frankenphp_worker_write_stop_fd(int fd); +int frankenphp_worker_write_task_signal(int fd); +void frankenphp_worker_free_persistent_ht(void *ptr); +bool frankenphp_worker_copy_vars(zval *dst, int count, char **names, + size_t *name_lens, void **ptrs); +void frankenphp_worker_close_fd(int fd); int frankenphp_execute_script_cli(char *script, int argc, char **argv, bool eval); @@ -188,4 +195,9 @@ int frankenphp_get_current_memory_limit(); void register_extensions(zend_module_entry **m, int len); +void frankenphp_init_force_kill(int num_threads); +void frankenphp_save_php_timer(uintptr_t thread_index); +void frankenphp_force_kill_thread(uintptr_t thread_index); +void frankenphp_destroy_force_kill(void); + #endif diff --git a/frankenphp.stub.php b/frankenphp.stub.php index d6c85aa05f..fbc258fb5b 100644 --- a/frankenphp.stub.php +++ b/frankenphp.stub.php @@ -16,6 +16,19 @@ function frankenphp_handle_request(callable $callback): bool {} +/** + * @param array> $vars Nested arrays must recursively follow the same type constraints + */ +function frankenphp_worker_set_vars(array $vars): void {} + +/** + * @return array|\UnitEnum> Nested arrays recursively follow the same type constraints + */ +function frankenphp_worker_get_vars(string|array $name, float $timeout = 30.0): array {} + +/** @return resource */ +function frankenphp_worker_get_signaling_stream() {} + function headers_send(int $status = 200): int {} function frankenphp_finish_request(): bool {} @@ -50,7 +63,7 @@ function apache_response_headers(): array|bool {} function mercure_publish(string|array $topics, string $data = '', bool $private = false, ?string $id = null, ?string $type = null, ?int $retry = null): string {} /** - * @param int $level The importance or severity of a log event. The higher the level, the more important or severe the event. For more details, see: https://pkg.go.dev/log/slog#Level - * array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr + * @param int $level The importance or severity of a log event. The higher the level, the more important or severe the event. For more details, see: https://pkg.go.dev/log/slog#Level + * @param array $context Values of the array will be converted to the corresponding Go type (if supported by FrankenPHP) and added to the context of the structured logs using https://pkg.go.dev/log/slog#Attr */ function frankenphp_log(string $message, int $level = 0, array $context = []): void {} diff --git a/frankenphp_arginfo.h b/frankenphp_arginfo.h index 4f2707cbca..55620eed74 100644 --- a/frankenphp_arginfo.h +++ b/frankenphp_arginfo.h @@ -5,6 +5,18 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_handle_request, 0, 1, ZEND_ARG_TYPE_INFO(0, callback, IS_CALLABLE, 0) ZEND_END_ARG_INFO() +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_worker_set_vars, 0, 1, IS_VOID, 0) + ZEND_ARG_TYPE_INFO(0, vars, IS_ARRAY, 0) +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_frankenphp_worker_get_vars, 0, 1, IS_ARRAY, 0) + ZEND_ARG_TYPE_MASK(0, name, MAY_BE_STRING|MAY_BE_ARRAY, NULL) + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timeout, IS_DOUBLE, 0, "30.0") +ZEND_END_ARG_INFO() + +ZEND_BEGIN_ARG_INFO_EX(arginfo_frankenphp_worker_get_signaling_stream, 0, 0, 0) +ZEND_END_ARG_INFO() + ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_headers_send, 0, 0, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, status, IS_LONG, 0, "200") ZEND_END_ARG_INFO() @@ -43,6 +55,9 @@ ZEND_END_ARG_INFO() ZEND_FUNCTION(frankenphp_handle_request); +ZEND_FUNCTION(frankenphp_worker_set_vars); +ZEND_FUNCTION(frankenphp_worker_get_vars); +ZEND_FUNCTION(frankenphp_worker_get_signaling_stream); ZEND_FUNCTION(headers_send); ZEND_FUNCTION(frankenphp_finish_request); ZEND_FUNCTION(frankenphp_request_headers); @@ -53,6 +68,9 @@ ZEND_FUNCTION(frankenphp_log); static const zend_function_entry ext_functions[] = { ZEND_FE(frankenphp_handle_request, arginfo_frankenphp_handle_request) + ZEND_FE(frankenphp_worker_set_vars, arginfo_frankenphp_worker_set_vars) + ZEND_FE(frankenphp_worker_get_vars, arginfo_frankenphp_worker_get_vars) + ZEND_FE(frankenphp_worker_get_signaling_stream, arginfo_frankenphp_worker_get_signaling_stream) ZEND_FE(headers_send, arginfo_headers_send) ZEND_FE(frankenphp_finish_request, arginfo_frankenphp_finish_request) ZEND_FALIAS(fastcgi_finish_request, frankenphp_finish_request, arginfo_fastcgi_finish_request) diff --git a/frankenphp_test.go b/frankenphp_test.go index 47e65c490b..f3cde8fc12 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -7,6 +7,8 @@ package frankenphp_test import ( "bytes" "context" + "crypto/md5" + "encoding/hex" "errors" "flag" "fmt" @@ -23,8 +25,8 @@ import ( "os" "os/exec" "os/user" - "runtime" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -46,6 +48,7 @@ type testOptions struct { realServer bool logger *slog.Logger initOpts []frankenphp.Option + workerOpts []frankenphp.WorkerOption requestOpts []frankenphp.RequestOption phpIni map[string]string } @@ -67,6 +70,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * frankenphp.WithWorkerEnv(opts.env), frankenphp.WithWorkerWatchMode(opts.watch), } + workerOpts = append(workerOpts, opts.workerOpts...) initOpts = append(initOpts, frankenphp.WithWorkers("workerName", testDataDir+opts.workerScript, opts.nbWorkers, workerOpts...)) } initOpts = append(initOpts, opts.initOpts...) @@ -803,6 +807,296 @@ func testFileUpload(t *testing.T, opts *testOptions) { }, opts) } +func TestBackgroundWorkerGetVars(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-with-argv.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + // get_vars blocks until the background worker calls set_vars - no polling needed + body, _ := testGet("http://example.com/background-worker-start.php", handler, t) + assert.Equal(t, "test-worker", body) + }, &testOptions{ + workerScript: "background-worker-start.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerGetVarsIdentity(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-with-argv.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-identity.php", handler, t) + assert.Equal(t, "IDENTICAL", body) + }, &testOptions{ + workerScript: "background-worker-identity.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerAtMostOnce(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-dedup.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-start-twice.php", handler, t) + assert.Equal(t, "dedup-worker", body) + }, &testOptions{ + workerScript: "background-worker-start-twice.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerNoEntrypoint(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-no-entrypoint.php", handler, t) + assert.Equal(t, "no background worker configured in this php_server", body) + }, &testOptions{ + workerScript: "background-worker-no-entrypoint.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerSetVarsValidation(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-set-server-var-validation.php", handler, t) + assert.Contains(t, body, "NON_BACKGROUND:blocked") + assert.Contains(t, body, "STREAM_NON_BACKGROUND:blocked") + }, &testOptions{ + workerScript: "background-worker-set-server-var-validation.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerTypeValidation(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-type-validation-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-type-validation.php", handler, t) + assert.Contains(t, body, "INT_VAL:allowed") + assert.Contains(t, body, "INT_KEY:allowed") + assert.Contains(t, body, "NESTED:allowed") + assert.Contains(t, body, "OBJECT:blocked") + assert.Contains(t, body, "REFERENCE:blocked") + assert.Contains(t, body, "ENUM:allowed") + assert.Contains(t, body, "ENUM_RESTORED:match") + }, &testOptions{ + workerScript: "background-worker-type-validation.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerBinarySafe(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-binary-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-binary-safe.php", handler, t) + assert.Contains(t, body, "BINARY_LEN:11") + assert.Contains(t, body, "BINARY_CONTENT:"+hex.EncodeToString([]byte("hello\x00world"))) + assert.Contains(t, body, "UTF8:héllo wörld 🚀") + assert.Contains(t, body, "EMPTY_EXISTS:yes") + assert.Contains(t, body, "EMPTY_LEN:0") + }, &testOptions{ + workerScript: "background-worker-binary-safe.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerGetVarsMultiple(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-multi-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi.php", handler, t) + assert.Equal(t, "worker-a:NAME_WORKER_A=worker-a,worker-b:NAME_WORKER_B=worker-b", body) + }, &testOptions{ + workerScript: "background-worker-multi.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerEnumMissing(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-enum-missing-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-enum-missing.php", handler, t) + assert.Contains(t, body, "LogicException:") + assert.Contains(t, body, "SidekickOnlyEnum") + }, &testOptions{ + workerScript: "background-worker-enum-missing.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerCrashRestart(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-crash.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + // get_vars blocks - background worker crashes, restarts, then publishes + body, _ := testGet("http://example.com/background-worker-crash-starter.php", handler, t) + assert.Equal(t, "restarted", body) + }, &testOptions{ + workerScript: "background-worker-crash-starter.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerSignalingStream(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-stop-fd-entrypoint.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-stop-fd.php", handler, t) + assert.Equal(t, "stream", body) + }, &testOptions{ + workerScript: "background-worker-stop-fd.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerStopsOnWorkerRestart(t *testing.T) { + cwd, _ := os.Getwd() + entrypoint := cwd + "/testdata/background-worker-restart-entrypoint.php" + name := fmt.Sprintf("restart-background-worker-%d", time.Now().UnixNano()) + hash := fmt.Sprintf("%x", md5.Sum([]byte(name))) + runningMarker := filepath.Join(os.TempDir(), "background-worker-restart-running-"+hash) + t.Cleanup(func() { + _ = os.Remove(runningMarker) + }) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-restart.php?name="+name, handler, t) + assert.Equal(t, "1", body) + require.FileExists(t, runningMarker) + + frankenphp.RestartWorkers() + + require.Eventually(t, func() bool { + _, err := os.Stat(runningMarker) + return errors.Is(err, os.ErrNotExist) + }, 5*time.Second, 50*time.Millisecond) + + body, _ = testGet("http://example.com/background-worker-restart.php?name="+name, handler, t) + assert.Equal(t, "2", body) + require.FileExists(t, runningMarker) + + frankenphp.RestartWorkers() + + require.Eventually(t, func() bool { + _, err := os.Stat(runningMarker) + return errors.Is(err, os.ErrNotExist) + }, 5*time.Second, 50*time.Millisecond) + }, &testOptions{ + workerScript: "background-worker-restart.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) +} + +func TestBackgroundWorkerSignalingStreamNonBackgroundWorker(t *testing.T) { + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-stop-fd-non-background-worker.php", handler, t) + assert.Equal(t, "thrown", body) + }, &testOptions{ + workerScript: "background-worker-stop-fd-non-background-worker.php", + nbWorkers: 1, + nbParallelRequests: 1, + }) +} + +func TestBackgroundWorkerMultipleEntrypoints(t *testing.T) { + cwd, _ := os.Getwd() + + t.Run("entrypoint-a", func(t *testing.T) { + entrypoint := cwd + "/testdata/background-worker-multi-entrypoint-a.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi-file.php", handler, t) + assert.Equal(t, "entrypoint-a:worker-from-a,entrypoint-a:worker-from-b", body) + }, &testOptions{ + workerScript: "background-worker-multi-file.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) + }) + + t.Run("entrypoint-b", func(t *testing.T) { + entrypoint := cwd + "/testdata/background-worker-multi-entrypoint-b.php" + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + body, _ := testGet("http://example.com/background-worker-multi-file.php", handler, t) + assert.Equal(t, "entrypoint-b:worker-from-a,entrypoint-b:worker-from-b", body) + }, &testOptions{ + workerScript: "background-worker-multi-file.php", + nbWorkers: 1, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithMaxThreads(50)}, + workerOpts: []frankenphp.WorkerOption{ + frankenphp.WithWorkerBackgroundEntrypoint(entrypoint, 0), + }, + }) + }) +} + func ExampleServeHTTP() { if err := frankenphp.Init(); err != nil { panic(err) diff --git a/options.go b/options.go index 9ba1f916f6..c745a8916a 100644 --- a/options.go +++ b/options.go @@ -49,6 +49,10 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() + isBackgroundWorker bool + backgroundEntrypoint string // entrypoint file for background workers + backgroundMaxWorkers int // max lazy-started instances (catch-all only) + backgroundAutoStartNames []string } // WithContext sets the main context to use. @@ -85,6 +89,31 @@ func WithMetrics(m Metrics) Option { } } +// WithWorkerBackground marks this worker as a background (non-HTTP) worker. +// entrypoint is the PHP script file. maxWorkers sets a safety cap for +// lazy-started instances (0 = unlimited, only meaningful for catch-all workers). +func WithWorkerBackground(entrypoint string, maxWorkers int) WorkerOption { + return func(w *workerOpt) error { + w.isBackgroundWorker = true + w.backgroundEntrypoint = entrypoint + w.backgroundMaxWorkers = maxWorkers + + return nil + } +} + +// WithWorkerBackgroundEntrypoint declares a background worker entrypoint +// without marking this worker as a background worker. Used on HTTP workers +// that need to access background workers via get_vars. +func WithWorkerBackgroundEntrypoint(entrypoint string, maxWorkers int) WorkerOption { + return func(w *workerOpt) error { + w.backgroundEntrypoint = entrypoint + w.backgroundMaxWorkers = maxWorkers + + return nil + } +} + // WithWorkers configures the PHP workers to start func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option { return func(o *opt) error { diff --git a/phpmainthread.go b/phpmainthread.go index ba3917e846..39ebf4b073 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -54,6 +54,9 @@ func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) return nil, err } + // initialize force-kill support for stuck background workers + C.frankenphp_init_force_kill(C.int(mainThread.maxThreads)) + // initialize all other threads phpThreads = make([]*phpThread, mainThread.maxThreads) phpThreads[0] = initialThread @@ -95,6 +98,7 @@ func drainPHPThreads() { } doneWG.Wait() + C.frankenphp_destroy_force_kill() mainThread.state.Set(state.Done) mainThread.state.WaitFor(state.Reserved) phpThreads = nil diff --git a/phpthread.go b/phpthread.go index fdf263717c..dbf4b7605d 100644 --- a/phpthread.go +++ b/phpthread.go @@ -31,6 +31,7 @@ type threadHandler interface { afterScriptExecution(exitStatus int) context() context.Context frankenPHPContext() *frankenPHPContext + drain() } func newPHPThread(threadIndex int) *phpThread { @@ -71,6 +72,7 @@ func (thread *phpThread) shutdown() { return } + thread.handler.drain() close(thread.drainChan) thread.state.WaitFor(state.Done) thread.drainChan = make(chan struct{}) diff --git a/requestoptions.go b/requestoptions.go index 42cc3cf7c0..6bfddf7801 100644 --- a/requestoptions.go +++ b/requestoptions.go @@ -164,3 +164,11 @@ func WithWorkerName(name string) RequestOption { return nil } } + +func withRequestBackgroundLookup(lookup *backgroundWorkerLookup) RequestOption { + return func(o *frankenPHPContext) error { + o.backgroundLookup = lookup + + return nil + } +} diff --git a/scaling.go b/scaling.go index 190c54d0df..abbe67c48a 100644 --- a/scaling.go +++ b/scaling.go @@ -75,7 +75,11 @@ func addWorkerThread(worker *worker) (*phpThread, error) { if thread == nil { return nil, ErrMaxThreadsReached } - convertToWorkerThread(thread, worker) + if worker.isBackgroundWorker { + convertToBackgroundWorkerThread(thread, worker) + } else { + convertToWorkerThread(thread, worker) + } thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Reserved) return thread, nil } diff --git a/testdata/background-worker-binary-entrypoint.php b/testdata/background-worker-binary-entrypoint.php new file mode 100644 index 0000000000..4692c15f0f --- /dev/null +++ b/testdata/background-worker-binary-entrypoint.php @@ -0,0 +1,12 @@ + "hello\x00world", + 'UTF8_TEST' => "héllo wörld 🚀", + 'EMPTY_VAL' => "", +]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-binary-safe.php b/testdata/background-worker-binary-safe.php new file mode 100644 index 0000000000..ca3637ff23 --- /dev/null +++ b/testdata/background-worker-binary-safe.php @@ -0,0 +1,23 @@ +getMessage(); + return; + } + + $results = []; + + $bin = $vars['BINARY_TEST'] ?? 'NOT_SET'; + $results[] = 'BINARY_LEN:' . strlen($bin); + $results[] = 'BINARY_CONTENT:' . bin2hex($bin); + + $results[] = 'UTF8:' . ($vars['UTF8_TEST'] ?? 'NOT_SET'); + + $results[] = 'EMPTY_EXISTS:' . (array_key_exists('EMPTY_VAL', $vars) ? 'yes' : 'no'); + $results[] = 'EMPTY_LEN:' . strlen($vars['EMPTY_VAL'] ?? 'NOT_SET'); + + echo implode("\n", $results); +}); diff --git a/testdata/background-worker-crash-starter.php b/testdata/background-worker-crash-starter.php new file mode 100644 index 0000000000..59ace50998 --- /dev/null +++ b/testdata/background-worker-crash-starter.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-crash.php b/testdata/background-worker-crash.php new file mode 100644 index 0000000000..9b59e7b69f --- /dev/null +++ b/testdata/background-worker-crash.php @@ -0,0 +1,18 @@ + 'restarted']); + +while (!background_worker_should_stop(30)) { +} + +@unlink($marker); diff --git a/testdata/background-worker-dedup.php b/testdata/background-worker-dedup.php new file mode 100644 index 0000000000..874de3b203 --- /dev/null +++ b/testdata/background-worker-dedup.php @@ -0,0 +1,10 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-enum-missing-entrypoint.php b/testdata/background-worker-enum-missing-entrypoint.php new file mode 100644 index 0000000000..67903909e1 --- /dev/null +++ b/testdata/background-worker-enum-missing-entrypoint.php @@ -0,0 +1,13 @@ + SidekickOnlyEnum::Foo]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-enum-missing.php b/testdata/background-worker-enum-missing.php new file mode 100644 index 0000000000..7088b5b73d --- /dev/null +++ b/testdata/background-worker-enum-missing.php @@ -0,0 +1,14 @@ +getMessage(); + } catch (\Throwable $e) { + echo 'other:' . get_class($e); + } +}); diff --git a/testdata/background-worker-helper.php b/testdata/background-worker-helper.php new file mode 100644 index 0000000000..fb9960c0c8 --- /dev/null +++ b/testdata/background-worker-helper.php @@ -0,0 +1,14 @@ + false, // timeout + false => true, // error (pipe closed) = stop + default => "stop\n" === fgets($signalingStream), + }; +} diff --git a/testdata/background-worker-identity.php b/testdata/background-worker-identity.php new file mode 100644 index 0000000000..dbae8400ae --- /dev/null +++ b/testdata/background-worker-identity.php @@ -0,0 +1,11 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-multi-entrypoint-a.php b/testdata/background-worker-multi-entrypoint-a.php new file mode 100644 index 0000000000..dc886f671a --- /dev/null +++ b/testdata/background-worker-multi-entrypoint-a.php @@ -0,0 +1,8 @@ + 'entrypoint-a', 'NAME' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown']); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-entrypoint-b.php b/testdata/background-worker-multi-entrypoint-b.php new file mode 100644 index 0000000000..ee23b075d3 --- /dev/null +++ b/testdata/background-worker-multi-entrypoint-b.php @@ -0,0 +1,8 @@ + 'entrypoint-b', 'NAME' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown']); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-entrypoint.php b/testdata/background-worker-multi-entrypoint.php new file mode 100644 index 0000000000..b63af382df --- /dev/null +++ b/testdata/background-worker-multi-entrypoint.php @@ -0,0 +1,10 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-multi-file.php b/testdata/background-worker-multi-file.php new file mode 100644 index 0000000000..758809f9a1 --- /dev/null +++ b/testdata/background-worker-multi-file.php @@ -0,0 +1,8 @@ + $vars) { + foreach ($vars as $k => $v) { + $parts[] = "$name:$k=$v"; + } + } + echo implode(',', $parts); + } catch (\Throwable $e) { + echo 'ERROR:' . $e->getMessage(); + } +}); diff --git a/testdata/background-worker-no-entrypoint.php b/testdata/background-worker-no-entrypoint.php new file mode 100644 index 0000000000..715bc9037b --- /dev/null +++ b/testdata/background-worker-no-entrypoint.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-restart-entrypoint.php b/testdata/background-worker-restart-entrypoint.php new file mode 100644 index 0000000000..52783b6cfc --- /dev/null +++ b/testdata/background-worker-restart-entrypoint.php @@ -0,0 +1,19 @@ + (string) $generation]); + +try { + while (!background_worker_should_stop(30)) { + } +} finally { + @unlink($runningMarker); +} diff --git a/testdata/background-worker-restart.php b/testdata/background-worker-restart.php new file mode 100644 index 0000000000..2189573303 --- /dev/null +++ b/testdata/background-worker-restart.php @@ -0,0 +1,7 @@ + 'val']); + $results[] = 'NON_BACKGROUND:no_error'; + } catch (\RuntimeException $e) { + $results[] = 'NON_BACKGROUND:blocked'; + } + + // get_signaling_stream from non-background-worker context should throw + try { + frankenphp_worker_get_signaling_stream(); + $results[] = 'STREAM_NON_BACKGROUND:no_error'; + } catch (\RuntimeException $e) { + $results[] = 'STREAM_NON_BACKGROUND:blocked'; + } + + echo implode("\n", $results); +}); diff --git a/testdata/background-worker-start-twice.php b/testdata/background-worker-start-twice.php new file mode 100644 index 0000000000..56952a687c --- /dev/null +++ b/testdata/background-worker-start-twice.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-start.php b/testdata/background-worker-start.php new file mode 100644 index 0000000000..c4ea3fce1f --- /dev/null +++ b/testdata/background-worker-start.php @@ -0,0 +1,10 @@ +getMessage(); + } +}); diff --git a/testdata/background-worker-stop-fd-entrypoint.php b/testdata/background-worker-stop-fd-entrypoint.php new file mode 100644 index 0000000000..40bd3cfe99 --- /dev/null +++ b/testdata/background-worker-stop-fd-entrypoint.php @@ -0,0 +1,18 @@ + get_resource_type($stream), +]); + +$r = [$stream]; +$w = $e = []; +stream_select($r, $w, $e, 30); + +$signal = fgets($stream); + +frankenphp_worker_set_vars([ + 'STREAM_TYPE' => get_resource_type($stream), + 'SIGNAL' => $signal, +]); diff --git a/testdata/background-worker-stop-fd-non-background-worker.php b/testdata/background-worker-stop-fd-non-background-worker.php new file mode 100644 index 0000000000..50af635dd9 --- /dev/null +++ b/testdata/background-worker-stop-fd-non-background-worker.php @@ -0,0 +1,10 @@ + 123]); + $results[] = 'INT_VAL:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_VAL:blocked'; +} + +// int keys allowed +try { + frankenphp_worker_set_vars([0 => 'val']); + $results[] = 'INT_KEY:allowed'; +} catch (\Throwable $e) { + $results[] = 'INT_KEY:blocked'; +} + +// nested arrays allowed +try { + frankenphp_worker_set_vars(['nested' => ['a' => 1, 'b' => [true, null]]]); + $results[] = 'NESTED:allowed'; +} catch (\Throwable $e) { + $results[] = 'NESTED:blocked'; +} + +// objects rejected +try { + frankenphp_worker_set_vars(['KEY' => new \stdClass()]); + $results[] = 'OBJECT:allowed'; +} catch (\ValueError $e) { + $results[] = 'OBJECT:blocked'; +} + +// references rejected +try { + $ref = 'hello'; + frankenphp_worker_set_vars(['KEY' => &$ref]); + $results[] = 'REFERENCE:allowed'; +} catch (\ValueError $e) { + $results[] = 'REFERENCE:blocked'; +} + +// enums allowed - final set_vars with all results +try { + $results[] = 'ENUM:allowed'; // if we get here, the call below will confirm it + frankenphp_worker_set_vars(['status' => TestStatus::Active, 'RESULTS' => implode(',', $results)]); +} catch (\Throwable $e) { + $results[array_key_last($results)] = 'ENUM:blocked'; + frankenphp_worker_set_vars(['RESULTS' => implode(',', $results)]); +} + +while (!background_worker_should_stop(30)) { +} diff --git a/testdata/background-worker-type-validation.php b/testdata/background-worker-type-validation.php new file mode 100644 index 0000000000..7ed892f3bb --- /dev/null +++ b/testdata/background-worker-type-validation.php @@ -0,0 +1,23 @@ + $name]); + +while (!background_worker_should_stop(30)) { +} diff --git a/threadbackgroundworker.go b/threadbackgroundworker.go new file mode 100644 index 0000000000..8e75d060fc --- /dev/null +++ b/threadbackgroundworker.go @@ -0,0 +1,110 @@ +package frankenphp + +// #include "frankenphp.h" +import "C" +import ( + "context" + "log/slog" + "path/filepath" + + "github.com/dunglas/frankenphp/internal/state" +) + +// backgroundWorkerThread handles background worker scripts. +// Embeds workerThread for shared functionality (failure counting, metrics, +// dummy context management) and overrides lifecycle methods. +type backgroundWorkerThread struct { + workerThread +} + +func convertToBackgroundWorkerThread(thread *phpThread, worker *worker) { + handler := &backgroundWorkerThread{ + workerThread: workerThread{ + state: thread.state, + thread: thread, + worker: worker, + }, + } + thread.setHandler(handler) + worker.attachThread(thread) +} + +func (handler *backgroundWorkerThread) name() string { + return "Background Worker PHP Thread - " + handler.worker.fileName +} + +func (handler *backgroundWorkerThread) drain() { + if fd := handler.worker.backgroundStopFdWrite.Load(); fd >= 0 { + C.frankenphp_worker_write_stop_fd(C.int(fd)) + } +} + +func (handler *backgroundWorkerThread) beforeScriptExecution() string { + switch handler.state.Get() { + case state.TransitionRequested: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return handler.thread.transitionToNewHandler() + case state.Restarting: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.state.Set(state.Yielding) + handler.state.WaitFor(state.Ready, state.ShuttingDown) + return handler.beforeScriptExecution() + case state.Ready, state.TransitionComplete: + handler.thread.updateContext(true) + if handler.worker.onThreadReady != nil { + handler.worker.onThreadReady(handler.thread.threadIndex) + } + + handler.setupScript() + + return handler.worker.fileName + case state.ShuttingDown: + if handler.worker.onThreadShutdown != nil { + handler.worker.onThreadShutdown(handler.thread.threadIndex) + } + handler.worker.detachThread(handler.thread) + return "" + } + + panic("unexpected state: " + handler.state.Name()) +} + +func (handler *backgroundWorkerThread) setupScript() { + metrics.StartWorker(handler.worker.name) + + opts := append([]RequestOption(nil), handler.worker.requestOptions...) + C.frankenphp_set_worker_name(handler.thread.pinCString(handler.worker.name), C._Bool(true)) + handler.worker.backgroundStopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write())) + + fc, err := newDummyContext( + filepath.Base(handler.worker.fileName), + opts..., + ) + if err != nil { + panic(err) + } + + ctx := context.WithValue(globalCtx, contextKey, fc) + + fc.worker = handler.worker + handler.dummyFrankenPHPContext = fc + handler.dummyContext = ctx + handler.isBootingScript = true + + if globalLogger.Enabled(ctx, slog.LevelDebug) { + globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", handler.worker.name), slog.Int("thread", handler.thread.threadIndex)) + } + + handler.thread.state.Set(state.Ready) + fc.scriptFilename = handler.worker.fileName +} + +func (handler *backgroundWorkerThread) afterScriptExecution(exitStatus int) { + handler.worker.backgroundStopFdWrite.Store(-1) + tearDownWorkerScript(&handler.workerThread, exitStatus) +} diff --git a/threadinactive.go b/threadinactive.go index b5d11fcdfc..a980c20195 100644 --- a/threadinactive.go +++ b/threadinactive.go @@ -58,3 +58,6 @@ func (handler *inactiveThread) context() context.Context { func (handler *inactiveThread) name() string { return "Inactive PHP Thread" } + +func (handler *inactiveThread) drain() { +} diff --git a/threadregular.go b/threadregular.go index 938d98350c..89b996f2f3 100644 --- a/threadregular.go +++ b/threadregular.go @@ -75,6 +75,9 @@ func (handler *regularThread) name() string { return "Regular PHP Thread" } +func (handler *regularThread) drain() { +} + func (handler *regularThread) waitForRequest() string { handler.state.MarkAsWaiting(true) diff --git a/threadtasks_test.go b/threadtasks_test.go index 2e74b12e93..b63820fcc9 100644 --- a/threadtasks_test.go +++ b/threadtasks_test.go @@ -79,6 +79,9 @@ func (handler *taskThread) name() string { return "Task PHP Thread" } +func (handler *taskThread) drain() { +} + func (handler *taskThread) waitForTasks() { for { select { diff --git a/threadworker.go b/threadworker.go index a0984afab7..7fabe12f61 100644 --- a/threadworker.go +++ b/threadworker.go @@ -98,13 +98,18 @@ func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } +func (handler *workerThread) drain() { +} + func setupWorkerScript(handler *workerThread, worker *worker) { metrics.StartWorker(worker.name) - // Create a dummy request to set up the worker + opts := append([]RequestOption(nil), worker.requestOptions...) + C.frankenphp_set_worker_name(handler.thread.pinCString(worker.name), C._Bool(false)) + fc, err := newDummyContext( filepath.Base(worker.fileName), - worker.requestOptions..., + opts..., ) if err != nil { panic(err) @@ -120,6 +125,17 @@ func setupWorkerScript(handler *workerThread, worker *worker) { if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) } + +} + +func (handler *workerThread) markBackgroundReady() { + if !handler.isBootingScript { + return + } + + handler.failureCount = 0 + handler.isBootingScript = false + metrics.ReadyWorker(handler.worker.name) } func tearDownWorkerScript(handler *workerThread, exitStatus int) { diff --git a/worker.go b/worker.go index c97cc4a3a7..df8b19fe92 100644 --- a/worker.go +++ b/worker.go @@ -16,6 +16,10 @@ import ( "github.com/dunglas/frankenphp/internal/state" ) +// backgroundWorkerGracePeriod is the time background workers have to stop +// gracefully after receiving the stop signal before being force-killed. +const backgroundWorkerGracePeriod = 5 * time.Second + // represents a worker script and can have many threads assigned to it type worker struct { mercureContext @@ -33,6 +37,11 @@ type worker struct { onThreadReady func(int) onThreadShutdown func(int) queuedRequests atomic.Int32 + isBackgroundWorker bool + backgroundLookup *backgroundWorkerLookup + backgroundRegistry *backgroundWorkerRegistry + backgroundWorker *backgroundWorkerState + backgroundStopFdWrite atomic.Int32 // write end of the stop pipe, -1 if not set } var ( @@ -71,12 +80,25 @@ func initWorkers(opt []workerOpt) error { } } + // Build background worker lookup from worker options + bgLookup := buildBackgroundWorkerLookup(workers, opt) + globalBackgroundLookup = bgLookup + if bgLookup != nil { + for _, w := range workers { + w.backgroundLookup = bgLookup + } + } + startupFailChan = make(chan error, totalThreadsToStart) for _, w := range workers { for i := 0; i < w.num; i++ { thread := getInactivePHPThread() - convertToWorkerThread(thread, w) + if w.isBackgroundWorker { + convertToBackgroundWorkerThread(thread, w) + } else { + convertToWorkerThread(thread, w) + } workersReady.Go(func() { thread.state.WaitFor(state.Ready, state.ShuttingDown, state.Done) @@ -148,8 +170,11 @@ func newWorker(o workerOpt) (*worker, error) { maxConsecutiveFailures: o.maxConsecutiveFailures, onThreadReady: o.onThreadReady, onThreadShutdown: o.onThreadShutdown, + isBackgroundWorker: o.isBackgroundWorker, } + w.backgroundStopFdWrite.Store(-1) + w.configureMercure(&o) w.requestOptions = append( @@ -162,11 +187,23 @@ func newWorker(o workerOpt) (*worker, error) { o.extensionWorkers.internalWorker = w } + // Reserve background worker state in the registry during init + if w.isBackgroundWorker && w.backgroundRegistry != nil { + bgw, _, err := w.backgroundRegistry.reserve(w.name) + if err != nil { + return nil, fmt.Errorf("failed to reserve background worker %q: %w", w.name, err) + } + w.backgroundWorker = bgw + } + return w, nil } // EXPERIMENTAL: DrainWorkers finishes all worker scripts before a graceful shutdown func DrainWorkers() { + scalingMu.Lock() + defer scalingMu.Unlock() + _ = drainWorkerThreads() } @@ -174,21 +211,34 @@ func drainWorkerThreads() []*phpThread { var ( ready sync.WaitGroup drainedThreads []*phpThread + bgThreads []*phpThread + bgWorkers []*worker ) for _, worker := range workers { worker.threadMutex.RLock() - ready.Add(len(worker.threads)) + threads := append([]*phpThread(nil), worker.threads...) + worker.threadMutex.RUnlock() - for _, thread := range worker.threads { - if !thread.state.RequestSafeStateChange(state.Restarting) { - ready.Done() + for _, thread := range threads { + if worker.isBackgroundWorker { + // Signal background workers to stop via the signaling stream + if !thread.state.RequestSafeStateChange(state.ShuttingDown) { + continue + } + thread.handler.drain() + close(thread.drainChan) + bgThreads = append(bgThreads, thread) + bgWorkers = append(bgWorkers, worker) + continue + } - // no state change allowed == thread is shutting down - // we'll proceed to restart all other threads anyway + if !thread.state.RequestSafeStateChange(state.Restarting) { continue } + ready.Add(1) + thread.handler.drain() close(thread.drainChan) drainedThreads = append(drainedThreads, thread) @@ -197,12 +247,65 @@ func drainWorkerThreads() []*phpThread { ready.Done() }(thread) } - - worker.threadMutex.RUnlock() } ready.Wait() + // Wait for background workers with a grace period. + // Well-written workers check the signaling stream and stop promptly. + // Stuck workers (e.g., blocking C calls) are abandoned after the timeout; + // new threads are created on restart, and the old thread exits when the + // blocking call eventually returns. + if len(bgThreads) > 0 { + bgDone := make(chan struct{}) + go func() { + for _, thread := range bgThreads { + thread.state.WaitFor(state.Done) + } + close(bgDone) + }() + + select { + case <-bgDone: + // all stopped gracefully + case <-time.After(backgroundWorkerGracePeriod): + // Best-effort force-kill: arm PHP's max_execution_time timer on + // stuck threads. Linux ZTS: arms PHP's timer. Windows: interrupts + // I/O and alertable waits. Other platforms: no-op. + // Safe because after 5s, stuck threads are guaranteed to be in C code. + for _, thread := range bgThreads { + if !thread.state.Is(state.Done) { + C.frankenphp_force_kill_thread(C.uintptr_t(thread.threadIndex)) + } + } + globalLogger.Warn("background workers did not stop within grace period, force-killing stuck threads") + } + + // Clean up registry entries for stopped workers + stopped := make(map[*worker]struct{}, len(bgWorkers)) + for _, w := range bgWorkers { + if w.backgroundRegistry != nil && w.backgroundWorker != nil { + w.backgroundRegistry.remove(w.name, w.backgroundWorker) + } + stopped[w] = struct{}{} + } + filtered := workers[:0] + for _, w := range workers { + if _, ok := stopped[w]; !ok { + filtered = append(filtered, w) + } + } + workers = filtered + + // Reset drained background threads for restart + for _, thread := range bgThreads { + thread.drainChan = make(chan struct{}) + if mainThread.state.Is(state.Ready) { + thread.state.Set(state.Reserved) + } + } + } + return drainedThreads } From c794caf4df3636e0a21828c05b52870a0c54df1e Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Wed, 25 Mar 2026 10:48:38 +0100 Subject: [PATCH 2/2] refactor: scope background workers per php_server block Each php_server block gets its own isolated background worker scope. Workers declared in one block cannot be accessed from another. Global workers (in the frankenphp block) have their own scope. This allows multiple php_server blocks to use the same worker names without conflict, and prevents cross-site access to background workers. --- background_worker.go | 48 +++++++++++++++++++++++++++----------- background_worker_test.go | 2 +- caddy/module.go | 6 +++++ context.go | 16 ++++++------- docs/background-workers.md | 5 ++++ options.go | 19 +++++++++++---- requestoptions.go | 6 +++-- worker.go | 12 ++++++---- 8 files changed, 81 insertions(+), 33 deletions(-) diff --git a/background_worker.go b/background_worker.go index df6154698e..cf7b0f3a7e 100644 --- a/background_worker.go +++ b/background_worker.go @@ -15,9 +15,10 @@ import ( // defaultMaxBackgroundWorkers is the default safety cap for catch-all background workers. const defaultMaxBackgroundWorkers = 16 -// globalBackgroundLookup is set during initWorkers and provides access -// to background workers from any thread (including non-worker requests). -var globalBackgroundLookup *backgroundWorkerLookup +// backgroundLookups maps scope IDs to their background worker lookups. +// Each php_server block gets its own scope. The global frankenphp block +// uses the empty string as its scope ID. +var backgroundLookups map[string]*backgroundWorkerLookup // backgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files. type backgroundWorkerLookup struct { @@ -114,28 +115,35 @@ func (registry *backgroundWorkerRegistry) SetMaxWorkers(max int) { registry.maxWorkers = max } -// buildBackgroundWorkerLookup constructs a backgroundWorkerLookup from worker -// options. Called during initWorkers — the lookup is then assigned to all workers. -func buildBackgroundWorkerLookup(workers []*worker, opts []workerOpt) *backgroundWorkerLookup { - var lookup *backgroundWorkerLookup - registries := make(map[string]*backgroundWorkerRegistry) +// buildBackgroundWorkerLookups constructs per-scope background worker lookups +// from worker options. Each scope (php_server block) gets its own lookup. +func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[string]*backgroundWorkerLookup { + lookups := make(map[string]*backgroundWorkerLookup) + // per-scope registries: scope -> entrypoint -> registry + scopeRegistries := make(map[string]map[string]*backgroundWorkerRegistry) for i, o := range opts { if o.backgroundEntrypoint == "" { continue } - if lookup == nil { + scope := o.backgroundScope + lookup, ok := lookups[scope] + if !ok { lookup = newBackgroundWorkerLookup() + lookups[scope] = lookup + scopeRegistries[scope] = make(map[string]*backgroundWorkerRegistry) } entrypoint := o.backgroundEntrypoint - registry, ok := registries[entrypoint] + registry, ok := scopeRegistries[scope][entrypoint] if !ok { registry = newBackgroundWorkerRegistry(entrypoint) - registries[entrypoint] = registry + scopeRegistries[scope][entrypoint] = registry } + workers[i].backgroundScope = scope + if !o.isBackgroundWorker { // Non-background worker declaring a background entrypoint (catch-all) maxW := o.backgroundMaxWorkers @@ -168,7 +176,11 @@ func buildBackgroundWorkerLookup(workers []*worker, opts []workerOpt) *backgroun w.backgroundRegistry = registry } - return lookup + if len(lookups) == 0 { + return nil + } + + return lookups } func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) { @@ -278,8 +290,18 @@ func getLookup(thread *phpThread) *backgroundWorkerLookup { if handler, ok := thread.handler.(*backgroundWorkerThread); ok && handler.worker.backgroundLookup != nil { return handler.worker.backgroundLookup } + // Non-worker requests: resolve scope from context + if fc, ok := fromContext(thread.context()); ok && fc.backgroundScope != "" { + if backgroundLookups != nil { + return backgroundLookups[fc.backgroundScope] + } + } + // Fall back to global scope + if backgroundLookups != nil { + return backgroundLookups[""] + } - return globalBackgroundLookup + return nil } // go_frankenphp_worker_get_vars starts background workers if needed, waits for them diff --git a/background_worker_test.go b/background_worker_test.go index 5e226919cf..b77f8db9bd 100644 --- a/background_worker_test.go +++ b/background_worker_test.go @@ -48,7 +48,7 @@ func (m *backgroundWorkerTestMetrics) DequeuedRequest() {} func TestStartBackgroundWorkerFailureIsRetryable(t *testing.T) { lookup := newBackgroundWorkerLookupWithCatchAll(testDataPath + "/background-worker-with-argv.php") - globalBackgroundLookup = lookup + backgroundLookups = map[string]*backgroundWorkerLookup{"": lookup} thread := newPHPThread(0) thread.state.Set(state.Ready) thread.handler = &workerThread{ diff --git a/caddy/module.go b/caddy/module.go index 1f5c617957..fdb9ce7094 100644 --- a/caddy/module.go +++ b/caddy/module.go @@ -51,6 +51,7 @@ type FrankenPHPModule struct { preparedEnvNeedsReplacement bool logger *slog.Logger requestOptions []frankenphp.RequestOption + backgroundScope string } // CaddyModule returns the Caddy module information. @@ -78,6 +79,9 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { f.assignMercureHub(ctx) + // Each php_server block gets its own scope for background worker isolation + f.backgroundScope = fmt.Sprintf("php_server_%p", f) + loggerOpt := frankenphp.WithRequestLogger(f.logger) for i, wc := range f.Workers { // make the file path absolute from the public directory @@ -92,6 +96,7 @@ func (f *FrankenPHPModule) Provision(ctx caddy.Context) error { } wc.requestOptions = append(wc.requestOptions, loggerOpt) + wc.options = append(wc.options, frankenphp.WithWorkerBackgroundScope(f.backgroundScope)) f.Workers[i] = wc } @@ -241,6 +246,7 @@ func (f *FrankenPHPModule) ServeHTTP(w http.ResponseWriter, r *http.Request, _ c opts, frankenphp.WithOriginalRequest(new(ctx.Value(caddyhttp.OriginalRequestCtxKey).(http.Request))), frankenphp.WithWorkerName(workerName), + frankenphp.WithRequestBackgroundScope(f.backgroundScope), )..., ) diff --git a/context.go b/context.go index 95bcf64954..d87e7e29d8 100644 --- a/context.go +++ b/context.go @@ -16,14 +16,14 @@ import ( type frankenPHPContext struct { mercureContext - documentRoot string - splitPath []string - env PreparedEnv - logger *slog.Logger - request *http.Request - originalRequest *http.Request - worker *worker - backgroundLookup *backgroundWorkerLookup + documentRoot string + splitPath []string + env PreparedEnv + logger *slog.Logger + request *http.Request + originalRequest *http.Request + worker *worker + backgroundScope string docURI string pathInfo string diff --git a/docs/background-workers.md b/docs/background-workers.md index f445c53e9c..b18b13fe4b 100644 --- a/docs/background-workers.md +++ b/docs/background-workers.md @@ -46,7 +46,12 @@ They don't compete with HTTP auto-scaling. For catch-all workers, `max_threads` the reservation (default 16). Named workers with `num 0` (default) are lazy-started but still reserve 1 thread (`max_threads` defaults to `max(num, 1)`). +### Scope isolation + Each `php_server` block has its own isolated background worker scope. +Workers declared in one `php_server` block cannot be accessed from another. +Workers declared in the global `frankenphp` block have their own separate scope. +This means two `php_server` blocks can use the same worker name without conflict. ## PHP API diff --git a/options.go b/options.go index c745a8916a..5abd12df0a 100644 --- a/options.go +++ b/options.go @@ -49,10 +49,10 @@ type workerOpt struct { onThreadShutdown func(int) onServerStartup func() onServerShutdown func() - isBackgroundWorker bool - backgroundEntrypoint string // entrypoint file for background workers - backgroundMaxWorkers int // max lazy-started instances (catch-all only) - backgroundAutoStartNames []string + isBackgroundWorker bool + backgroundEntrypoint string // entrypoint file for background workers + backgroundMaxWorkers int // max lazy-started instances (catch-all only) + backgroundScope string // scope ID for background worker isolation } // WithContext sets the main context to use. @@ -114,6 +114,17 @@ func WithWorkerBackgroundEntrypoint(entrypoint string, maxWorkers int) WorkerOpt } } +// WithWorkerBackgroundScope sets the scope ID for background worker isolation. +// Workers in the same scope share a background worker lookup. Each php_server +// block gets its own scope. The global frankenphp block uses scope "". +func WithWorkerBackgroundScope(scope string) WorkerOption { + return func(w *workerOpt) error { + w.backgroundScope = scope + + return nil + } +} + // WithWorkers configures the PHP workers to start func WithWorkers(name, fileName string, num int, options ...WorkerOption) Option { return func(o *opt) error { diff --git a/requestoptions.go b/requestoptions.go index 6bfddf7801..bc869eb68c 100644 --- a/requestoptions.go +++ b/requestoptions.go @@ -165,9 +165,11 @@ func WithWorkerName(name string) RequestOption { } } -func withRequestBackgroundLookup(lookup *backgroundWorkerLookup) RequestOption { +// WithRequestBackgroundScope sets the background worker scope for this request. +// Requests in the same scope can access the same background workers. +func WithRequestBackgroundScope(scope string) RequestOption { return func(o *frankenPHPContext) error { - o.backgroundLookup = lookup + o.backgroundScope = scope return nil } diff --git a/worker.go b/worker.go index df8b19fe92..12ecc626f8 100644 --- a/worker.go +++ b/worker.go @@ -38,6 +38,7 @@ type worker struct { onThreadShutdown func(int) queuedRequests atomic.Int32 isBackgroundWorker bool + backgroundScope string backgroundLookup *backgroundWorkerLookup backgroundRegistry *backgroundWorkerRegistry backgroundWorker *backgroundWorkerState @@ -80,12 +81,13 @@ func initWorkers(opt []workerOpt) error { } } - // Build background worker lookup from worker options - bgLookup := buildBackgroundWorkerLookup(workers, opt) - globalBackgroundLookup = bgLookup - if bgLookup != nil { + // Build per-scope background worker lookups + backgroundLookups = buildBackgroundWorkerLookups(workers, opt) + if backgroundLookups != nil { for _, w := range workers { - w.backgroundLookup = bgLookup + if lookup := backgroundLookups[w.backgroundScope]; lookup != nil { + w.backgroundLookup = lookup + } } }