mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 06:23:37 +02:00
feat(smux): replace internal/mux with smux over KCP
Replace the hand-rolled multiplexer (internal/mux) with xtaci/smux v2 running on top of the existing KCP-reliable vp8channel transport. - Add internal/muxconn: io.ReadWriteCloser adapter bridging link.Link (message-oriented) into the byte-stream smux expects; applies AEAD on every write and inverts it on every received message - Rewrite client: smux.Client session over muxconn; OpenStream per SOCKS5 connection; reconnect handler tears down and rebuilds session - Rewrite server: smux.Server session; AcceptStream loop dispatches each stream to a proxy handler; tolerates session bounces on reconnect - Delete internal/mux: all sequence/reorder/buffer logic is now handled by smux + KCP
This commit is contained in:
@@ -0,0 +1,119 @@
|
||||
// Package muxconn adapts a link.Link into an io.ReadWriteCloser suitable for
|
||||
// driving a smux session. The wrapper applies AEAD on every wire-bound write
|
||||
// and inverts it on every received message before exposing the bytes as a
|
||||
// byte stream.
|
||||
//
|
||||
// Link semantics are message-oriented: each Send produces exactly one OnData
|
||||
// on the peer. smux operates on a pure byte stream (header + payload may be
|
||||
// glued or split across reads). We bridge by:
|
||||
//
|
||||
// - Treating each Push as an opaque chunk appended to an internal byte
|
||||
// buffer that Read drains in arbitrary slices.
|
||||
// - Letting smux's sendLoop call Write once per frame; we encrypt and hand
|
||||
// the whole buffer to the link as a single message. Length boundaries
|
||||
// are preserved end-to-end by the transport (KCP length-prefix framing
|
||||
// in vp8channel, native message boundaries in datachannel).
|
||||
package muxconn
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/crypto"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/link"
|
||||
)
|
||||
|
||||
// ErrClosed is returned from Read/Write after the conn has been closed.
|
||||
var ErrClosed = errors.New("muxconn: closed")
|
||||
|
||||
// Conn is an io.ReadWriteCloser over a link.Link with optional AEAD wrapping.
|
||||
type Conn struct {
|
||||
ln link.Link
|
||||
cipher *crypto.Cipher
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
buf []byte
|
||||
closed bool
|
||||
}
|
||||
|
||||
// New wires a Conn over the given link. Push must be set as the link's OnData
|
||||
// callback before this conn is used.
|
||||
func New(ln link.Link, cipher *crypto.Cipher) *Conn {
|
||||
c := &Conn{ln: ln, cipher: cipher}
|
||||
c.cond = sync.NewCond(&c.mu)
|
||||
return c
|
||||
}
|
||||
|
||||
// Push hands an encrypted wire payload (one OnData event) to the conn.
|
||||
func (c *Conn) Push(ciphertext []byte) {
|
||||
pt, err := c.cipher.Decrypt(ciphertext)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closed {
|
||||
return
|
||||
}
|
||||
c.buf = append(c.buf, pt...)
|
||||
c.cond.Broadcast()
|
||||
}
|
||||
|
||||
// Read implements io.Reader. Blocks until at least one byte is available.
|
||||
func (c *Conn) Read(p []byte) (int, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for !c.closed && len(c.buf) == 0 {
|
||||
c.cond.Wait()
|
||||
}
|
||||
if len(c.buf) == 0 {
|
||||
return 0, io.EOF
|
||||
}
|
||||
n := copy(p, c.buf)
|
||||
c.buf = c.buf[n:]
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// Write encrypts p and ships it to the link as a single message. Blocks while
|
||||
// the link signals back-pressure.
|
||||
func (c *Conn) Write(p []byte) (int, error) {
|
||||
for {
|
||||
if c.isClosed() {
|
||||
return 0, ErrClosed
|
||||
}
|
||||
if c.ln.CanSend() {
|
||||
break
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
enc, err := c.cipher.Encrypt(p)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := c.ln.Send(enc); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close unblocks any pending Read with io.EOF.
|
||||
func (c *Conn) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
c.closed = true
|
||||
c.cond.Broadcast()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) isClosed() bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.closed
|
||||
}
|
||||
Reference in New Issue
Block a user