From f7e58860555aa678f500d02a7f2b74f7a9c57d13 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Sat, 21 Mar 2026 16:10:39 +0100 Subject: [PATCH 1/2] feat: Add configurable max_requests for PHP threads --- caddy/app.go | 17 ++- docs/config.md | 25 ++++ frankenphp.go | 13 +- frankenphp_test.go | 2 + maxrequests_regular_test.go | 128 ++++++++++++++++++ maxrequests_test.go | 177 +++++++++++++++++++++++++ options.go | 10 ++ testdata/sleep-with-output.php | 8 ++ testdata/worker-counter-persistent.php | 10 ++ threadregular.go | 39 +++++- threadworker.go | 41 ++++++ 11 files changed, 466 insertions(+), 4 deletions(-) create mode 100644 maxrequests_regular_test.go create mode 100644 maxrequests_test.go create mode 100644 testdata/sleep-with-output.php create mode 100644 testdata/worker-counter-persistent.php diff --git a/caddy/app.go b/caddy/app.go index 9242d870c6..0e912408e9 100644 --- a/caddy/app.go +++ b/caddy/app.go @@ -57,6 +57,8 @@ type FrankenPHPApp struct { MaxWaitTime time.Duration `json:"max_wait_time,omitempty"` // The maximum amount of time an autoscaled thread may be idle before being deactivated MaxIdleTime time.Duration `json:"max_idle_time,omitempty"` + // MaxRequests sets the maximum number of requests a regular (non-worker) PHP thread handles before restarting (0 = unlimited) + MaxRequests int `json:"max_requests,omitempty"` opts []frankenphp.Option metrics frankenphp.Metrics @@ -153,6 +155,7 @@ func (f *FrankenPHPApp) Start() error { frankenphp.WithPhpIni(f.PhpIni), frankenphp.WithMaxWaitTime(f.MaxWaitTime), frankenphp.WithMaxIdleTime(f.MaxIdleTime), + frankenphp.WithMaxRequests(f.MaxRequests), ) for _, w := range f.Workers { @@ -192,6 +195,7 @@ func (f *FrankenPHPApp) Stop() error { f.NumThreads = 0 f.MaxWaitTime = 0 f.MaxIdleTime = 0 + f.MaxRequests = 0 optionsMU.Lock() options = nil @@ -255,6 +259,17 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.MaxIdleTime = v + case "max_requests": + if !d.NextArg() { + return d.ArgErr() + } + + v, err := strconv.ParseUint(d.Val(), 10, 32) + if err != nil { + return d.WrapErr(err) + } + + f.MaxRequests = int(v) case "php_ini": parseIniLine := func(d *caddyfile.Dispenser) error { key := d.Val() @@ -311,7 +326,7 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { f.Workers = append(f.Workers, wc) default: - return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time", d.Val()) + return wrongSubDirectiveError("frankenphp", "num_threads, max_threads, php_ini, worker, max_wait_time, max_idle_time, max_requests", d.Val()) } } } diff --git a/docs/config.md b/docs/config.md index bfcdeb1230..245fca4448 100644 --- a/docs/config.md +++ b/docs/config.md @@ -97,6 +97,7 @@ You can also explicitly configure FrankenPHP using the [global option](https://c max_threads # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'. max_wait_time # Sets the maximum time a request may wait for a free PHP thread before timing out. Default: disabled. max_idle_time # Sets the maximum time an autoscaled thread may be idle before being deactivated. Default: 5s. + max_requests # Sets the maximum number of requests a PHP thread will handle before being restarted, useful for mitigating memory leaks. Applies to both regular and worker threads. Default: 0 (unlimited). See below. php_ini # Set a php.ini directive. Can be used several times to set multiple directives. worker { file # Sets the path to the worker script. @@ -190,6 +191,7 @@ php_server [] { watch # Sets the path to watch for file changes. Can be specified more than once for multiple paths. env # Sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables. Environment variables for this worker are also inherited from the php_server parent, but can be overwritten here. match # match the worker to a path pattern. Overrides try_files and can only be used in the php_server directive. + max_requests # Sets the maximum number of requests a worker thread will handle before restarting, useful for mitigating memory leaks. Default: 0 (unlimited). } worker # Can also use the short form like in the global frankenphp block. } @@ -265,6 +267,29 @@ and otherwise forward the request to the worker matching the path pattern. } ``` +## Restarting Threads After a Number of Requests + +Similar to PHP-FPM's [`pm.max_requests`](https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-requests), +FrankenPHP can automatically restart PHP threads after they have handled a given number of requests. +This is useful for mitigating memory leaks in PHP extensions or application code, +since a restart fully cleans up the thread's memory and state. + +The `max_requests` setting in the global `frankenphp` block applies to all PHP threads (both regular and worker threads): + +```caddyfile +{ + frankenphp { + max_requests 500 + } +} +``` + +When a thread reaches the limit, the underlying C thread is fully restarted, +cleaning up all ZTS thread-local storage, including any memory leaked by PHP extensions. +Other threads continue to serve requests during the restart, so there is no downtime. + +Set to `0` (default) to disable the limit and let threads run indefinitely. + ## Environment Variables The following environment variables can be used to inject Caddy directives in the `Caddyfile` without modifying it: diff --git a/frankenphp.go b/frankenphp.go index d2aaa3c7d9..40d4548e5a 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -66,7 +66,8 @@ var ( metrics Metrics = nullMetrics{} - maxWaitTime time.Duration + maxWaitTime time.Duration + maxRequestsPerThread int ) type ErrRejected struct { @@ -275,6 +276,7 @@ func Init(options ...Option) error { } maxWaitTime = opt.maxWaitTime + maxRequestsPerThread = opt.maxRequests if opt.maxIdleTime > 0 { maxIdleTime = opt.maxIdleTime @@ -369,6 +371,13 @@ func Shutdown() { drainWatchers() drainAutoScaling() + + // signal restart goroutines to stop spawning and wait for in-flight ones + shutdownInProgress.Store(true) + for restartingThreads.Load() > 0 { + runtime.Gosched() + } + drainPHPThreads() metrics.Shutdown() @@ -786,5 +795,7 @@ func resetGlobals() { workersByPath = nil watcherIsEnabled = false maxIdleTime = defaultMaxIdleTime + maxRequestsPerThread = 0 + shutdownInProgress.Store(false) globalMu.Unlock() } diff --git a/frankenphp_test.go b/frankenphp_test.go index 47e65c490b..b7ac43d4e9 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -46,6 +46,7 @@ type testOptions struct { realServer bool logger *slog.Logger initOpts []frankenphp.Option + workerOpts []frankenphp.WorkerOption requestOpts []frankenphp.RequestOption phpIni map[string]string } @@ -67,6 +68,7 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * frankenphp.WithWorkerEnv(opts.env), frankenphp.WithWorkerWatchMode(opts.watch), } + workerOpts = append(workerOpts, opts.workerOpts...) initOpts = append(initOpts, frankenphp.WithWorkers("workerName", testDataDir+opts.workerScript, opts.nbWorkers, workerOpts...)) } initOpts = append(initOpts, opts.initOpts...) diff --git a/maxrequests_regular_test.go b/maxrequests_regular_test.go new file mode 100644 index 0000000000..b6ef8f52d5 --- /dev/null +++ b/maxrequests_regular_test.go @@ -0,0 +1,128 @@ +package frankenphp_test + +import ( + "io" + "log/slog" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestModuleMaxRequests verifies that regular (non-worker) PHP threads restart +// after reaching max_requests by checking debug logs for restart messages. +func TestModuleMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 30 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 5 * time.Second} + + for i := 0; i < totalRequests; i++ { + resp, err := client.Get(ts.URL + "/index.php") + require.NoError(t, err, "request %d should succeed", i) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + _ = resp.Body.Close() + + assert.Equal(t, 200, resp.StatusCode, "request %d should return 200, got body: %s", i, string(body)) + assert.Contains(t, string(body), "I am by birth a Genevese", + "request %d should return correct body", i) + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting thread") + t.Logf("Thread restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2, + "with maxRequests=%d and %d requests on 2 threads, at least 2 restarts should occur", maxRequests, totalRequests) + }, &testOptions{ + realServer: true, + logger: logger, + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(2), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} + +// TestModuleMaxRequestsConcurrent verifies max_requests works under concurrent load +// in module mode. All requests must succeed despite threads restarting. +func TestModuleMaxRequestsConcurrent(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + const concurrency = 20 + + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 10 * time.Second} + + var successCount int + var mu sync.Mutex + sem := make(chan struct{}, concurrency) + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + sem <- struct{}{} + go func(i int) { + defer func() { <-sem; wg.Done() }() + + resp, err := client.Get(ts.URL + "/index.php") + if err != nil { + return + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + if resp.StatusCode == 200 && strings.Contains(string(body), "I am by birth a Genevese") { + mu.Lock() + successCount++ + mu.Unlock() + } + }(i) + } + wg.Wait() + + t.Logf("Success: %d/%d", successCount, totalRequests) + assert.GreaterOrEqual(t, successCount, int(totalRequests*0.95), + "at least 95%% of requests should succeed despite regular thread restarts") + }, &testOptions{ + realServer: true, + initOpts: []frankenphp.Option{ + frankenphp.WithNumThreads(8), + frankenphp.WithMaxRequests(maxRequests), + }, + }) +} + +// TestModuleMaxRequestsZeroIsUnlimited verifies that max_requests=0 (default) +// means threads never restart. +func TestModuleMaxRequestsZeroIsUnlimited(t *testing.T) { + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 5 * time.Second} + + for i := 0; i < 50; i++ { + resp, err := client.Get(ts.URL + "/index.php") + require.NoError(t, err) + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, string(body), "I am by birth a Genevese") + } + }, &testOptions{ + realServer: true, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2)}, + }) +} diff --git a/maxrequests_test.go b/maxrequests_test.go new file mode 100644 index 0000000000..3526afa973 --- /dev/null +++ b/maxrequests_test.go @@ -0,0 +1,177 @@ +package frankenphp_test + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests. +func TestWorkerMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 20 + + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 5 * time.Second} + + // Track unique instance IDs across requests. + // After maxRequests, the worker should restart with a new instance ID. + instanceIDs := make(map[string]int) + + for i := 0; i < totalRequests; i++ { + resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") + require.NoError(t, err, "request %d should succeed", i) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + _ = resp.Body.Close() + + assert.Equal(t, 200, resp.StatusCode, "request %d should return 200, got body: %s", i, string(body)) + + // Parse response: "instance:,count:" + bodyStr := string(body) + parts := strings.Split(bodyStr, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + } + + t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests) + for id, count := range instanceIDs { + t.Logf(" instance %s: handled %d requests", id, count) + } + + // With maxRequests=5 and totalRequests=20 on 1 worker, we expect at least 4 restarts + assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests, + "worker should have restarted at least %d times with max_requests=%d over %d requests", + totalRequests/maxRequests, maxRequests, totalRequests) + + // No single instance should have handled more than maxRequests + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 1, + nbParallelRequests: 1, + realServer: true, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)}, + }) +} + +// TestWorkerMaxRequestsZeroIsUnlimited verifies that max_requests=0 means no limit. +func TestWorkerMaxRequestsZeroIsUnlimited(t *testing.T) { + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 5 * time.Second} + + instanceIDs := make(map[string]bool) + + for i := 0; i < 50; i++ { + resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") + require.NoError(t, err) + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + bodyStr := string(body) + parts := strings.Split(bodyStr, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID] = true + } + } + + // With 1 worker thread and no max_requests, all requests should go to + // the same instance (no restarts) + assert.Equal(t, 1, len(instanceIDs), + "with max_requests=0, worker should never restart (saw %d instances)", len(instanceIDs)) + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 1, + nbParallelRequests: 1, + realServer: true, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2)}, + }) +} + +// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load. +func TestWorkerMaxRequestsHighConcurrency(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + const concurrency = 20 + + runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { + require.NotNil(t, ts) + client := &http.Client{Timeout: 10 * time.Second} + + var ( + successCount int + mu sync.Mutex + instanceIDs = make(map[string]int) + ) + sem := make(chan struct{}, concurrency) + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + sem <- struct{}{} + go func() { + defer func() { <-sem; wg.Done() }() + + resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") + if err != nil { + return + } + body, _ := io.ReadAll(resp.Body) + _ = resp.Body.Close() + + if resp.StatusCode == 200 { + mu.Lock() + successCount++ + bodyStr := string(body) + parts := strings.Split(bodyStr, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + mu.Unlock() + } + }() + } + wg.Wait() + + t.Logf("Success: %d/%d, instances: %d", successCount, totalRequests, len(instanceIDs)) + + assert.Equal(t, totalRequests, successCount, + "all requests should succeed") + + // With max_requests=10 and 4 workers handling 200 requests total, + // each worker restarts after 10 requests. We should see multiple instances. + assert.Greater(t, len(instanceIDs), 4, + "workers should have restarted multiple times") + + // No instance should exceed max_requests + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 4, + nbParallelRequests: 1, + realServer: true, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)}, + }) +} diff --git a/options.go b/options.go index 9ba1f916f6..ac189f292e 100644 --- a/options.go +++ b/options.go @@ -31,6 +31,7 @@ type opt struct { phpIni map[string]string maxWaitTime time.Duration maxIdleTime time.Duration + maxRequests int } type workerOpt struct { @@ -166,6 +167,15 @@ func WithMaxIdleTime(maxIdleTime time.Duration) Option { } } +// WithMaxRequests sets the default max requests before restarting a PHP thread (0 = unlimited). Applies to regular and worker threads. +func WithMaxRequests(maxRequests int) Option { + return func(o *opt) error { + o.maxRequests = maxRequests + + return nil + } +} + // WithWorkerEnv sets environment variables for the worker func WithWorkerEnv(env map[string]string) WorkerOption { return func(w *workerOpt) error { diff --git a/testdata/sleep-with-output.php b/testdata/sleep-with-output.php new file mode 100644 index 0000000000..0fc66a1039 --- /dev/null +++ b/testdata/sleep-with-output.php @@ -0,0 +1,8 @@ + 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting thread", + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + detachRegularThread(handler.thread) + restartingThreads.Add(1) + go restartRegularThread(handler.thread) + + return "" + } + handler.state.MarkAsWaiting(true) var ch contextHolder @@ -88,6 +108,7 @@ func (handler *regularThread) waitForRequest() string { case ch = <-handler.thread.requestChan: } + handler.requestCount++ handler.ctx = ch.ctx handler.contextHolder.frankenPHPContext = ch.frankenPHPContext handler.state.MarkAsWaiting(false) @@ -102,6 +123,20 @@ func (handler *regularThread) afterRequest() { handler.ctx = nil } +// restartRegularThread re-boots a thread after it exits the C loop (ts_free_thread cleans up ZTS state). +func restartRegularThread(thread *phpThread) { + defer restartingThreads.Add(-1) + thread.state.WaitFor(state.Done) + + if shutdownInProgress.Load() { + return + } + + thread.state.Set(state.Reserved) + thread.boot() + convertToRegularThread(thread) +} + func handleRequestWithRegularPHPThreads(ch contextHolder) error { metrics.StartRequest() diff --git a/threadworker.go b/threadworker.go index a0984afab7..296f55717b 100644 --- a/threadworker.go +++ b/threadworker.go @@ -26,6 +26,8 @@ type workerThread struct { workerContext context.Context isBootingScript bool // true if the worker has not reached frankenphp_handle_request yet failureCount int // number of consecutive startup failures + requestCount int // number of requests handled since last restart + needsRestart bool // true if the thread needs to exit the C loop for ZTS cleanup } func convertToWorkerThread(thread *phpThread, worker *worker) { @@ -54,6 +56,15 @@ func (handler *workerThread) beforeScriptExecution() string { handler.state.WaitFor(state.Ready, state.ShuttingDown) return handler.beforeScriptExecution() case state.Ready, state.TransitionComplete: + // exit the C loop for full ZTS cleanup after max_requests + if handler.needsRestart { + handler.needsRestart = false + handler.worker.detachThread(handler.thread) + restartingThreads.Add(1) + go restartWorkerThread(handler.thread, handler.worker) + return "" + } + handler.thread.updateContext(true) if handler.worker.onThreadReady != nil { handler.worker.onThreadReady(handler.thread.threadIndex) @@ -98,6 +109,20 @@ func (handler *workerThread) name() string { return "Worker PHP Thread - " + handler.worker.fileName } +// restartWorkerThread re-boots a worker thread after it exits the C loop for ZTS cleanup. +func restartWorkerThread(thread *phpThread, worker *worker) { + defer restartingThreads.Add(-1) + thread.state.WaitFor(state.Done) + + if shutdownInProgress.Load() { + return + } + + thread.state.Set(state.Reserved) + thread.boot() + convertToWorkerThread(thread, worker) +} + func setupWorkerScript(handler *workerThread, worker *worker) { metrics.StartWorker(worker.name) @@ -116,6 +141,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { handler.dummyFrankenPHPContext = fc handler.dummyContext = ctx handler.isBootingScript = true + handler.requestCount = 0 if globalLogger.Enabled(ctx, slog.LevelDebug) { globalLogger.LogAttrs(ctx, slog.LevelDebug, "starting", slog.String("worker", worker.name), slog.Int("thread", handler.thread.threadIndex)) @@ -211,6 +237,20 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { metrics.ReadyWorker(handler.worker.name) } + // max_requests reached: exit the worker loop to trigger a ZTS restart + if maxRequestsPerThread > 0 && handler.requestCount >= maxRequestsPerThread { + if globalLogger.Enabled(globalCtx, slog.LevelDebug) { + globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting", + slog.String("worker", handler.worker.name), + slog.Int("thread", handler.thread.threadIndex), + slog.Int("max_requests", maxRequestsPerThread), + ) + } + + handler.needsRestart = true + return false, nil + } + if handler.state.Is(state.TransitionComplete) { handler.state.Set(state.Ready) } @@ -235,6 +275,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { case requestCH = <-handler.worker.requestChan: } + handler.requestCount++ handler.workerContext = requestCH.ctx handler.workerFrankenPHPContext = requestCH.frankenPHPContext handler.state.MarkAsWaiting(false) From 966326f127da146b12ebdbd6582947b431521bef Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Mon, 23 Mar 2026 09:01:43 +0100 Subject: [PATCH 2/2] Review by AlliBalliBaba --- docs/config.md | 3 +- frankenphp.go | 8 -- frankenphp_test.go | 2 - internal/state/state.go | 9 ++ maxrequests_regular_test.go | 83 +++------------- maxrequests_test.go | 177 --------------------------------- phpthread.go | 24 ++++- testdata/sleep-with-output.php | 8 -- threadregular.go | 25 ++--- threadworker.go | 32 ++---- worker_test.go | 93 +++++++++++++++++ 11 files changed, 151 insertions(+), 313 deletions(-) delete mode 100644 maxrequests_test.go delete mode 100644 testdata/sleep-with-output.php diff --git a/docs/config.md b/docs/config.md index 245fca4448..9886735565 100644 --- a/docs/config.md +++ b/docs/config.md @@ -191,7 +191,6 @@ php_server [] { watch # Sets the path to watch for file changes. Can be specified more than once for multiple paths. env # Sets an extra environment variable to the given value. Can be specified more than once for multiple environment variables. Environment variables for this worker are also inherited from the php_server parent, but can be overwritten here. match # match the worker to a path pattern. Overrides try_files and can only be used in the php_server directive. - max_requests # Sets the maximum number of requests a worker thread will handle before restarting, useful for mitigating memory leaks. Default: 0 (unlimited). } worker # Can also use the short form like in the global frankenphp block. } @@ -285,7 +284,7 @@ The `max_requests` setting in the global `frankenphp` block applies to all PHP t ``` When a thread reaches the limit, the underlying C thread is fully restarted, -cleaning up all ZTS thread-local storage, including any memory leaked by PHP extensions. +cleaning up all memory and state, including any memory leaked by PHP extensions. Other threads continue to serve requests during the restart, so there is no downtime. Set to `0` (default) to disable the limit and let threads run indefinitely. diff --git a/frankenphp.go b/frankenphp.go index 40d4548e5a..2b5b94850d 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -371,13 +371,6 @@ func Shutdown() { drainWatchers() drainAutoScaling() - - // signal restart goroutines to stop spawning and wait for in-flight ones - shutdownInProgress.Store(true) - for restartingThreads.Load() > 0 { - runtime.Gosched() - } - drainPHPThreads() metrics.Shutdown() @@ -796,6 +789,5 @@ func resetGlobals() { watcherIsEnabled = false maxIdleTime = defaultMaxIdleTime maxRequestsPerThread = 0 - shutdownInProgress.Store(false) globalMu.Unlock() } diff --git a/frankenphp_test.go b/frankenphp_test.go index b7ac43d4e9..47e65c490b 100644 --- a/frankenphp_test.go +++ b/frankenphp_test.go @@ -46,7 +46,6 @@ type testOptions struct { realServer bool logger *slog.Logger initOpts []frankenphp.Option - workerOpts []frankenphp.WorkerOption requestOpts []frankenphp.RequestOption phpIni map[string]string } @@ -68,7 +67,6 @@ func runTest(t *testing.T, test func(func(http.ResponseWriter, *http.Request), * frankenphp.WithWorkerEnv(opts.env), frankenphp.WithWorkerWatchMode(opts.watch), } - workerOpts = append(workerOpts, opts.workerOpts...) initOpts = append(initOpts, frankenphp.WithWorkers("workerName", testDataDir+opts.workerScript, opts.nbWorkers, workerOpts...)) } initOpts = append(initOpts, opts.initOpts...) diff --git a/internal/state/state.go b/internal/state/state.go index 7bdf9c064a..f8d2b3acb7 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -30,6 +30,11 @@ const ( TransitionRequested TransitionInProgress TransitionComplete + + // thread is exiting the C loop for a full ZTS restart (max_requests) + Rebooting + // C thread has exited and ZTS state is cleaned up, ready to spawn a new C thread + RebootReady ) func (s State) String() string { @@ -58,6 +63,10 @@ func (s State) String() string { return "transition in progress" case TransitionComplete: return "transition complete" + case Rebooting: + return "rebooting" + case RebootReady: + return "reboot ready" default: return "unknown" } diff --git a/maxrequests_regular_test.go b/maxrequests_regular_test.go index b6ef8f52d5..47a39a02d8 100644 --- a/maxrequests_regular_test.go +++ b/maxrequests_regular_test.go @@ -1,18 +1,15 @@ package frankenphp_test import ( - "io" "log/slog" "net/http" "net/http/httptest" "strings" "sync" "testing" - "time" "github.com/dunglas/frankenphp" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) // TestModuleMaxRequests verifies that regular (non-worker) PHP threads restart @@ -24,21 +21,11 @@ func TestModuleMaxRequests(t *testing.T) { var buf syncBuffer logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 5 * time.Second} - + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { for i := 0; i < totalRequests; i++ { - resp, err := client.Get(ts.URL + "/index.php") - require.NoError(t, err, "request %d should succeed", i) - - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - _ = resp.Body.Close() - - assert.Equal(t, 200, resp.StatusCode, "request %d should return 200, got body: %s", i, string(body)) - assert.Contains(t, string(body), "I am by birth a Genevese", - "request %d should return correct body", i) + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") } restartCount := strings.Count(buf.String(), "max requests reached, restarting thread") @@ -46,8 +33,7 @@ func TestModuleMaxRequests(t *testing.T) { assert.GreaterOrEqual(t, restartCount, 2, "with maxRequests=%d and %d requests on 2 threads, at least 2 restarts should occur", maxRequests, totalRequests) }, &testOptions{ - realServer: true, - logger: logger, + logger: logger, initOpts: []frankenphp.Option{ frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests), @@ -60,69 +46,24 @@ func TestModuleMaxRequests(t *testing.T) { func TestModuleMaxRequestsConcurrent(t *testing.T) { const maxRequests = 10 const totalRequests = 200 - const concurrency = 20 - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 10 * time.Second} - - var successCount int - var mu sync.Mutex - sem := make(chan struct{}, concurrency) + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { var wg sync.WaitGroup for i := 0; i < totalRequests; i++ { wg.Add(1) - sem <- struct{}{} - go func(i int) { - defer func() { <-sem; wg.Done() }() - - resp, err := client.Get(ts.URL + "/index.php") - if err != nil { - return - } - body, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - - if resp.StatusCode == 200 && strings.Contains(string(body), "I am by birth a Genevese") { - mu.Lock() - successCount++ - mu.Unlock() - } - }(i) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/index.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + assert.Contains(t, body, "I am by birth a Genevese") + }() } wg.Wait() - - t.Logf("Success: %d/%d", successCount, totalRequests) - assert.GreaterOrEqual(t, successCount, int(totalRequests*0.95), - "at least 95%% of requests should succeed despite regular thread restarts") }, &testOptions{ - realServer: true, initOpts: []frankenphp.Option{ frankenphp.WithNumThreads(8), frankenphp.WithMaxRequests(maxRequests), }, }) } - -// TestModuleMaxRequestsZeroIsUnlimited verifies that max_requests=0 (default) -// means threads never restart. -func TestModuleMaxRequestsZeroIsUnlimited(t *testing.T) { - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 5 * time.Second} - - for i := 0; i < 50; i++ { - resp, err := client.Get(ts.URL + "/index.php") - require.NoError(t, err) - body, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - - assert.Equal(t, 200, resp.StatusCode) - assert.Contains(t, string(body), "I am by birth a Genevese") - } - }, &testOptions{ - realServer: true, - initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2)}, - }) -} diff --git a/maxrequests_test.go b/maxrequests_test.go deleted file mode 100644 index 3526afa973..0000000000 --- a/maxrequests_test.go +++ /dev/null @@ -1,177 +0,0 @@ -package frankenphp_test - -import ( - "fmt" - "io" - "net/http" - "net/http/httptest" - "strings" - "sync" - "testing" - "time" - - "github.com/dunglas/frankenphp" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests. -func TestWorkerMaxRequests(t *testing.T) { - const maxRequests = 5 - const totalRequests = 20 - - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 5 * time.Second} - - // Track unique instance IDs across requests. - // After maxRequests, the worker should restart with a new instance ID. - instanceIDs := make(map[string]int) - - for i := 0; i < totalRequests; i++ { - resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") - require.NoError(t, err, "request %d should succeed", i) - - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - _ = resp.Body.Close() - - assert.Equal(t, 200, resp.StatusCode, "request %d should return 200, got body: %s", i, string(body)) - - // Parse response: "instance:,count:" - bodyStr := string(body) - parts := strings.Split(bodyStr, ",") - if len(parts) == 2 { - instanceID := strings.TrimPrefix(parts[0], "instance:") - instanceIDs[instanceID]++ - } - } - - t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests) - for id, count := range instanceIDs { - t.Logf(" instance %s: handled %d requests", id, count) - } - - // With maxRequests=5 and totalRequests=20 on 1 worker, we expect at least 4 restarts - assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests, - "worker should have restarted at least %d times with max_requests=%d over %d requests", - totalRequests/maxRequests, maxRequests, totalRequests) - - // No single instance should have handled more than maxRequests - for id, count := range instanceIDs { - assert.LessOrEqual(t, count, maxRequests, - fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) - } - }, &testOptions{ - workerScript: "worker-counter-persistent.php", - nbWorkers: 1, - nbParallelRequests: 1, - realServer: true, - initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)}, - }) -} - -// TestWorkerMaxRequestsZeroIsUnlimited verifies that max_requests=0 means no limit. -func TestWorkerMaxRequestsZeroIsUnlimited(t *testing.T) { - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 5 * time.Second} - - instanceIDs := make(map[string]bool) - - for i := 0; i < 50; i++ { - resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") - require.NoError(t, err) - body, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - - bodyStr := string(body) - parts := strings.Split(bodyStr, ",") - if len(parts) == 2 { - instanceID := strings.TrimPrefix(parts[0], "instance:") - instanceIDs[instanceID] = true - } - } - - // With 1 worker thread and no max_requests, all requests should go to - // the same instance (no restarts) - assert.Equal(t, 1, len(instanceIDs), - "with max_requests=0, worker should never restart (saw %d instances)", len(instanceIDs)) - }, &testOptions{ - workerScript: "worker-counter-persistent.php", - nbWorkers: 1, - nbParallelRequests: 1, - realServer: true, - initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2)}, - }) -} - -// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load. -func TestWorkerMaxRequestsHighConcurrency(t *testing.T) { - const maxRequests = 10 - const totalRequests = 200 - const concurrency = 20 - - runTest(t, func(_ func(http.ResponseWriter, *http.Request), ts *httptest.Server, _ int) { - require.NotNil(t, ts) - client := &http.Client{Timeout: 10 * time.Second} - - var ( - successCount int - mu sync.Mutex - instanceIDs = make(map[string]int) - ) - sem := make(chan struct{}, concurrency) - var wg sync.WaitGroup - - for i := 0; i < totalRequests; i++ { - wg.Add(1) - sem <- struct{}{} - go func() { - defer func() { <-sem; wg.Done() }() - - resp, err := client.Get(ts.URL + "/worker-counter-persistent.php") - if err != nil { - return - } - body, _ := io.ReadAll(resp.Body) - _ = resp.Body.Close() - - if resp.StatusCode == 200 { - mu.Lock() - successCount++ - bodyStr := string(body) - parts := strings.Split(bodyStr, ",") - if len(parts) == 2 { - instanceID := strings.TrimPrefix(parts[0], "instance:") - instanceIDs[instanceID]++ - } - mu.Unlock() - } - }() - } - wg.Wait() - - t.Logf("Success: %d/%d, instances: %d", successCount, totalRequests, len(instanceIDs)) - - assert.Equal(t, totalRequests, successCount, - "all requests should succeed") - - // With max_requests=10 and 4 workers handling 200 requests total, - // each worker restarts after 10 requests. We should see multiple instances. - assert.Greater(t, len(instanceIDs), 4, - "workers should have restarted multiple times") - - // No instance should exceed max_requests - for id, count := range instanceIDs { - assert.LessOrEqual(t, count, maxRequests, - fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) - } - }, &testOptions{ - workerScript: "worker-counter-persistent.php", - nbWorkers: 4, - nbParallelRequests: 1, - realServer: true, - initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)}, - }) -} diff --git a/phpthread.go b/phpthread.go index fdf263717c..96ba394078 100644 --- a/phpthread.go +++ b/phpthread.go @@ -62,6 +62,24 @@ func (thread *phpThread) boot() { thread.state.WaitFor(state.Inactive) } +// reboot exits the C thread loop for full ZTS cleanup, then spawns a fresh C thread. +// Returns false if the thread is no longer in Ready state (e.g. shutting down). +func (thread *phpThread) reboot() bool { + if !thread.state.CompareAndSwap(state.Ready, state.Rebooting) { + return false + } + + go func() { + thread.state.WaitFor(state.RebootReady) + + if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { + panic("unable to create thread") + } + }() + + return true +} + // shutdown the underlying PHP thread func (thread *phpThread) shutdown() { if !thread.state.RequestSafeStateChange(state.ShuttingDown) { @@ -183,5 +201,9 @@ func go_frankenphp_after_script_execution(threadIndex C.uintptr_t, exitStatus C. func go_frankenphp_on_thread_shutdown(threadIndex C.uintptr_t) { thread := phpThreads[threadIndex] thread.Unpin() - thread.state.Set(state.Done) + if thread.state.Is(state.Rebooting) { + thread.state.Set(state.RebootReady) + } else { + thread.state.Set(state.Done) + } } diff --git a/testdata/sleep-with-output.php b/testdata/sleep-with-output.php deleted file mode 100644 index 0fc66a1039..0000000000 --- a/testdata/sleep-with-output.php +++ /dev/null @@ -1,8 +0,0 @@ - 0 && handler.requestCount >= maxRequestsPerThread { if globalLogger.Enabled(globalCtx, slog.LevelDebug) { globalLogger.LogAttrs(globalCtx, slog.LevelDebug, "max requests reached, restarting", @@ -247,7 +229,7 @@ func (handler *workerThread) waitForWorkerRequest() (bool, any) { ) } - handler.needsRestart = true + handler.thread.reboot() return false, nil } diff --git a/worker_test.go b/worker_test.go index 97cc80d2e4..3fd2d63f94 100644 --- a/worker_test.go +++ b/worker_test.go @@ -5,11 +5,13 @@ import ( "fmt" "io" "log" + "log/slog" "net/http" "net/http/httptest" "net/url" "strconv" "strings" + "sync" "testing" "github.com/dunglas/frankenphp" @@ -169,3 +171,94 @@ func TestKeepRunningOnConnectionAbort(t *testing.T) { assert.Equal(t, "requests:2", body2, "should not have stopped execution after the first request was aborted") }, &testOptions{workerScript: "worker-with-counter.php", nbWorkers: 1, nbParallelRequests: 1}) } + +// TestWorkerMaxRequests verifies that a worker restarts after reaching max_requests. +func TestWorkerMaxRequests(t *testing.T) { + const maxRequests = 5 + const totalRequests = 20 + + var buf syncBuffer + logger := slog.New(slog.NewTextHandler(&buf, &slog.HandlerOptions{Level: slog.LevelDebug})) + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + instanceIDs := make(map[string]int) + + for i := 0; i < totalRequests; i++ { + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + } + + t.Logf("Unique worker instances seen: %d (expected >= %d)", len(instanceIDs), totalRequests/maxRequests) + for id, count := range instanceIDs { + t.Logf(" instance %s: handled %d requests", id, count) + } + + assert.GreaterOrEqual(t, len(instanceIDs), totalRequests/maxRequests) + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + + restartCount := strings.Count(buf.String(), "max requests reached, restarting") + t.Logf("Worker restarts observed: %d", restartCount) + assert.GreaterOrEqual(t, restartCount, 2) + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 1, + nbParallelRequests: 1, + logger: logger, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(2), frankenphp.WithMaxRequests(maxRequests)}, + }) +} + +// TestWorkerMaxRequestsHighConcurrency verifies max_requests works under concurrent load. +func TestWorkerMaxRequestsHighConcurrency(t *testing.T) { + const maxRequests = 10 + const totalRequests = 200 + + runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, _ int) { + var ( + mu sync.Mutex + instanceIDs = make(map[string]int) + ) + var wg sync.WaitGroup + + for i := 0; i < totalRequests; i++ { + wg.Add(1) + go func() { + defer wg.Done() + body, resp := testGet("http://example.com/worker-counter-persistent.php", handler, t) + assert.Equal(t, 200, resp.StatusCode) + + mu.Lock() + parts := strings.Split(body, ",") + if len(parts) == 2 { + instanceID := strings.TrimPrefix(parts[0], "instance:") + instanceIDs[instanceID]++ + } + mu.Unlock() + }() + } + wg.Wait() + + t.Logf("instances: %d", len(instanceIDs)) + assert.Greater(t, len(instanceIDs), 4, "workers should have restarted multiple times") + + for id, count := range instanceIDs { + assert.LessOrEqual(t, count, maxRequests, + fmt.Sprintf("instance %s handled %d requests, exceeding max_requests=%d", id, count, maxRequests)) + } + }, &testOptions{ + workerScript: "worker-counter-persistent.php", + nbWorkers: 4, + nbParallelRequests: 1, + initOpts: []frankenphp.Option{frankenphp.WithNumThreads(5), frankenphp.WithMaxRequests(maxRequests)}, + }) +}