diff --git a/caddy/app.go b/caddy/app.go index 9242d870c6..0e912408e9 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -57,6 +57,8 @@ type FrankenPHPApp struct { MaxWaitTime time.Duration `json:"max_wait_time,omitempty"` // The maximum amount of time an autoscaled thread may be idle before being deactivated MaxIdleTime time.Duration `json:"max_idle_time,omitempty"` + // MaxRequests sets the maximum number of requests a regular (non-worker) PHP thread handles before restarting (0 = unlimited) + MaxRequests int `json:"max_requests,omitempty"` opts []frankenphp.Option metrics frankenphp.Metrics @@ -153,6 +155,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), frankenphp.WithMaxIdleTime(f.MaxIdleTime), + frankenphp.WithMaxRequests(f.MaxRequests), ) for _, w := range f.Workers { @@ -192,6 +195,7 @@ func (f *FrankenPHPApp) Stop() error { f.NumThreads = 0 f.MaxWaitTime = 0 f.MaxIdleTime = 0 + f.MaxRequests = 0 optionsMU.Lock() options = nil @@ -255,6 +259,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.MaxIdleTime = v + case "max_requests": + if !d.NextArg() { + return d.ArgErr() + } + + v, err := strconv.ParseUint(d.Val(), 10, 32) + if err != nil { + return d.WrapErr(err) + } + + f.MaxRequests = int(v) case "php_ini": parseIniLine := func(d *caddyfile.Dispenser) error { key := d.Val() @@ -311,7 +326,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { f.Workers = append(f.Workers, wc) default: - return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time", d.Val()) + return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time, max_requests", d.Val()) } } } diff --git a/docs/config.md b/docs/config.md index bfcdeb1230..9886735565 100644 --- a/docs/config.md +++ b/docs/config.md @@ -97,6 +97,7 @@ You can also explicitly configure FrankenPHP using the [global option](https://c max_threads # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'. max_wait_time # Sets the maximum time a request may wait for a free PHP thread before timing out. Default: disabled. max_idle_time # Sets the maximum time an autoscaled thread may be idle before being deactivated. Default: 5s. + max_requests # Sets the maximum number of requests a PHP thread will handle before being restarted, useful for mitigating memory leaks. Applies to both regular and worker threads. Default: 0 (unlimited). See below. php_ini # Set a php.ini directive. Can be used several times to set multiple directives. worker { file # Sets the path to the worker script. @@ -265,6 +266,29 @@ and otherwise forward the request to the worker matching the path pattern. } ``` +## Restarting Threads After a Number of Requests + +Similar to PHP-FPM's [`pm.max_requests`](https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-requests), +FrankenPHP can automatically restart PHP threads after they have handled a given number of requests. +This is useful for mitigating memory leaks in PHP extensions or application code, +since a restart fully cleans up the thread's memory and state. + +The `max_requests` setting in the global `frankenphp` block applies to all PHP threads (both regular and worker threads): + +```caddyfile +{ + frankenphp { + max_requests 500 + } +} +``` + +When a thread reaches the limit, the underlying C thread is fully restarted, +cleaning up all memory and state, including any memory leaked by PHP extensions. +Other threads continue to serve requests during the restart, so there is no downtime. + +Set to `0` (default) to disable the limit and let threads run indefinitely. + ## Environment Variables The following environment variables can be used to inject Caddy directives in the `Caddyfile` without modifying it: diff --git a/frankenphp.go b/frankenphp.go index d2aaa3c7d9..2b5b94850d 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -66,7 +66,8 @@ var ( metrics Metrics = nullMetrics{} - maxWaitTime time.Duration + maxWaitTime time.Duration + maxRequestsPerThread int ) type ErrRejected struct { @@ -275,6 +276,7 @@ func Init(options ...Option) error { } maxWaitTime = opt.maxWaitTime + maxRequestsPerThread = opt.maxRequests if opt.maxIdleTime > 0 { maxIdleTime = opt.maxIdleTime @@ -786,5 +788,6 @@ func resetGlobals() { workersByPath = nil watcherIsEnabled = false maxIdleTime = defaultMaxIdleTime + maxRequestsPerThread = 0 globalMu.Unlock() } diff --git a/internal/state/state.go b/internal/state/state.go index 7bdf9c064a..f8d2b3acb7 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -30,6 +30,11 @@ const ( TransitionRequested TransitionInProgress TransitionComplete + + // thread is exiting the C loop for a full ZTS restart (max_requests) + Rebooting + // C thread has exited and ZTS state is cleaned up, ready to spawn a new C thread + RebootReady ) func (s State) String() string { @@ -58,6 +63,10 @@ func (s State) String() string { return "transition in progress" case TransitionComplete: return "transition complete" + case Rebooting: + return "rebooting" + case RebootReady: + return "reboot ready" default: return "unknown" } diff --git a/maxrequests_regular_test.go b/maxrequests_regular_test.go new file mode 100644 index 0000000000..47a39a02d8 --- /dev/null +++ b/maxrequests_regular_test.go @@ -0,0 +1,69 @@ +package frankenphp_test + +import ( + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" +) + +// TestModuleMaxRequests verifies that regular (non-worker) PHP threads restart +// after reaching max_requests by checking debug logs for restart messages. +func TestModuleMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 30 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + for i := 0; i < totalRequests; i++ { + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting thread") + t.Logf("Thread restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2, + "with maxRequests=%d and %d requests on 2 threads, at least 2 restarts should occur", maxRequests, totalRequests) + }, &testOptions{ + logger: logger, + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(2), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} + +// TestModuleMaxRequestsConcurrent verifies max_requests works under concurrent load +// in module mode. All requests must succeed despite threads restarting. +func TestModuleMaxRequestsConcurrent(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") + }() + } + wg.Wait() + }, &testOptions{ + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(8), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} diff --git a/options.go b/options.go index 9ba1f916f6..ac189f292e 100644 --- a/options.go +++ b/options.go @@ -31,6 +31,7 @@ type opt struct { phpIni map[string]string maxWaitTime time.Duration maxIdleTime time.Duration + maxRequests int } type workerOpt struct { @@ -166,6 +167,15 @@ func WithMaxIdleTime(maxIdleTime time.Duration) Option { } } +// WithMaxRequests sets the default max requests before restarting a PHP thread (0 = unlimited). Applies to regular and worker threads. +func WithMaxRequests(maxRequests int) Option { + return func(o *opt) error { + o.maxRequests = maxRequests + + return nil + } +} + // WithWorkerEnv sets environment variables for the worker func WithWorkerEnv(env map[string]string) WorkerOption { return func(w *workerOpt) error { diff --git a/phpthread.go b/phpthread.go index fdf263717c..96ba394078 100644 --- a/phpthread.go +++ b/phpthread.go @@ -62,6 +62,24 @@ func (thread *phpThread) boot() { thread.state.WaitFor(state.Inactive) } +// reboot exits the C thread loop for full ZTS cleanup, then spawns a fresh C thread. +// Returns false if the thread is no longer in Ready state (e.g. shutting down). +func (thread *phpThread) reboot() bool { + if !thread.state.CompareAndSwap(state.Ready, state.Rebooting) { + return false + } + + go func() { + thread.state.WaitFor(state.RebootReady) + + if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { + panic("unable to create thread") + } + }() + + return true +} + // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { if !thread.state.RequestSafeStateChange(state.ShuttingDown) { @@ -183,5 +201,9 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.Set(state.Done) + if thread.state.Is(state.Rebooting) { + thread.state.Set(state.RebootReady) + } else { + thread.state.Set(state.Done) + } } diff --git a/testdata/worker-counter-persistent.php b/testdata/worker-counter-persistent.php new file mode 100644 index 0000000000..2ac69f4117 --- /dev/null +++ b/testdata/worker-counter-persistent.php @@ -0,0 +1,10 @@ + 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting thread", + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + handler.thread.reboot() + return "" + } + handler.state.MarkAsWaiting(true) var ch contextHolder @@ -88,6 +108,7 @@ func (handler *regularThread) waitForRequest() string { case ch = <-handler.thread.requestChan: } + handler.requestCount++ handler.ctx = ch.ctx handler.contextHolder.frankenPHPContext = ch.frankenPHPContext handler.state.MarkAsWaiting(false) @@ -102,6 +123,7 @@ func (handler *regularThread) afterRequest() { handler.ctx = nil } + func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() diff --git a/threadworker.go b/threadworker.go index a0984afab7..59e9736e24 100644 --- a/threadworker.go +++ b/threadworker.go @@ -26,6 +26,7 @@ type workerThread struct { workerContext context.Context isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet failureCount int // number of consecutive startup failures + requestCount int // number of requests handled since last restart } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -62,6 +63,11 @@ func (handler *workerThread) beforeScriptExecution() string { setupWorkerScript(handler, handler.worker) return handler.worker.fileName + case state.Rebooting: + return "" + case state.RebootReady: + handler.requestCount = 0 + return handler.beforeScriptExecution() case state.ShuttingDown: if handler.worker.onThreadShutdown != nil { handler.worker.onThreadShutdown(handler.thread.threadIndex) @@ -98,6 +104,7 @@ func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } + func setupWorkerScript(handler *workerThread, worker *worker) { metrics.StartWorker(worker.name) @@ -116,6 +123,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { handler.dummyFrankenPHPContext = fc handler.dummyContext = ctx handler.isBootingScript = true + handler.requestCount = 0 if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) @@ -211,6 +219,20 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { metrics.ReadyWorker(handler.worker.name) } + // max_requests reached: signal reboot for full ZTS cleanup + if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting", + slog.String("worker", handler.worker.name), + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + handler.thread.reboot() + return false, nil + } + if handler.state.Is(state.TransitionComplete) { handler.state.Set(state.Ready) } @@ -235,6 +257,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { case requestCH = <-handler.worker.requestChan: } + handler.requestCount++ handler.workerContext = requestCH.ctx handler.workerFrankenPHPContext = requestCH.frankenPHPContext handler.state.MarkAsWaiting(false) diff --git a/worker_test.go b/worker_test.go index 97cc80d2e4..3fd2d63f94 100644 --- a/worker_test.go +++ b/worker_test.go @@ -5,11 +5,13 @@ import ( "fmt" "io" "log" + "log/slog" "net/http" "net/http/httptest" "net/url" "strconv" "strings" + "sync" "testing" "github.com/dunglas/frankenphp" @@ -169,3 +171,94 @@ func TestKeepRunningOnConnectionAbort(t *testing.T) { assert.Equal(t, "requests:2", body2, "should not have stopped execution after the first request was aborted") }, &testOptions{workerScript: "worker-with-counter.php", nbWorkers: 1, nbParallelRequests: 1}) } + +// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests. +func TestWorkerMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 20 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + instanceIDs := make(map[string]int) + + for i := 0; i < totalRequests; i++ { + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + } + + t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests) + for id, count := range instanceIDs { + t.Logf(" instance %s: handled %d requests", id, count) + } + + assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests) + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting") + t.Logf("Worker restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2) + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 1, + nbParallelRequests: 1, + logger: logger, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)}, + }) +} + +// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load. +func TestWorkerMaxRequestsHighConcurrency(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + var ( + mu sync.Mutex + instanceIDs = make(map[string]int) + ) + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + mu.Lock() + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + mu.Unlock() + }() + } + wg.Wait() + + t.Logf("instances: %d", len(instanceIDs)) + assert.Greater(t, len(instanceIDs), 4, "workers should have restarted multiple times") + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 4, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)}, + }) +}