-
Notifications
You must be signed in to change notification settings - Fork 442
feat: background workers = non-HTTP workers with shared state #2287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
nicolas-grekas
wants to merge
3
commits into
php:main
Choose a base branch
from
nicolas-grekas:sidekicks
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,602
−29
Open
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Some comments aren't visible on the classic Files Changed page.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,341 @@ | ||
| package frankenphp | ||
|
|
||
| // #include <stdint.h> | ||
| // #include "frankenphp.h" | ||
| import "C" | ||
| import ( | ||
| "fmt" | ||
| "log/slog" | ||
| "sync" | ||
| "sync/atomic" | ||
| "time" | ||
| "unsafe" | ||
| ) | ||
|
|
||
| // BackgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files. | ||
| type BackgroundWorkerLookup struct { | ||
| byName map[string]*BackgroundWorkerRegistry | ||
| catchAll *BackgroundWorkerRegistry | ||
| } | ||
|
|
||
| func NewBackgroundWorkerLookup() *BackgroundWorkerLookup { | ||
| return &BackgroundWorkerLookup{ | ||
| byName: make(map[string]*BackgroundWorkerRegistry), | ||
| } | ||
| } | ||
|
|
||
| // NewBackgroundWorkerLookupWithCatchAll is a convenience constructor for tests. | ||
| func NewBackgroundWorkerLookupWithCatchAll(entrypoint string) *BackgroundWorkerLookup { | ||
| l := NewBackgroundWorkerLookup() | ||
| l.catchAll = NewBackgroundWorkerRegistry(entrypoint) | ||
| return l | ||
| } | ||
|
|
||
| func (l *BackgroundWorkerLookup) AddNamed(name string, registry *BackgroundWorkerRegistry) { | ||
| l.byName[name] = registry | ||
| } | ||
|
|
||
| func (l *BackgroundWorkerLookup) SetCatchAll(registry *BackgroundWorkerRegistry) { | ||
| l.catchAll = registry | ||
| } | ||
|
|
||
| // Resolve returns the registry for the given name, falling back to catch-all. | ||
| func (l *BackgroundWorkerLookup) Resolve(name string) *BackgroundWorkerRegistry { | ||
| if r, ok := l.byName[name]; ok { | ||
| return r | ||
| } | ||
| return l.catchAll | ||
| } | ||
|
|
||
| // StartAutoWorkers iterates unique registries and calls startAutoWorkers on each. | ||
| // Deprecated: auto-start is now handled via initWorkers. Kept for test compatibility. | ||
| func (l *BackgroundWorkerLookup) StartAutoWorkers() error { | ||
| return nil | ||
| } | ||
|
|
||
| type backgroundWorkerState struct { | ||
| varsPtr unsafe.Pointer // *C.HashTable, persistent, managed by C | ||
| mu sync.RWMutex | ||
| varsVersion atomic.Uint64 // incremented on each set_vars call | ||
| ready chan struct{} | ||
| readyOnce sync.Once | ||
| } | ||
|
|
||
| type BackgroundWorkerRegistry struct { | ||
| entrypoint string | ||
| num int // threads per background worker (0 = lazy-start with 1 thread) | ||
| maxWorkers int // max lazy-started instances (0 = unlimited) | ||
| autoStartNames []string // names to start at boot when num >= 1 | ||
| mu sync.Mutex | ||
| workers map[string]*backgroundWorkerState | ||
| } | ||
|
|
||
| func NewBackgroundWorkerRegistry(entrypoint string) *BackgroundWorkerRegistry { | ||
| return &BackgroundWorkerRegistry{ | ||
| entrypoint: entrypoint, | ||
| workers: make(map[string]*backgroundWorkerState), | ||
| } | ||
| } | ||
nicolas-grekas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| func (registry *BackgroundWorkerRegistry) Entrypoint() string { | ||
| return registry.entrypoint | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) Num() int { | ||
| if registry.num <= 0 { | ||
| return 0 | ||
| } | ||
| return registry.num | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) MaxThreads() int { | ||
| if registry.num > 0 { | ||
| return registry.num | ||
| } | ||
| return 1 | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) SetNum(num int) { | ||
| registry.num = num | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) AddAutoStartNames(names ...string) { | ||
| registry.autoStartNames = append(registry.autoStartNames, names...) | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) SetMaxWorkers(max int) { | ||
| registry.maxWorkers = max | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) { | ||
| registry.mu.Lock() | ||
| defer registry.mu.Unlock() | ||
|
|
||
| if bgw := registry.workers[name]; bgw != nil { | ||
| return bgw, true, nil | ||
| } | ||
|
|
||
| if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers { | ||
| return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers) | ||
| } | ||
|
|
||
| bgw := &backgroundWorkerState{ | ||
| ready: make(chan struct{}), | ||
| } | ||
| registry.workers[name] = bgw | ||
|
|
||
| return bgw, false, nil | ||
| } | ||
|
|
||
| func (registry *BackgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) { | ||
| registry.mu.Lock() | ||
| defer registry.mu.Unlock() | ||
|
|
||
| if registry.workers[name] == bgw { | ||
| delete(registry.workers, name) | ||
| } | ||
| } | ||
|
|
||
| func startBackgroundWorker(thread *phpThread, bgWorkerName string) error { | ||
| if bgWorkerName == "" { | ||
| return fmt.Errorf("background worker name must not be empty") | ||
| } | ||
|
|
||
| lookup := getLookup(thread) | ||
| if lookup == nil { | ||
| return fmt.Errorf("no background worker configured in this php_server") | ||
| } | ||
|
|
||
| registry := lookup.Resolve(bgWorkerName) | ||
| if registry == nil || registry.entrypoint == "" { | ||
| return fmt.Errorf("no background worker configured in this php_server") | ||
| } | ||
|
|
||
| return startBackgroundWorkerWithRegistry(registry, bgWorkerName) | ||
| } | ||
|
|
||
| func startBackgroundWorkerWithRegistry(registry *BackgroundWorkerRegistry, bgWorkerName string) error { | ||
nicolas-grekas marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| bgw, exists, err := registry.reserve(bgWorkerName) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| if exists { | ||
| return nil | ||
| } | ||
|
|
||
| numThreads := registry.MaxThreads() | ||
|
|
||
| worker, err := newWorker(workerOpt{ | ||
| name: bgWorkerName, | ||
| fileName: registry.entrypoint, | ||
| num: numThreads, | ||
| env: PrepareEnv(nil), | ||
| watch: []string{}, | ||
| maxConsecutiveFailures: -1, | ||
| }) | ||
| if err != nil { | ||
| registry.remove(bgWorkerName, bgw) | ||
|
|
||
| return fmt.Errorf("failed to create background worker: %w", err) | ||
| } | ||
|
|
||
| worker.isBackgroundWorker = true | ||
| worker.backgroundWorker = bgw | ||
| worker.backgroundRegistry = registry | ||
|
|
||
| for i := 0; i < numThreads; i++ { | ||
| bgWorkerThread := getInactivePHPThread() | ||
| if bgWorkerThread == nil { | ||
| if i == 0 { | ||
| registry.remove(bgWorkerName, bgw) | ||
| } | ||
|
|
||
| return fmt.Errorf("no available PHP thread for background worker (increase max_threads)") | ||
| } | ||
|
|
||
| scalingMu.Lock() | ||
| workers = append(workers, worker) | ||
| scalingMu.Unlock() | ||
|
|
||
| convertToBackgroundWorkerThread(bgWorkerThread, worker) | ||
| } | ||
|
|
||
| if globalLogger.Enabled(globalCtx, slog.LevelInfo) { | ||
| globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads)) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func getLookup(thread *phpThread) *BackgroundWorkerLookup { | ||
| if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil { | ||
| return handler.worker.backgroundLookup | ||
| } | ||
| if fc, ok := fromContext(thread.context()); ok { | ||
| return fc.backgroundLookup | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // go_frankenphp_worker_get_vars starts background workers if needed, waits for them | ||
| // to be ready, takes read locks, copies vars via C helper, and releases locks. | ||
| // All locking/unlocking happens within this single Go call. | ||
| // | ||
| // callerVersions/outVersions: if callerVersions is non-nil and all versions match, | ||
| // the copy is skipped entirely (returns 1). outVersions receives current versions. | ||
| // | ||
| //export go_frankenphp_worker_get_vars | ||
| func go_frankenphp_worker_get_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char { | ||
| thread := phpThreads[threadIndex] | ||
| lookup := getLookup(thread) | ||
| if lookup == nil { | ||
| return C.CString("no background worker configured in this php_server") | ||
| } | ||
|
|
||
| n := int(nameCount) | ||
| nameSlice := unsafe.Slice(names, n) | ||
| nameLenSlice := unsafe.Slice(nameLens, n) | ||
|
|
||
| sks := make([]*backgroundWorkerState, n) | ||
| goNames := make([]string, n) | ||
| for i := 0; i < n; i++ { | ||
| goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i])) | ||
|
|
||
| // Start background worker if not already running | ||
| if err := startBackgroundWorker(thread, goNames[i]); err != nil { | ||
| return C.CString(err.Error()) | ||
| } | ||
|
|
||
| registry := lookup.Resolve(goNames[i]) | ||
| if registry == nil { | ||
| return C.CString("background worker not found: " + goNames[i]) | ||
| } | ||
| registry.mu.Lock() | ||
| sks[i] = registry.workers[goNames[i]] | ||
| registry.mu.Unlock() | ||
| if sks[i] == nil { | ||
| return C.CString("background worker not found: " + goNames[i]) | ||
| } | ||
| } | ||
|
|
||
| // Wait for all workers to be ready | ||
| timeout := time.Duration(timeoutMs) * time.Millisecond | ||
| timer := time.NewTimer(timeout) | ||
| defer timer.Stop() | ||
| for i, sk := range sks { | ||
| select { | ||
| case <-sk.ready: | ||
| // background worker has called set_vars | ||
| case <-timer.C: | ||
| return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i])) | ||
| } | ||
| } | ||
|
|
||
| // Fast path: if all caller versions match, skip lock + copy entirely. | ||
| // Read each version once and write to outVersions for the C side to compare. | ||
| if callerVersions != nil && outVersions != nil { | ||
| callerVSlice := unsafe.Slice(callerVersions, n) | ||
| outVSlice := unsafe.Slice(outVersions, n) | ||
| allMatch := true | ||
| for i, sk := range sks { | ||
| v := sk.varsVersion.Load() | ||
| outVSlice[i] = C.uint64_t(v) | ||
| if uint64(callerVSlice[i]) != v { | ||
| allMatch = false | ||
| } | ||
| } | ||
| if allMatch { | ||
| return nil // C side sees out == caller, uses cached value | ||
| } | ||
| } | ||
|
|
||
| // Take all read locks, collect pointers, copy via C helper, then release | ||
| ptrs := make([]unsafe.Pointer, n) | ||
| for i, sk := range sks { | ||
| sk.mu.RLock() | ||
| ptrs[i] = sk.varsPtr | ||
| } | ||
|
|
||
| C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0]))) | ||
|
|
||
| // Write versions while locks are still held | ||
| if outVersions != nil { | ||
| outVSlice := unsafe.Slice(outVersions, n) | ||
| for i, sk := range sks { | ||
| outVSlice[i] = C.uint64_t(sk.varsVersion.Load()) | ||
| } | ||
| } | ||
|
|
||
| for _, sk := range sks { | ||
| sk.mu.RUnlock() | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| //export go_frankenphp_worker_set_vars | ||
| func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char { | ||
| thread := phpThreads[threadIndex] | ||
|
|
||
| bgHandler, ok := thread.handler.(*backgroundWorkerThread) | ||
| if !ok || bgHandler.worker.backgroundWorker == nil { | ||
| return C.CString("frankenphp_worker_set_vars() can only be called from a background worker") | ||
| } | ||
| handler := &bgHandler.workerThread | ||
|
|
||
| sk := handler.worker.backgroundWorker | ||
|
|
||
| sk.mu.Lock() | ||
| *oldPtr = sk.varsPtr | ||
| sk.varsPtr = varsPtr | ||
| sk.varsVersion.Add(1) | ||
| sk.mu.Unlock() | ||
|
|
||
| sk.readyOnce.Do(func() { | ||
| handler.markBackgroundReady() | ||
| close(sk.ready) | ||
| }) | ||
|
|
||
| return nil | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.