mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 06:23:37 +02:00
Merge branch 'refactor/make-as-lib' into refactor/universal-carrier
This commit is contained in:
+1
-1
@@ -246,6 +246,6 @@ go.work.sum
|
||||
build/
|
||||
GEMINI.md
|
||||
code/package-lock.json
|
||||
olcrtc
|
||||
!cmd/olcrtc/
|
||||
!cmd/olcrtc/main_test.go
|
||||
!pkg/
|
||||
|
||||
@@ -0,0 +1,51 @@
|
||||
package olcrtc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
// conn wraps a Session as a net.Conn.
|
||||
// Read is backed by an io.Pipe fed by the engine's OnData callback.
|
||||
// Write calls Session.Send.
|
||||
// Deadlines are not supported — callers should use context cancellation.
|
||||
type conn struct {
|
||||
s *Session
|
||||
}
|
||||
|
||||
func (c *conn) Read(b []byte) (int, error) {
|
||||
n, err := c.s.pr.Read(b)
|
||||
if err != nil {
|
||||
return n, fmt.Errorf("read: %w", err)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (c *conn) Write(b []byte) (int, error) {
|
||||
if err := c.s.inner.Send(b); err != nil {
|
||||
return 0, fmt.Errorf("write: %w", err)
|
||||
}
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
func (c *conn) Close() error {
|
||||
_ = c.s.pw.CloseWithError(net.ErrClosed)
|
||||
if err := c.s.inner.Close(); err != nil {
|
||||
return fmt.Errorf("close: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *conn) LocalAddr() net.Addr { return webrtcAddr("local") }
|
||||
func (c *conn) RemoteAddr() net.Addr { return webrtcAddr("remote") }
|
||||
|
||||
func (c *conn) SetDeadline(_ time.Time) error { return errors.ErrUnsupported }
|
||||
func (c *conn) SetReadDeadline(_ time.Time) error { return errors.ErrUnsupported }
|
||||
func (c *conn) SetWriteDeadline(_ time.Time) error { return errors.ErrUnsupported }
|
||||
|
||||
type webrtcAddr string
|
||||
|
||||
func (a webrtcAddr) Network() string { return "webrtc" }
|
||||
func (a webrtcAddr) String() string { return string(a) }
|
||||
@@ -0,0 +1,260 @@
|
||||
// Package olcrtc exposes olcrtc as an embeddable Go library.
|
||||
//
|
||||
// Typical usage — obtain a [net.Conn]-compatible handle and dial:
|
||||
//
|
||||
// sess, err := olcrtc.New(ctx, olcrtc.Config{
|
||||
// Engine: "livekit",
|
||||
// URL: "wss://sfu.example/",
|
||||
// Token: "<livekit-jwt>",
|
||||
// })
|
||||
// if err != nil { ... }
|
||||
// conn, err := sess.Dial(ctx) // blocks until WebRTC data channel is ready
|
||||
// // conn implements net.Conn — pass it to sing-box / any io.ReadWriter consumer
|
||||
//
|
||||
// Built-in auth providers (telemost, jazz, wbstream):
|
||||
//
|
||||
// sess, err := olcrtc.New(ctx, olcrtc.Config{
|
||||
// Auth: "telemost",
|
||||
// RoomID: "<room-hash>",
|
||||
// })
|
||||
//
|
||||
// Import the implementations you need via blank imports, or call [RegisterDefaults]:
|
||||
//
|
||||
// import (
|
||||
// _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit"
|
||||
// _ "github.com/openlibrecommunity/olcrtc/internal/auth/telemost"
|
||||
// )
|
||||
package olcrtc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/auth"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/carrier/builtin"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/engine"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrURLRequired is returned when direct mode is used without a URL.
|
||||
ErrURLRequired = errors.New("olcrtc: URL required when using direct engine mode")
|
||||
// ErrTokenRequired is returned when direct mode is used without a token.
|
||||
ErrTokenRequired = errors.New("olcrtc: Token required when using direct engine mode")
|
||||
// ErrRoomCreationUnsupported is returned when the auth provider cannot create rooms.
|
||||
ErrRoomCreationUnsupported = errors.New("olcrtc: auth provider does not support room creation")
|
||||
// ErrSessionEnded is returned from Read/Write when the session has ended permanently.
|
||||
ErrSessionEnded = errors.New("olcrtc: session ended")
|
||||
)
|
||||
|
||||
// Config is the input to [New].
|
||||
type Config struct {
|
||||
// --- built-in auth mode ---
|
||||
// Auth is the name of a registered auth provider ("telemost", "jazz", "wbstream").
|
||||
// When set, RoomID is forwarded to the provider as the room reference.
|
||||
Auth string
|
||||
RoomID string
|
||||
|
||||
// --- direct engine mode (Auth == "") ---
|
||||
// Engine selects the SFU protocol ("livekit", "goolom", "salutejazz").
|
||||
// Defaults to "livekit" when Auth is empty.
|
||||
Engine string
|
||||
URL string
|
||||
Token string
|
||||
|
||||
// --- common ---
|
||||
// Name is the display name used when joining the room.
|
||||
Name string
|
||||
// DNSServer is an optional custom DNS resolver (e.g. "1.1.1.1:53").
|
||||
DNSServer string
|
||||
// ProxyAddr / ProxyPort configure an outbound SOCKS5 proxy.
|
||||
ProxyAddr string
|
||||
ProxyPort int
|
||||
}
|
||||
|
||||
// Session is the library handle returned by [New].
|
||||
// Call [Session.Dial] to connect and obtain a [net.Conn].
|
||||
type Session struct {
|
||||
inner engine.Session
|
||||
pr *io.PipeReader
|
||||
pw *io.PipeWriter
|
||||
authProvider auth.Provider
|
||||
authCfg auth.Config
|
||||
}
|
||||
|
||||
// RegisterDefaults registers all built-in engines and auth providers.
|
||||
// Call once at program start if you want the full set without manual blank
|
||||
// imports. Safe to call multiple times.
|
||||
func RegisterDefaults() {
|
||||
builtin.Register()
|
||||
}
|
||||
|
||||
// New creates a Session from cfg. The session is not connected yet; call
|
||||
// [Session.Connect] when ready.
|
||||
func New(ctx context.Context, cfg Config) (*Session, error) {
|
||||
if cfg.Auth != "" {
|
||||
return newWithAuth(ctx, cfg)
|
||||
}
|
||||
return newDirect(ctx, cfg)
|
||||
}
|
||||
|
||||
func newWithAuth(ctx context.Context, cfg Config) (*Session, error) {
|
||||
p, err := auth.Get(cfg.Auth)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("olcrtc: auth provider %q not registered: %w", cfg.Auth, err)
|
||||
}
|
||||
|
||||
authCfg := auth.Config{
|
||||
RoomURL: cfg.RoomID,
|
||||
Name: cfg.Name,
|
||||
DNSServer: cfg.DNSServer,
|
||||
ProxyAddr: cfg.ProxyAddr,
|
||||
ProxyPort: cfg.ProxyPort,
|
||||
}
|
||||
|
||||
creds, err := p.Issue(ctx, authCfg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("olcrtc: auth issue: %w", err)
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
engineName := p.Engine()
|
||||
sess, err := engine.New(ctx, engineName, engine.Config{
|
||||
URL: creds.URL,
|
||||
Token: creds.Token,
|
||||
Name: cfg.Name,
|
||||
Extra: creds.Extra,
|
||||
OnData: func(data []byte) { _, _ = pw.Write(data) },
|
||||
DNSServer: cfg.DNSServer,
|
||||
ProxyAddr: cfg.ProxyAddr,
|
||||
ProxyPort: cfg.ProxyPort,
|
||||
Refresh: func(rCtx context.Context) (engine.Credentials, error) {
|
||||
fresh, freshErr := p.Issue(rCtx, authCfg)
|
||||
if freshErr != nil {
|
||||
return engine.Credentials{}, fmt.Errorf("olcrtc: auth refresh: %w", freshErr)
|
||||
}
|
||||
return engine.Credentials{URL: fresh.URL, Token: fresh.Token, Extra: fresh.Extra}, nil
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err)
|
||||
}
|
||||
|
||||
return &Session{inner: sess, pr: pr, pw: pw, authProvider: p, authCfg: authCfg}, nil
|
||||
}
|
||||
|
||||
func newDirect(ctx context.Context, cfg Config) (*Session, error) {
|
||||
if cfg.URL == "" {
|
||||
return nil, ErrURLRequired
|
||||
}
|
||||
if cfg.Token == "" {
|
||||
return nil, ErrTokenRequired
|
||||
}
|
||||
|
||||
engineName := cfg.Engine
|
||||
if engineName == "" {
|
||||
engineName = "livekit"
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
sess, err := engine.New(ctx, engineName, engine.Config{
|
||||
URL: cfg.URL,
|
||||
Token: cfg.Token,
|
||||
Name: cfg.Name,
|
||||
OnData: func(data []byte) { _, _ = pw.Write(data) },
|
||||
DNSServer: cfg.DNSServer,
|
||||
ProxyAddr: cfg.ProxyAddr,
|
||||
ProxyPort: cfg.ProxyPort,
|
||||
})
|
||||
if err != nil {
|
||||
_ = pw.CloseWithError(err)
|
||||
return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err)
|
||||
}
|
||||
|
||||
return &Session{inner: sess, pr: pr, pw: pw}, nil
|
||||
}
|
||||
|
||||
// Dial connects and returns a [net.Conn] backed by the WebRTC data channel.
|
||||
// It combines [Session.Connect] + wrapping in a single call.
|
||||
// The connection watcher runs in the background for the lifetime of ctx;
|
||||
// when the session ends permanently, Read will return an error.
|
||||
func (s *Session) Dial(ctx context.Context) (net.Conn, error) {
|
||||
s.inner.SetEndedCallback(func(_ string) {
|
||||
_ = s.pw.CloseWithError(ErrSessionEnded)
|
||||
})
|
||||
if err := s.Connect(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go s.inner.WatchConnection(ctx)
|
||||
return &conn{s: s}, nil
|
||||
}
|
||||
|
||||
// Connect establishes the WebRTC connection. Blocks until the data channel (or
|
||||
// media) is ready, or ctx is cancelled.
|
||||
func (s *Session) Connect(ctx context.Context) error {
|
||||
if err := s.inner.Connect(ctx); err != nil {
|
||||
return fmt.Errorf("connect: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Send queues data for transmission over the data channel.
|
||||
func (s *Session) Send(data []byte) error {
|
||||
if err := s.inner.Send(data); err != nil {
|
||||
return fmt.Errorf("send: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close tears down the session and releases all resources.
|
||||
func (s *Session) Close() error {
|
||||
if err := s.inner.Close(); err != nil {
|
||||
return fmt.Errorf("close: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchConnection monitors the connection and handles reconnects. Run in a
|
||||
// goroutine alongside Connect.
|
||||
func (s *Session) WatchConnection(ctx context.Context) {
|
||||
s.inner.WatchConnection(ctx)
|
||||
}
|
||||
|
||||
// CanSend reports whether the session is ready to accept outgoing data.
|
||||
func (s *Session) CanSend() bool {
|
||||
return s.inner.CanSend()
|
||||
}
|
||||
|
||||
// SetEndedCallback registers a function called when the session ends
|
||||
// permanently (after reconnect exhaustion or explicit close).
|
||||
func (s *Session) SetEndedCallback(cb func(reason string)) {
|
||||
s.inner.SetEndedCallback(cb)
|
||||
}
|
||||
|
||||
// SetShouldReconnect controls whether automatic reconnection is attempted.
|
||||
func (s *Session) SetShouldReconnect(fn func() bool) {
|
||||
s.inner.SetShouldReconnect(fn)
|
||||
}
|
||||
|
||||
// CreateRoom creates a new room via the auth provider and returns the room ID.
|
||||
// Only works when the session was created with Auth set to a provider that
|
||||
// supports room creation (wbstream, jazz). Returns [ErrRoomCreationUnsupported]
|
||||
// for providers that don't support it (e.g. telemost).
|
||||
func CreateRoom(ctx context.Context, authName string) (string, error) {
|
||||
p, err := auth.Get(authName)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("olcrtc: auth provider %q not registered: %w", authName, err)
|
||||
}
|
||||
creator, ok := p.(auth.RoomCreator)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("%w: %s", ErrRoomCreationUnsupported, authName)
|
||||
}
|
||||
roomID, err := creator.CreateRoom(ctx, auth.Config{})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("olcrtc: create room: %w", err)
|
||||
}
|
||||
return roomID, nil
|
||||
}
|
||||
@@ -0,0 +1,288 @@
|
||||
package olcrtc_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/auth"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/engine"
|
||||
"github.com/openlibrecommunity/olcrtc/pkg/olcrtc"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
stubToken = "tok"
|
||||
stubURL = "wss://x/"
|
||||
)
|
||||
|
||||
// --- stub engine ---
|
||||
|
||||
type stubSession struct {
|
||||
connected bool
|
||||
onEnded func(string)
|
||||
watchBlock chan struct{} // closed to unblock WatchConnection
|
||||
}
|
||||
|
||||
func newStubSession() *stubSession { return &stubSession{watchBlock: make(chan struct{})} }
|
||||
|
||||
func (s *stubSession) Connect(_ context.Context) error { s.connected = true; return nil }
|
||||
func (s *stubSession) Send(_ []byte) error { return nil }
|
||||
func (s *stubSession) Close() error { return nil }
|
||||
func (s *stubSession) SetReconnectCallback(_ func(*webrtc.DataChannel)) {}
|
||||
func (s *stubSession) SetShouldReconnect(_ func() bool) {}
|
||||
func (s *stubSession) SetEndedCallback(cb func(string)) { s.onEnded = cb }
|
||||
func (s *stubSession) WatchConnection(_ context.Context) { <-s.watchBlock }
|
||||
func (s *stubSession) CanSend() bool { return s.connected }
|
||||
func (s *stubSession) GetSendQueue() chan []byte { return nil }
|
||||
func (s *stubSession) GetBufferedAmount() uint64 { return 0 }
|
||||
func (s *stubSession) Capabilities() engine.Capabilities { return engine.Capabilities{ByteStream: true} }
|
||||
|
||||
// Compile-time check: stubSession must satisfy engine.Session.
|
||||
var _ engine.Session = (*stubSession)(nil)
|
||||
|
||||
func registerStubEngine(t *testing.T, name string) {
|
||||
t.Helper()
|
||||
engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) {
|
||||
return newStubSession(), nil
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) {
|
||||
return newStubSession(), nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// registerStubEngineControlled registers an engine that returns a pre-built stub the test controls.
|
||||
func registerStubEngineControlled(t *testing.T, name string, stub *stubSession) {
|
||||
t.Helper()
|
||||
engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) {
|
||||
return stub, nil
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) {
|
||||
return newStubSession(), nil
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// --- stub auth ---
|
||||
|
||||
type stubAuth struct{ engineName string }
|
||||
|
||||
func (a stubAuth) Engine() string { return a.engineName }
|
||||
func (a stubAuth) Issue(_ context.Context, cfg auth.Config) (auth.Credentials, error) {
|
||||
if cfg.RoomURL == "" {
|
||||
return auth.Credentials{}, auth.ErrRoomIDRequired
|
||||
}
|
||||
return auth.Credentials{URL: "wss://stub/", Token: stubToken}, nil
|
||||
}
|
||||
|
||||
type stubAuthWithRoomCreator struct{ stubAuth }
|
||||
|
||||
func (stubAuthWithRoomCreator) CreateRoom(_ context.Context, _ auth.Config) (string, error) {
|
||||
return "created-room-id", nil
|
||||
}
|
||||
|
||||
func registerStubAuth(t *testing.T, name, engineName string) {
|
||||
t.Helper()
|
||||
auth.Register(name, stubAuth{engineName: engineName})
|
||||
}
|
||||
|
||||
func registerStubAuthWithCreator(t *testing.T, name, engineName string) {
|
||||
t.Helper()
|
||||
auth.Register(name, stubAuthWithRoomCreator{stubAuth{engineName: engineName}})
|
||||
}
|
||||
|
||||
// --- tests ---
|
||||
|
||||
func TestNewDirect_MissingURL(t *testing.T) {
|
||||
_, err := olcrtc.New(context.Background(), olcrtc.Config{Token: "tok"})
|
||||
if !errors.Is(err, olcrtc.ErrURLRequired) {
|
||||
t.Fatalf("New(no url) = %v, want ErrURLRequired", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewDirect_MissingToken(t *testing.T) {
|
||||
_, err := olcrtc.New(context.Background(), olcrtc.Config{URL: stubURL})
|
||||
if !errors.Is(err, olcrtc.ErrTokenRequired) {
|
||||
t.Fatalf("New(no token) = %v, want ErrTokenRequired", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewDirect_UnknownEngine(t *testing.T) {
|
||||
_, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Engine: "no-such-engine",
|
||||
URL: stubURL,
|
||||
Token: stubToken,
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("New(bad engine) error = nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewDirect_OK(t *testing.T) {
|
||||
registerStubEngine(t, "stub-direct")
|
||||
|
||||
sess, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Engine: "stub-direct",
|
||||
URL: stubURL,
|
||||
Token: stubToken,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
if err := sess.Connect(context.Background()); err != nil {
|
||||
t.Fatalf("Connect() error = %v", err)
|
||||
}
|
||||
if !sess.CanSend() {
|
||||
t.Fatal("CanSend() = false after connect")
|
||||
}
|
||||
if err := sess.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAuth_UnknownProvider(t *testing.T) {
|
||||
_, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Auth: "no-such-auth",
|
||||
RoomID: "room",
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("New(bad auth) error = nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAuth_MissingRoomID(t *testing.T) {
|
||||
registerStubEngine(t, "stub-auth-engine")
|
||||
registerStubAuth(t, "stub-auth-noroomid", "stub-auth-engine")
|
||||
|
||||
_, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Auth: "stub-auth-noroomid",
|
||||
// RoomID intentionally empty
|
||||
})
|
||||
if err == nil {
|
||||
t.Fatal("New(auth, no room) error = nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewAuth_OK(t *testing.T) {
|
||||
registerStubEngine(t, "stub-auth-ok-engine")
|
||||
registerStubAuth(t, "stub-auth-ok", "stub-auth-ok-engine")
|
||||
|
||||
sess, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Auth: "stub-auth-ok",
|
||||
RoomID: "some-room",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New(auth) error = %v", err)
|
||||
}
|
||||
if err := sess.Connect(context.Background()); err != nil {
|
||||
t.Fatalf("Connect() error = %v", err)
|
||||
}
|
||||
_ = sess.Close()
|
||||
}
|
||||
|
||||
func TestRegisterDefaults_Idempotent(_ *testing.T) {
|
||||
olcrtc.RegisterDefaults()
|
||||
olcrtc.RegisterDefaults()
|
||||
}
|
||||
|
||||
func TestCreateRoom_Unsupported(t *testing.T) {
|
||||
registerStubAuth(t, "stub-nocreate", "stub-direct")
|
||||
|
||||
_, err := olcrtc.CreateRoom(context.Background(), "stub-nocreate")
|
||||
if !errors.Is(err, olcrtc.ErrRoomCreationUnsupported) {
|
||||
t.Fatalf("CreateRoom(no creator) = %v, want ErrRoomCreationUnsupported", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateRoom_OK(t *testing.T) {
|
||||
registerStubEngine(t, "stub-creator-engine")
|
||||
registerStubAuthWithCreator(t, "stub-creator", "stub-creator-engine")
|
||||
|
||||
roomID, err := olcrtc.CreateRoom(context.Background(), "stub-creator")
|
||||
if err != nil {
|
||||
t.Fatalf("CreateRoom() error = %v", err)
|
||||
}
|
||||
if roomID == "" {
|
||||
t.Fatal("CreateRoom() returned empty room ID")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDial_ReadUnblocksOnSessionEnd(t *testing.T) {
|
||||
stub := newStubSession()
|
||||
registerStubEngineControlled(t, "stub-ended", stub)
|
||||
|
||||
sess, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Engine: "stub-ended",
|
||||
URL: stubURL,
|
||||
Token: stubToken,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
|
||||
c, err := sess.Dial(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("Dial() error = %v", err)
|
||||
}
|
||||
|
||||
readErr := make(chan error, 1)
|
||||
go func() {
|
||||
buf := make([]byte, 4)
|
||||
_, err := c.Read(buf)
|
||||
readErr <- err
|
||||
}()
|
||||
|
||||
// Simulate session ending permanently.
|
||||
stub.onEnded("test reason")
|
||||
close(stub.watchBlock)
|
||||
|
||||
select {
|
||||
case err := <-readErr:
|
||||
if err == nil {
|
||||
t.Fatal("Read() should return error after session ended")
|
||||
}
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Read() did not unblock after session ended")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDial_RoundTrip(t *testing.T) {
|
||||
registerStubEngine(t, "stub-dial")
|
||||
|
||||
sess, err := olcrtc.New(context.Background(), olcrtc.Config{
|
||||
Engine: "stub-dial",
|
||||
URL: stubURL,
|
||||
Token: stubToken,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
|
||||
c, err := sess.Dial(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("Dial() error = %v", err)
|
||||
}
|
||||
|
||||
// Write should succeed (stub Send is a no-op).
|
||||
payload := []byte("hello")
|
||||
n, err := c.Write(payload)
|
||||
if err != nil || n != len(payload) {
|
||||
t.Fatalf("Write() = (%d, %v)", n, err)
|
||||
}
|
||||
|
||||
// Close should unblock any pending Read.
|
||||
if err := c.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
|
||||
// Read after close should return an error (pipe closed).
|
||||
buf := make([]byte, 4)
|
||||
_, err = c.Read(buf)
|
||||
if err == nil {
|
||||
t.Fatal("Read() after Close() should return error")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user