From 0d7a1ce3d99827dcf365a0c89af252af20c38d12 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Apr 2026 21:11:56 -0500 Subject: [PATCH 1/3] Add defensive nil check in Stream.Close() --- pkg/dmsg/stream.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/dmsg/stream.go b/pkg/dmsg/stream.go index bb70240b..7f49ee5c 100644 --- a/pkg/dmsg/stream.go +++ b/pkg/dmsg/stream.go @@ -71,7 +71,10 @@ func (s *Stream) Close() error { if s.sStr != nil { return s.sStr.Close() } - return s.yStr.Close() + if s.yStr != nil { + return s.yStr.Close() + } + return nil } // Logger returns the internal logrus.FieldLogger instance. From 90db01b72d7fc662dcb1aa61c34cae2245a7f64e Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Apr 2026 21:15:58 -0500 Subject: [PATCH 2/3] Fix smux ping timeout: close stream after echo to flush write buffer The server-side ping echo wrote the response but never closed the smux stream. Without an explicit close, smux buffers the 2-byte response and the client's read times out waiting for data that never arrives. Also add defensive nil check in Stream.Close(). --- pkg/dmsg/server_session.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 3ff0c90b..32f0b5dc 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -153,9 +153,15 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl return err } if header[0] == 0 && header[1] == 0 { - // Ping: echo back the marker and close. + // Ping: echo back the marker and close the stream to flush the write. + // Without the explicit close, smux may buffer the response and the + // client's read times out waiting for data that never arrives. _, err := yStr.Write(pingMarker) - return err + closeErr := yStr.Close() + if err != nil { + return err + } + return closeErr } // Not a ping — the 2 bytes are the length prefix of a normal object. From 32b2594cc3c006cb782b7f7197160340e3071e09 Mon Sep 17 00:00:00 2001 From: Moses Narrow <36607567+0pcom@users.noreply.github.com> Date: Fri, 3 Apr 2026 21:21:58 -0500 Subject: [PATCH 3/3] Tune yamux and smux configs for production workloads Yamux: increase AcceptBacklog to 512, reduce StreamOpenTimeout to 20s and StreamCloseTimeout to 30s, suppress stderr logging via io.Discard. Smux: upgrade to protocol v2 (adds flow control), reduce MaxReceiveBuffer to 1MB per session (from 4MB). --- pkg/dmsg/client_sessions.go | 4 ++-- pkg/dmsg/server.go | 6 +++--- pkg/dmsg/types.go | 29 +++++++++++++++++++++++++++++ 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/pkg/dmsg/client_sessions.go b/pkg/dmsg/client_sessions.go index d6b6ccd7..ba213b7f 100644 --- a/pkg/dmsg/client_sessions.go +++ b/pkg/dmsg/client_sessions.go @@ -93,14 +93,14 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client return ClientSession{}, err } if entry.Protocol == "smux" { - dSes.sm.smux, err = smux.Client(conn, smux.DefaultConfig()) + dSes.sm.smux, err = smux.Client(conn, SmuxConfig()) if err != nil { conn.Close() //nolint:errcheck,gosec return ClientSession{}, err } ce.log.Infof("smux stream session initial for %s", dSes.RemotePK().String()) } else { - dSes.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + dSes.sm.yamux, err = yamux.Client(conn, YamuxConfig()) if err != nil { conn.Close() //nolint:errcheck,gosec return ClientSession{}, err diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 9bd4a622..213e9091 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -380,7 +380,7 @@ func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { } ses.sm.mutx.Lock() - ses.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + ses.sm.yamux, err = yamux.Client(conn, YamuxConfig()) if err != nil { ses.sm.mutx.Unlock() log.WithError(err).Warn("Peer yamux setup failed.") @@ -470,7 +470,7 @@ func (s *Server) handleSession(conn net.Conn) { // based on protocol, create smux or yamux stream session dSes.sm.mutx.Lock() if protocol == "smux" { - dSes.sm.smux, err = smux.Server(conn, smux.DefaultConfig()) + dSes.sm.smux, err = smux.Server(conn, SmuxConfig()) if err != nil { dSes.sm.mutx.Unlock() conn.Close() //nolint:errcheck,gosec @@ -480,7 +480,7 @@ func (s *Server) handleSession(conn net.Conn) { dSes.sm.addr = dSes.sm.smux.RemoteAddr() log.Infof("smux stream session initial for %s", dSes.RemotePK().String()) } else { - dSes.sm.yamux, err = yamux.Server(conn, yamux.DefaultConfig()) + dSes.sm.yamux, err = yamux.Server(conn, YamuxConfig()) if err != nil { dSes.sm.mutx.Unlock() conn.Close() //nolint:errcheck,gosec diff --git a/pkg/dmsg/types.go b/pkg/dmsg/types.go index dfde5ade..99c6e31d 100644 --- a/pkg/dmsg/types.go +++ b/pkg/dmsg/types.go @@ -4,11 +4,14 @@ package dmsg import ( "errors" "fmt" + "io" "net" "strings" "time" + "github.com/hashicorp/yamux" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/xtaci/smux" ) const ( @@ -37,6 +40,32 @@ var ( AcceptBufferSize = 20 ) +// YamuxConfig returns a tuned yamux configuration for dmsg sessions. +func YamuxConfig() *yamux.Config { + return &yamux.Config{ + AcceptBacklog: 512, + EnableKeepAlive: true, + KeepAliveInterval: 30 * time.Second, + ConnectionWriteTimeout: 10 * time.Second, + MaxStreamWindowSize: 256 * 1024, + StreamOpenTimeout: 20 * time.Second, + StreamCloseTimeout: 30 * time.Second, + LogOutput: io.Discard, + } +} + +// SmuxConfig returns a tuned smux configuration for dmsg sessions. +func SmuxConfig() *smux.Config { + return &smux.Config{ + Version: 2, + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 32768, + MaxReceiveBuffer: 1048576, + MaxStreamBuffer: 65536, + } +} + // Addr implements net.Addr for dmsg addresses. type Addr struct { PK cipher.PubKey `json:"public_key"`