feat: add peer-addressed routing across transport and engine layers

This commit is contained in:
zarazaex69
2026-05-18 02:38:45 +03:00
parent 95b73750c9
commit 143f6dd8a6
11 changed files with 591 additions and 122 deletions
+16 -2
View File
@@ -34,6 +34,7 @@ var ErrClosed = errors.New("muxconn: closed")
// Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping.
type Conn struct {
ln transport.Transport
send func([]byte) error
cipher *crypto.Cipher
mu sync.Mutex
@@ -45,7 +46,20 @@ type Conn struct {
// New wires a Conn over the given transport. Push must be set as the
// transport's OnData callback before this conn is used.
func New(ln transport.Transport, cipher *crypto.Cipher) *Conn {
c := &Conn{ln: ln, cipher: cipher}
c := &Conn{ln: ln, send: ln.Send, cipher: cipher}
c.cond = sync.NewCond(&c.mu)
return c
}
// NewPeer wires a Conn whose writes are addressed to a specific transport peer.
func NewPeer(ln transport.PeerTransport, cipher *crypto.Cipher, peerID string) *Conn {
c := &Conn{
ln: ln,
send: func(data []byte) error {
return ln.SendTo(peerID, data)
},
cipher: cipher,
}
c.cond = sync.NewCond(&c.mu)
return c
}
@@ -123,7 +137,7 @@ func (c *Conn) Write(p []byte) (int, error) {
if err != nil {
return 0, fmt.Errorf("encrypt: %w", err)
}
if err := c.ln.Send(enc); err != nil {
if err := c.send(enc); err != nil {
return 0, fmt.Errorf("send: %w", err)
}
return len(p), nil
+46 -7
View File
@@ -20,16 +20,17 @@ type stubLink struct {
canSend bool
sendErr error
sent [][]byte
peerSent map[string][][]byte
canSendFn func() bool
}
func (s *stubLink) Connect(context.Context) error { return nil }
func (s *stubLink) Close() error { return nil }
func (s *stubLink) SetReconnectCallback(func()) {}
func (s *stubLink) SetShouldReconnect(func() bool) {}
func (s *stubLink) SetEndedCallback(func(string)) {}
func (s *stubLink) WatchConnection(context.Context) {}
func (s *stubLink) Features() transport.Features { return transport.Features{} }
func (s *stubLink) Connect(context.Context) error { return nil }
func (s *stubLink) Close() error { return nil }
func (s *stubLink) SetReconnectCallback(func()) {}
func (s *stubLink) SetShouldReconnect(func() bool) {}
func (s *stubLink) SetEndedCallback(func(string)) {}
func (s *stubLink) WatchConnection(context.Context) {}
func (s *stubLink) Features() transport.Features { return transport.Features{} }
func (s *stubLink) Send(data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
@@ -44,6 +45,16 @@ func (s *stubLink) CanSend() bool {
defer s.mu.Unlock()
return s.canSend
}
func (s *stubLink) SendTo(peerID string, data []byte) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.peerSent == nil {
s.peerSent = make(map[string][][]byte)
}
s.peerSent[peerID] = append(s.peerSent[peerID], append([]byte(nil), data...))
return s.sendErr
}
func (s *stubLink) SupportsPeerRouting() bool { return true }
func newTestCipher(t *testing.T) *cryptopkg.Cipher {
t.Helper()
@@ -121,6 +132,34 @@ func TestWriteEncryptsAndSends(t *testing.T) {
}
}
func TestPeerWriteEncryptsAndSendsToPeer(t *testing.T) {
cipher := newTestCipher(t)
ln := &stubLink{canSend: true}
conn := NewPeer(ln, cipher, "peer-a")
n, err := conn.Write([]byte("payload"))
if err != nil {
t.Fatalf("Write() error = %v", err)
}
if n != len("payload") {
t.Fatalf("Write() n = %d, want %d", n, len("payload"))
}
if len(ln.sent) != 0 {
t.Fatalf("broadcast sent packets = %d, want 0", len(ln.sent))
}
if len(ln.peerSent["peer-a"]) != 1 {
t.Fatalf("peer sent packets = %d, want 1", len(ln.peerSent["peer-a"]))
}
got, err := cipher.Decrypt(ln.peerSent["peer-a"][0])
if err != nil {
t.Fatalf("Decrypt(peer sent) error = %v", err)
}
if !bytes.Equal(got, []byte("payload")) {
t.Fatalf("decrypted payload = %q, want %q", got, "payload")
}
}
func TestWriteWaitsForCanSend(t *testing.T) {
cipher := newTestCipher(t)
start := time.Now()