Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/dmsg/client_sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ func (ce *Client) dialSession(ctx context.Context, entry *disc.Entry) (cs Client
return ClientSession{}, err
}
if entry.Protocol == "smux" {
dSes.sm.smux, err = smux.Client(conn, smux.DefaultConfig())
dSes.sm.smux, err = smux.Client(conn, SmuxConfig())
if err != nil {
conn.Close() //nolint:errcheck,gosec
return ClientSession{}, err
}
ce.log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
} else {
dSes.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig())
dSes.sm.yamux, err = yamux.Client(conn, YamuxConfig())
if err != nil {
conn.Close() //nolint:errcheck,gosec
return ClientSession{}, err
Expand Down
6 changes: 3 additions & 3 deletions pkg/dmsg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (s *Server) maintainPeerConnection(ctx context.Context, peer PeerEntry) {
}

ses.sm.mutx.Lock()
ses.sm.yamux, err = yamux.Client(conn, yamux.DefaultConfig())
ses.sm.yamux, err = yamux.Client(conn, YamuxConfig())
if err != nil {
ses.sm.mutx.Unlock()
log.WithError(err).Warn("Peer yamux setup failed.")
Expand Down Expand Up @@ -470,7 +470,7 @@ func (s *Server) handleSession(conn net.Conn) {
// based on protocol, create smux or yamux stream session
dSes.sm.mutx.Lock()
if protocol == "smux" {
dSes.sm.smux, err = smux.Server(conn, smux.DefaultConfig())
dSes.sm.smux, err = smux.Server(conn, SmuxConfig())
if err != nil {
dSes.sm.mutx.Unlock()
conn.Close() //nolint:errcheck,gosec
Expand All @@ -480,7 +480,7 @@ func (s *Server) handleSession(conn net.Conn) {
dSes.sm.addr = dSes.sm.smux.RemoteAddr()
log.Infof("smux stream session initial for %s", dSes.RemotePK().String())
} else {
dSes.sm.yamux, err = yamux.Server(conn, yamux.DefaultConfig())
dSes.sm.yamux, err = yamux.Server(conn, YamuxConfig())
if err != nil {
dSes.sm.mutx.Unlock()
conn.Close() //nolint:errcheck,gosec
Expand Down
10 changes: 8 additions & 2 deletions pkg/dmsg/server_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,15 @@ func (ss *ServerSession) serveStream(log logrus.FieldLogger, yStr io.ReadWriteCl
return err
}
if header[0] == 0 && header[1] == 0 {
// Ping: echo back the marker and close.
// Ping: echo back the marker and close the stream to flush the write.
// Without the explicit close, smux may buffer the response and the
// client's read times out waiting for data that never arrives.
_, err := yStr.Write(pingMarker)
return err
closeErr := yStr.Close()
if err != nil {
return err
}
return closeErr
}

// Not a ping — the 2 bytes are the length prefix of a normal object.
Expand Down
5 changes: 4 additions & 1 deletion pkg/dmsg/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (s *Stream) Close() error {
if s.sStr != nil {
return s.sStr.Close()
}
return s.yStr.Close()
if s.yStr != nil {
return s.yStr.Close()
}
return nil
}

// Logger returns the internal logrus.FieldLogger instance.
Expand Down
29 changes: 29 additions & 0 deletions pkg/dmsg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ package dmsg
import (
"errors"
"fmt"
"io"
"net"
"strings"
"time"

"github.com/hashicorp/yamux"
"github.com/skycoin/skywire/pkg/skywire-utilities/pkg/cipher"
"github.com/xtaci/smux"
)

const (
Expand Down Expand Up @@ -37,6 +40,32 @@ var (
AcceptBufferSize = 20
)

// YamuxConfig returns a tuned yamux configuration for dmsg sessions.
func YamuxConfig() *yamux.Config {
return &yamux.Config{
AcceptBacklog: 512,
EnableKeepAlive: true,
KeepAliveInterval: 30 * time.Second,
ConnectionWriteTimeout: 10 * time.Second,
MaxStreamWindowSize: 256 * 1024,
StreamOpenTimeout: 20 * time.Second,
StreamCloseTimeout: 30 * time.Second,
LogOutput: io.Discard,
}
}

// SmuxConfig returns a tuned smux configuration for dmsg sessions.
func SmuxConfig() *smux.Config {
return &smux.Config{
Version: 2,
KeepAliveInterval: 10 * time.Second,
KeepAliveTimeout: 30 * time.Second,
MaxFrameSize: 32768,
MaxReceiveBuffer: 1048576,
MaxStreamBuffer: 65536,
}
}

// Addr implements net.Addr for dmsg addresses.
type Addr struct {
PK cipher.PubKey `json:"public_key"`
Expand Down
Loading