diff --git a/pkg/dmsg/client_sessions.go b/pkg/dmsg/client_sessions.go index d6b6ccd7..ba213b7f 100644 --- a/pkg/dmsg/client_sessions.go +++ b/pkg/dmsg/client_sessions.go @@ -93,14 +93,14 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client return ClientSession{}, err } if entry.Protocol == "smux" { - dSes.sm.smux, err = smux.Client(conn, smux.DefaultConfig()) + dSes.sm.smux, err = smux.Client(conn, SmuxConfig()) if err != nil { conn.Close() //nolint:errcheck,gosec return ClientSession{}, err } ce.log.Infof("smux stream session initial for %s", dSes.RemotePK().String()) } else { - dSes.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + dSes.sm.yamux, err = yamux.Client(conn, YamuxConfig()) if err != nil { conn.Close() //nolint:errcheck,gosec return ClientSession{}, err diff --git a/pkg/dmsg/server.go b/pkg/dmsg/server.go index 9bd4a622..213e9091 100644 --- a/pkg/dmsg/server.go +++ b/pkg/dmsg/server.go @@ -380,7 +380,7 @@ func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) { } ses.sm.mutx.Lock() - ses.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig()) + ses.sm.yamux, err = yamux.Client(conn, YamuxConfig()) if err != nil { ses.sm.mutx.Unlock() log.WithError(err).Warn("Peer yamux setup failed.") @@ -470,7 +470,7 @@ func (s *Server) handleSession(conn net.Conn) { // based on protocol, create smux or yamux stream session dSes.sm.mutx.Lock() if protocol == "smux" { - dSes.sm.smux, err = smux.Server(conn, smux.DefaultConfig()) + dSes.sm.smux, err = smux.Server(conn, SmuxConfig()) if err != nil { dSes.sm.mutx.Unlock() conn.Close() //nolint:errcheck,gosec @@ -480,7 +480,7 @@ func (s *Server) handleSession(conn net.Conn) { dSes.sm.addr = dSes.sm.smux.RemoteAddr() log.Infof("smux stream session initial for %s", dSes.RemotePK().String()) } else { - dSes.sm.yamux, err = yamux.Server(conn, yamux.DefaultConfig()) + dSes.sm.yamux, err = yamux.Server(conn, YamuxConfig()) if err != nil { dSes.sm.mutx.Unlock() conn.Close() //nolint:errcheck,gosec diff --git a/pkg/dmsg/server_session.go b/pkg/dmsg/server_session.go index 3ff0c90b..32f0b5dc 100644 --- a/pkg/dmsg/server_session.go +++ b/pkg/dmsg/server_session.go @@ -153,9 +153,15 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl return err } if header[0] == 0 && header[1] == 0 { - // Ping: echo back the marker and close. + // Ping: echo back the marker and close the stream to flush the write. + // Without the explicit close, smux may buffer the response and the + // client's read times out waiting for data that never arrives. _, err := yStr.Write(pingMarker) - return err + closeErr := yStr.Close() + if err != nil { + return err + } + return closeErr } // Not a ping — the 2 bytes are the length prefix of a normal object. diff --git a/pkg/dmsg/stream.go b/pkg/dmsg/stream.go index bb70240b..7f49ee5c 100644 --- a/pkg/dmsg/stream.go +++ b/pkg/dmsg/stream.go @@ -71,7 +71,10 @@ func (s *Stream) Close() error { if s.sStr != nil { return s.sStr.Close() } - return s.yStr.Close() + if s.yStr != nil { + return s.yStr.Close() + } + return nil } // Logger returns the internal logrus.FieldLogger instance. diff --git a/pkg/dmsg/types.go b/pkg/dmsg/types.go index dfde5ade..99c6e31d 100644 --- a/pkg/dmsg/types.go +++ b/pkg/dmsg/types.go @@ -4,11 +4,14 @@ package dmsg import ( "errors" "fmt" + "io" "net" "strings" "time" + "github.com/hashicorp/yamux" "github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher" + "github.com/xtaci/smux" ) const ( @@ -37,6 +40,32 @@ var ( AcceptBufferSize = 20 ) +// YamuxConfig returns a tuned yamux configuration for dmsg sessions. +func YamuxConfig() *yamux.Config { + return &yamux.Config{ + AcceptBacklog: 512, + EnableKeepAlive: true, + KeepAliveInterval: 30 * time.Second, + ConnectionWriteTimeout: 10 * time.Second, + MaxStreamWindowSize: 256 * 1024, + StreamOpenTimeout: 20 * time.Second, + StreamCloseTimeout: 30 * time.Second, + LogOutput: io.Discard, + } +} + +// SmuxConfig returns a tuned smux configuration for dmsg sessions. +func SmuxConfig() *smux.Config { + return &smux.Config{ + Version: 2, + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 32768, + MaxReceiveBuffer: 1048576, + MaxStreamBuffer: 65536, + } +} + // Addr implements net.Addr for dmsg addresses. type Addr struct { PK cipher.PubKey `json:"public_key"`