From 7031ca3d751291c97c5d58c1e727369cd20e9ca2 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Tue, 10 Mar 2026 22:17:25 +0300 Subject: [PATCH 1/2] sn/grpc: support reloading servers with SIGHUP Reload gRPC servers, if the config is updated. Closes #3860. Signed-off-by: Andrey Butusov --- CHANGELOG.md | 1 + cmd/neofs-node/accounting.go | 5 +- cmd/neofs-node/config.go | 27 ++- cmd/neofs-node/container.go | 5 +- cmd/neofs-node/grpc.go | 314 ++++++++++++++++++++++++++++------- cmd/neofs-node/object.go | 4 +- cmd/neofs-node/reputation.go | 5 +- cmd/neofs-node/session.go | 5 +- 8 files changed, 295 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b95f62c8a0..0073e3061a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Changelog for NeoFS Node - `neofs-lens meta resync` command (#3849) - `policer.boost_multiplier` SN config option (#3855) - `neofs-lens storage flush-write-caches` command (#3872) +- Reload gRPC SN config with SIGHUP (#3874) ### Fixed - Resending the header after chunks have already been sent in object service `Get` handler (#3833) diff --git a/cmd/neofs-node/accounting.go b/cmd/neofs-node/accounting.go index 482025be93..aab315f370 100644 --- a/cmd/neofs-node/accounting.go +++ b/cmd/neofs-node/accounting.go @@ -3,6 +3,7 @@ package main import ( accountingService "github.com/nspcc-dev/neofs-node/pkg/services/accounting" protoaccounting "github.com/nspcc-dev/neofs-sdk-go/proto/accounting" + "google.golang.org/grpc" ) func initAccountingService(c *cfg) { @@ -12,7 +13,7 @@ func initAccountingService(c *cfg) { server := accountingService.New(&c.key.PrivateKey, c.networkState, c.bCli) - for _, srv := range c.cfgGRPC.servers { + c.cfgGRPC.registerService(func(srv *grpc.Server) { protoaccounting.RegisterAccountingServiceServer(srv, server) - } + }) } diff --git a/cmd/neofs-node/config.go b/cmd/neofs-node/config.go index bfcd5f3245..ba1e91cbed 100644 --- a/cmd/neofs-node/config.go +++ b/cmd/neofs-node/config.go @@ -233,9 
+233,26 @@ func (c *cfg) GetNetworkMap() (netmap.NetMap, error) { func (c *cfg) CurrentEpoch() uint64 { return c.networkState.CurrentEpoch() } type cfgGRPC struct { + mu sync.Mutex + listeners []net.Listener + servers []*grpc.Server + + // serviceRegistrators stores functions that register gRPC service + // implementations into a gRPC server. + serviceRegistrators []func(*grpc.Server) +} - servers []*grpc.Server +// registerService saves the registrator function and immediately applies it to +// all currently running gRPC servers. +func (g *cfgGRPC) registerService(f func(*grpc.Server)) { + g.mu.Lock() + defer g.mu.Unlock() + + g.serviceRegistrators = append(g.serviceRegistrators, f) + for _, srv := range g.servers { + f(srv) + } } type cfgMeta struct { @@ -681,6 +698,7 @@ func (c *cfg) configWatcher(ctx context.Context) { oldMetrics := writeMetricConfig(c.appCfg) oldProfiler := writeProfilerConfig(c.appCfg) + oldGRPC := writeGRPCConfig(c.appCfg) c.appCfg, err = config.New(config.WithConfigFile(c.appCfg.Path())) if err != nil { @@ -745,6 +763,13 @@ func (c *cfg) configWatcher(ctx context.Context) { continue } + // gRPC + + if err = reloadGRPC(c, oldGRPC); err != nil { + c.log.Error("gRPC configuration reload", zap.Error(err)) + continue + } + c.log.Info("configuration has been reloaded successfully") case <-ctx.Done(): return diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index 5972918339..b1fc2c25ed 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -31,6 +31,7 @@ import ( protocontainer "github.com/nspcc-dev/neofs-sdk-go/proto/container" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" + "google.golang.org/grpc" ) func initContainerService(c *cfg) { @@ -112,9 +113,9 @@ func initContainerService(c *cfg) { cnrSrv.ResetSessionTokenCheckCache() }) - for _, srv := range c.cfgGRPC.servers { + c.cfgGRPC.registerService(func(srv *grpc.Server) { protocontainer.RegisterContainerServiceServer(srv, cnrSrv) - } + }) 
} func initSizeLoadReports(c *cfg) { diff --git a/cmd/neofs-node/grpc.go b/cmd/neofs-node/grpc.go index 6f0f8ff0c7..20008e933c 100644 --- a/cmd/neofs-node/grpc.go +++ b/cmd/neofs-node/grpc.go @@ -1,13 +1,19 @@ package main import ( + "crypto/sha256" "crypto/tls" "errors" "fmt" + "hash" + "io" "math" "net" + "os" "time" + "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config" + grpcconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/grpc" iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" @@ -18,6 +24,69 @@ import ( "google.golang.org/grpc/resolver" ) +const maxTLSFingerprintFileBytes int64 = 16 << 10 // 16 KB + +type grpcServerSnapshot struct { + grpcconfig.GRPC + certFingerprint string +} + +func (s grpcServerSnapshot) unchanged(other grpcServerSnapshot) bool { + return s.ConnLimit == other.ConnLimit && + s.TLS.Enabled == other.TLS.Enabled && + s.TLS.Certificate == other.TLS.Certificate && + s.TLS.Key == other.TLS.Key && + s.certFingerprint == other.certFingerprint +} + +type grpcConfigSnapshot []grpcServerSnapshot + +func writeGRPCConfig(c *config.Config) grpcConfigSnapshot { + snap := make(grpcConfigSnapshot, len(c.GRPC)) + for i, sc := range c.GRPC { + snap[i] = grpcServerSnapshot{ + GRPC: sc, + certFingerprint: tlsCertFingerprint(sc.TLS.Certificate, sc.TLS.Key), + } + } + return snap +} + +func tlsCertFingerprint(certFile, keyFile string) string { + if keyFile == "" { + return "" + } + + h := sha256.New() + err := hashFileLimited(h, certFile, maxTLSFingerprintFileBytes) + if err != nil { + return "" + } + _, _ = h.Write([]byte{0}) + err = hashFileLimited(h, keyFile, maxTLSFingerprintFileBytes) + if err != nil { + return "" + } + + return string(h.Sum(nil)) +} + +func hashFileLimited(dst hash.Hash, path string, maxBytes int64) error { + f, err := os.Open(path) + if err != nil { + return err + } + defer func() { + _ = f.Close() + }() + + _, err = io.Copy(dst, io.LimitReader(f, 
maxBytes)) + if err != nil { + return err + } + return nil +} + func initGRPC(c *cfg) { // although docs state that 'passthrough' is set by default, it should be set explicitly for activation resolver.SetDefaultScheme("passthrough") @@ -26,13 +95,39 @@ func initGRPC(c *cfg) { initMorphComponents(c) } + maxRecvMsgSizeOpt, err := getMaxRecvMsgSizeOpt(c) + fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err) + + if err := buildGRPCServers(c, maxRecvMsgSizeOpt); err != nil { + fatalOnErr(err) + } + + // register a single shutdown hook that stops whatever servers are current + // at the time of shutdown (including those created by reload). + c.onShutdown(func() { + c.cfgGRPC.mu.Lock() + srvs := make([]*grpc.Server, len(c.cfgGRPC.servers)) + copy(srvs, c.cfgGRPC.servers) + c.cfgGRPC.mu.Unlock() + for _, srv := range srvs { + stopGRPC("NeoFS Public API", srv, c.log) + } + }) +} + +// getMaxRecvMsgSizeOpt computes the grpc.MaxRecvMsgSize option based on the +// network's max object size setting. Returns nil when the default gRPC limit is +// sufficient. 
+func getMaxRecvMsgSizeOpt(c *cfg) (grpc.ServerOption, error) { + maxObjSize, err := c.nCli.MaxObjectSize() + if err != nil { + return nil, err + } + // limit max size of single messages received by the gRPC servers up to max // object size setting of the NeoFS network: this is needed to serve // ObjectService.Replicate RPC transmitting the entire stored object in one // message - maxObjSize, err := c.nCli.MaxObjectSize() - fatalOnErrDetails("read max object size network setting to determine gRPC recv message limit", err) - maxRecvSize := maxObjSize // don't forget about meta fields if maxRecvSize < uint64(math.MaxUint64-object.MaxHeaderLen) { // just in case, always true in practice @@ -41,84 +136,183 @@ func initGRPC(c *cfg) { maxRecvSize = math.MaxUint64 } - var maxRecvMsgSizeOpt grpc.ServerOption - if maxRecvSize > maxMsgSize { // do not decrease default value - if maxRecvSize > math.MaxInt { - // ^2GB for 32-bit systems which is currently enough in practice. If at some - // point this is not enough, we'll need to expand the option - fatalOnErr(fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d", - maxRecvSize, math.MaxInt)) - } - maxRecvMsgSizeOpt = grpc.MaxRecvMsgSize(int(maxRecvSize)) - c.log.Debug("limit max recv gRPC message size to fit max stored objects", - zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize)) + if maxRecvSize <= maxMsgSize { // do not decrease default value + return nil, nil } - var successCount int + if maxRecvSize > math.MaxInt { + // ^2GB for 32-bit systems which is currently enough in practice. 
If at some + // point this is not enough, we'll need to expand the option + return nil, fmt.Errorf("cannot serve NeoFS API over gRPC: object of max size is bigger than gRPC server is able to support %d>%d", + maxRecvSize, math.MaxInt) + } + + c.log.Debug("limit max recv gRPC message size to fit max stored objects", + zap.Uint64("max object size", maxObjSize), zap.Uint64("max recv msg", maxRecvSize)) + + return grpc.MaxRecvMsgSize(int(maxRecvSize)), nil +} + +func buildGRPCServers(c *cfg, maxRecvMsgSizeOpt grpc.ServerOption) error { + if len(c.appCfg.GRPC) == 0 { + return errors.New("could not listen to any gRPC endpoints") + } for _, sc := range c.appCfg.GRPC { - serverOpts := []grpc.ServerOption{ - grpc.MaxSendMsgSize(maxMsgSize), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 5 * time.Second, // w/o this server sends GoAway with ENHANCE_YOUR_CALM code "too_many_pings" - PermitWithoutStream: true, - }), - grpc.ForceServerCodecV2(iprotobuf.BufferedCodec{}), - } - if maxRecvMsgSizeOpt != nil { - // TODO(@cthulhu-rider): the setting can be server-global only now, support - // per-RPC limits - // TODO(@cthulhu-rider): max object size setting may change in general, - // but server configuration is static now - serverOpts = append(serverOpts, maxRecvMsgSizeOpt) + srv, lis, err := buildSingleGRPCServer(c, sc, maxRecvMsgSizeOpt) + if err != nil { + return err } + c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) + c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) + } + return nil +} - tlsCfg := sc.TLS +func buildSingleGRPCServer(c *cfg, sc grpcconfig.GRPC, maxRecvMsgSizeOpt grpc.ServerOption) (*grpc.Server, net.Listener, error) { + serverOpts := []grpc.ServerOption{ + grpc.MaxSendMsgSize(maxMsgSize), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 5 * time.Second, // w/o this server sends GoAway with ENHANCE_YOUR_CALM code "too_many_pings" + PermitWithoutStream: true, + }), + 
grpc.ForceServerCodecV2(iprotobuf.BufferedCodec{}), + } + if maxRecvMsgSizeOpt != nil { + // TODO(@cthulhu-rider): the setting can be server-global only now, support + // per-RPC limits + // TODO(@cthulhu-rider): max object size setting may change in general, + // but server configuration is static now + serverOpts = append(serverOpts, maxRecvMsgSizeOpt) + } - if tlsCfg.Key != "" { - cert, err := tls.LoadX509KeyPair(tlsCfg.Certificate, tlsCfg.Key) - if err != nil { - c.log.Error("could not read certificate from file", zap.Error(err)) - return - } + tlsCfg := sc.TLS - creds := credentials.NewTLS(&tls.Config{ - Certificates: []tls.Certificate{cert}, - }) + if tlsCfg.Key != "" { + certFile, keyFile := tlsCfg.Certificate, tlsCfg.Key - serverOpts = append(serverOpts, grpc.Creds(creds)) + if _, err := tls.LoadX509KeyPair(certFile, keyFile); err != nil { + c.log.Error("could not read certificate from file", zap.Error(err)) + return nil, nil, err } - lis, err := net.Listen("tcp", sc.Endpoint) - if err != nil { - c.log.Error("can't listen gRPC endpoint", zap.Error(err)) - return - } + // read certificate from disk on each handshake to pick up renewals automatically. 
+ creds := credentials.NewTLS(&tls.Config{ + GetConfigForClient: func(*tls.ClientHelloInfo) (*tls.Config, error) { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return nil, fmt.Errorf("reload TLS certificate: %w", err) + } + return &tls.Config{ + Certificates: []tls.Certificate{cert}, + }, nil + }, + }) + + serverOpts = append(serverOpts, grpc.Creds(creds)) + } + + lis, err := net.Listen("tcp", sc.Endpoint) + if err != nil { + c.log.Error("can't listen gRPC endpoint", zap.Error(err)) + return nil, nil, err + } + + if connLimit := sc.ConnLimit; connLimit > 0 { + lis = netutil.LimitListener(lis, connLimit) + } + + return grpc.NewServer(serverOpts...), lis, nil +} + +// reloadGRPC performs a fine-grained reload: only gRPC servers whose +// configuration or TLS certificate has actually changed are stopped and +// re-created; the rest continue serving without interruption. +func reloadGRPC(c *cfg, oldCfg grpcConfigSnapshot) error { + newCfg := writeGRPCConfig(c.appCfg) - if connLimit := sc.ConnLimit; connLimit > 0 { - lis = netutil.LimitListener(lis, connLimit) + maxRecvMsgSizeOpt, err := getMaxRecvMsgSizeOpt(c) + if err != nil { + return fmt.Errorf("get max recv msg size option: %w", err) + } + + c.cfgGRPC.mu.Lock() + defer c.cfgGRPC.mu.Unlock() + + type serverEntry struct { + srv *grpc.Server + lis net.Listener + snap grpcServerSnapshot + } + oldByEndpoint := make(map[string]serverEntry, len(c.cfgGRPC.servers)) + for i, srv := range c.cfgGRPC.servers { + if i < len(oldCfg) { + oldByEndpoint[oldCfg[i].Endpoint] = serverEntry{ + srv: srv, + lis: c.cfgGRPC.listeners[i], + snap: oldCfg[i], + } } + } - c.cfgGRPC.listeners = append(c.cfgGRPC.listeners, lis) + newServers := make([]*grpc.Server, 0, len(newCfg)) + newListeners := make([]net.Listener, 0, len(newCfg)) + // freshServers/freshListeners hold only newly created servers that need + // service registration and must start serving. 
+ var freshServers []*grpc.Server + var freshListeners []net.Listener - srv := grpc.NewServer(serverOpts...) + for _, newSnap := range newCfg { + if old, ok := oldByEndpoint[newSnap.Endpoint]; ok { + delete(oldByEndpoint, newSnap.Endpoint) + if old.snap.unchanged(newSnap) { + newServers = append(newServers, old.srv) + newListeners = append(newListeners, old.lis) + continue + } + stopGRPC("NeoFS Public API", old.srv, c.log) + } - c.onShutdown(func() { - stopGRPC("NeoFS Public API", srv, c.log) - }) + srv, lis, err := buildSingleGRPCServer(c, newSnap.GRPC, maxRecvMsgSizeOpt) + if err != nil { + c.log.Error("failed to start gRPC server", + zap.String("endpoint", newSnap.Endpoint), zap.Error(err)) + continue + } + newServers = append(newServers, srv) + newListeners = append(newListeners, lis) + freshServers = append(freshServers, srv) + freshListeners = append(freshListeners, lis) + } - c.cfgGRPC.servers = append(c.cfgGRPC.servers, srv) - successCount++ + // stop servers that were removed from the config entirely + for _, entry := range oldByEndpoint { + stopGRPC("NeoFS Public API", entry.srv, c.log) + } + + if len(newServers) == 0 { + return errors.New("could not listen to any gRPC endpoints") } - if successCount == 0 { - fatalOnErr(errors.New("could not listen to any gRPC endpoints")) + c.cfgGRPC.servers = newServers + c.cfgGRPC.listeners = newListeners + + for _, reg := range c.cfgGRPC.serviceRegistrators { + for _, srv := range freshServers { + reg(srv) + } } + serveGRPCList(c, freshServers, freshListeners) + return nil } func serveGRPC(c *cfg) { - for i := range c.cfgGRPC.servers { - srv := c.cfgGRPC.servers[i] - lis := c.cfgGRPC.listeners[i] + serveGRPCList(c, c.cfgGRPC.servers, c.cfgGRPC.listeners) +} + +func serveGRPCList(c *cfg, servers []*grpc.Server, listeners []net.Listener) { + for i := range servers { + srv := servers[i] + lis := listeners[i] c.wg.Go(func() { defer func() { @@ -132,7 +326,7 @@ func serveGRPC(c *cfg) { ) if err := srv.Serve(lis); err != 
nil { - fmt.Println("gRPC server error", err) + c.log.Error("gRPC server failed", zap.Stringer("endpoint", lis.Addr()), zap.Error(err)) } }) } diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 01da6f8924..c7de08de96 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -345,9 +345,9 @@ func initObjectService(c *cfg) { return server.HeadBuffered(ctx, req), nil } - for _, srv := range c.cfgGRPC.servers { + c.cfgGRPC.registerService(func(srv *grpc.Server) { srv.RegisterService(&svcDesc, server) - } + }) } type reputationClientConstructor struct { diff --git a/cmd/neofs-node/reputation.go b/cmd/neofs-node/reputation.go index 6391db9e51..bf9261efe6 100644 --- a/cmd/neofs-node/reputation.go +++ b/cmd/neofs-node/reputation.go @@ -31,6 +31,7 @@ import ( apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation" "github.com/nspcc-dev/neofs-sdk-go/version" "go.uber.org/zap" + "google.golang.org/grpc" ) func initReputationService(c *cfg) { @@ -203,9 +204,9 @@ func initReputationService(c *cfg) { routeBuilder: localRouteBuilder, } - for _, srv := range c.cfgGRPC.servers { + c.cfgGRPC.registerService(func(srv *grpc.Server) { protoreputation.RegisterReputationServiceServer(srv, server) - } + }) // initialize eigen trust block timer newEigenTrustIterTimer(c) diff --git a/cmd/neofs-node/session.go b/cmd/neofs-node/session.go index 253d117112..e770b356ca 100644 --- a/cmd/neofs-node/session.go +++ b/cmd/neofs-node/session.go @@ -8,6 +8,7 @@ import ( protosession "github.com/nspcc-dev/neofs-sdk-go/proto/session" sessionv2 "github.com/nspcc-dev/neofs-sdk-go/session/v2" "github.com/nspcc-dev/neofs-sdk-go/user" + "google.golang.org/grpc" ) type sessionStorage interface { @@ -26,7 +27,7 @@ func initSessionService(c *cfg) { server := sessionSvc.New(&c.key.PrivateKey, c, c.privateTokenStore) - for _, srv := range c.cfgGRPC.servers { + c.cfgGRPC.registerService(func(srv *grpc.Server) { protosession.RegisterSessionServiceServer(srv, server) - } + }) } 
From 0aa32fd749a17fa983e7d2311e347508fa72dc89 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Tue, 10 Mar 2026 22:17:54 +0300 Subject: [PATCH 2/2] docs: update SIGHUP doc Signed-off-by: Andrey Butusov --- docs/sighup.md | 77 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 2 deletions(-) diff --git a/docs/sighup.md b/docs/sighup.md index 8fabb95780..062c6d6c56 100644 --- a/docs/sighup.md +++ b/docs/sighup.md @@ -4,15 +4,59 @@ Logger level can be reloaded with a SIGHUP. +## Prometheus + +If any of the fields below are changed, the Prometheus HTTP server is restarted. + +```yml +prometheus: + enabled: + address: + shutdown_timeout: +``` + +## pprof + +If any of the fields below are changed, the pprof HTTP server is restarted. + +```yml +pprof: + enabled: + address: + shutdown_timeout: + enable_block: + enable_mutex: +``` + +`enable_block` and `enable_mutex` are applied immediately without +restarting the HTTP server. + +## Object pool + +Worker pool sizes are tuned without restarting the pools. + +```yml +object: + put: + pool_size_remote: + search: + pool_size: + +replicator: + pool_size: +``` + ## Policer Available for reconfiguration fields: ```yml +policer: head_timeout: replication_cooldown: object_batch_size: max_workers: + boost_multiplier: ``` ## Storage engine @@ -33,14 +77,43 @@ comparing paths from `shard.blobstor` section. After this we have 3 sets: |-----------------|----------------------------------------------------------------------------------------------------------------------| | `path` | If `path` is different, metabase is closed and opened with a new path. All other configuration will also be updated. 
| -### FS chain +## FS chain | Changed section | Actions | |-----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `endpoints` | Updates N3 endpoints.
 If new `endpoints` do not contain the endpoint client is connected to, it will reconnect to another endpoint from the new list. Node service can be interrupted in this case. | -### Node +## Node | Changed section | Actions | |-----------------|--------------------------| | `attribute_*` | Updates node attributes. | + +## Meta service + +```yml +fschain: + endpoints: +``` + +The meta service re-applies the updated FS chain RPC endpoints. + +## gRPC + +If any server's settings in the `grpc` section have changed, only the affected +gRPC servers are stopped and re-created; unchanged servers keep serving without +interruption. All service implementations (Object, Container, Session, Accounting, +Reputation) are re-registered automatically in the re-created servers. + +```yml +grpc: + - endpoint: + conn_limit: + tls: + enabled: + certificate: + key: +``` + +During the restart the affected endpoints are briefly unavailable. +The control service gRPC endpoint is not affected by SIGHUP.