Example: Custom Transport

Build a custom ChatTransport adapter and wire it into the SDK. Two examples: a fetch-based REST transport and a WebSocket transport with reconnection.

The ChatTransport interface

Every transport implements this interface (from gecx-chat):

interface ChatTransport {
  readonly name?: string;
  readonly capabilities?: TransportCapabilities;
  connect(sessionId: string): Promise<void>;
  send(request: SendRequest): Promise<void>;
  stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent>;
  reconnect?(reason: string): Promise<void>;
  resumeStream?(cursor: TurnCursor, signal?: AbortSignal): AsyncIterable<TransportEvent>;
  close(): Promise<void>;
}

interface TransportCapabilities {
  class: 'request-response' | 'server-stream' | 'bidi';
  reconnect: boolean;
  resume: boolean;
  multiplex: boolean;
  protocolVersion: string;
}

The SDK reads capabilities.class at construction time and adapts its streaming loop:

  • request-response -- the SDK calls stream(), which yields a synthetic event sequence from a single JSON response.
  • server-stream -- stream() yields events from an SSE or NDJSON stream.
  • bidi -- stream() yields events from a long-lived duplex connection.

Example 1: Fetch-based REST transport

Posts user messages as JSON. Parses the JSON response into synthetic TransportEvents. Good for backends behind API Gateway with hard timeout limits.

// lib/fetchTransport.ts
import type {
  ChatTransport,
  SendRequest,
  TransportCapabilities,
  TransportEvent,
} from 'gecx-chat';

interface FetchTransportOptions {
  endpoint: string;
  headers?: Record<string, string>;
}

export function createFetchTransport(options: FetchTransportOptions): ChatTransport {
  const { endpoint, headers = {} } = options;

  const capabilities: TransportCapabilities = {
    class: 'request-response',
    reconnect: false,
    resume: false,
    multiplex: false,
    protocolVersion: '1',
  };

  return {
    name: 'custom-fetch',
    capabilities,

    async connect(): Promise<void> {
      // Nothing to open for a stateless HTTP transport.
    },

    async send(request: SendRequest): Promise<void> {
      await fetch(endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json', ...headers },
        body: JSON.stringify(request),
      });
    },

    async *stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent> {
      const res = await fetch(endpoint, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json', ...headers },
        body: JSON.stringify(request),
        signal,
      });

      if (!res.ok) throw new Error(`Transport returned ${res.status}`);

      const body = await res.json();
      const responseId = body.responseId ?? crypto.randomUUID();
      const requestId = crypto.randomUUID();
      const ts = new Date().toISOString();
      let seq = 0;

      yield {
        type: 'response.started',
        responseId,
        requestId,
        timestamp: ts,
        sequence: seq++,
      } as TransportEvent;

      if (body.text) {
        yield {
          type: 'text.delta',
          delta: body.text,
          responseId,
          requestId,
          timestamp: new Date().toISOString(),
          sequence: seq++,
        } as TransportEvent;
      }

      yield {
        type: 'response.completed',
        responseId,
        requestId,
        timestamp: new Date().toISOString(),
        sequence: seq++,
      } as TransportEvent;
    },

    async close(): Promise<void> {},
  };
}

Use it

import { createChatClient, tokenEndpointAuth } from 'gecx-chat';
import { createFetchTransport } from './lib/fetchTransport';

const client = createChatClient({
  auth: tokenEndpointAuth({ endpoint: '/api/chat/token' }),
  transport: createFetchTransport({ endpoint: '/api/chat/completions' }),
});

Example 2: WebSocket transport with reconnection

Opens a persistent WebSocket. Sends JSON frames, receives TransportEvents. Declares reconnect: true so the SDK's recovery loop can call reconnect() after a disconnect.

// lib/wsTransport.ts
import type {
  ChatTransport,
  SendRequest,
  TransportCapabilities,
  TransportEvent,
} from 'gecx-chat';

interface WsTransportOptions {
  url: string;
}

export function createWsTransport(options: WsTransportOptions): ChatTransport {
  let ws: WebSocket | null = null;
  let sessionId: string | null = null;

  const capabilities: TransportCapabilities = {
    class: 'bidi',
    reconnect: true,
    resume: false,
    multiplex: false,
    protocolVersion: '1',
  };

  function open(): Promise<void> {
    return new Promise((resolve, reject) => {
      ws = new WebSocket(options.url);
      ws.addEventListener('open', () => {
        // Send a hello frame so the server knows which session this is
        ws!.send(JSON.stringify({ kind: 'hello', sessionId }));
        resolve();
      });
      ws.addEventListener('error', () => reject(new Error('WebSocket connect failed')));
    });
  }

  return {
    name: 'custom-ws',
    capabilities,

    async connect(id: string): Promise<void> {
      sessionId = id;
      await open();
    },

    async send(request: SendRequest): Promise<void> {
      if (!ws || ws.readyState !== WebSocket.OPEN) {
        throw new Error('WebSocket not connected');
      }
      ws.send(JSON.stringify({ kind: 'send', request }));
    },

    async *stream(request: SendRequest, signal?: AbortSignal): AsyncIterable<TransportEvent> {
      if (!ws || ws.readyState !== WebSocket.OPEN) {
        throw new Error('WebSocket not connected');
      }

      ws.send(JSON.stringify({ kind: 'send', request }));

      // Queue incoming events until the turn completes or aborts
      const queue: TransportEvent[] = [];
      let done = false;
      let resolve: (() => void) | null = null;

      function onMessage(ev: MessageEvent) {
        const frame = JSON.parse(ev.data as string);
        if (frame.kind === 'event') {
          queue.push(frame.event as TransportEvent);
          resolve?.();
        }
      }

      ws.addEventListener('message', onMessage);
      signal?.addEventListener('abort', () => { done = true; resolve?.(); });

      try {
        while (!done) {
          if (queue.length > 0) {
            const event = queue.shift()!;
            yield event;
            if (event.type === 'response.completed' || event.type === 'session.ended') {
              break;
            }
          } else {
            await new Promise<void>((r) => { resolve = r; });
            resolve = null;
          }
        }
      } finally {
        ws?.removeEventListener('message', onMessage);
      }
    },

    async reconnect(reason: string): Promise<void> {
      // Close the old socket and reopen
      if (ws) {
        try { ws.close(); } catch { /* ignore */ }
      }
      await open();
    },

    async close(): Promise<void> {
      if (ws) {
        ws.send(JSON.stringify({ kind: 'bye' }));
        ws.close();
        ws = null;
      }
    },
  };
}

Use it

import { createChatClient, tokenEndpointAuth } from 'gecx-chat';
import { createWsTransport } from './lib/wsTransport';

const client = createChatClient({
  auth: tokenEndpointAuth({ endpoint: '/api/chat/token' }),
  transport: createWsTransport({ url: 'wss://chat.example.com/ws' }),
});

Testing a custom transport

Use the SDK's mock scenario infrastructure to verify your transport emits a valid event stream without hitting a real server.

// __tests__/fetchTransport.test.ts
import { describe, it, expect, vi } from 'vitest';
import { createFetchTransport } from '../lib/fetchTransport';
import type { TransportEvent } from 'gecx-chat';

describe('createFetchTransport', () => {
  it('yields a valid event stream', async () => {
    // Stub fetch to return a canned response
    globalThis.fetch = vi.fn().mockResolvedValue({
      ok: true,
      json: async () => ({
        responseId: 'r-1',
        text: 'Hello from the backend',
      }),
    });

    const transport = createFetchTransport({ endpoint: '/api/chat' });
    await transport.connect('session-1');

    const events: TransportEvent[] = [];
    for await (const event of transport.stream({
      sessionId: 'session-1',
      text: 'Hi',
    })) {
      events.push(event);
    }

    expect(events[0].type).toBe('response.started');
    expect(events[1]).toMatchObject({ type: 'text.delta', delta: 'Hello from the backend' });
    expect(events[2].type).toBe('response.completed');
  });
});

Key design decisions

  • capabilities.class tells the SDK whether to expect a single response (request-response), a chunked stream (server-stream), or a persistent connection (bidi). Set this honestly -- it controls timeout and recovery behavior.
  • reconnect should be true only if your transport can reopen its underlying connection. The SDK's recovery loop calls reconnect(reason) before retrying a failed stream().
  • resume requires implementing resumeStream(cursor) and de-duplicating by cursor.lastSequence. Leave it false unless your server can replay from a checkpoint.
  • name shows up in debug bundles and trace output. Set it to something descriptive.

Validate against the contract harness

Before shipping, validate your transport against runTransportContractTests from gecx-chat/testing/vitest:

import { runTransportContractTests } from 'gecx-chat/testing/vitest';
import { describe } from 'vitest';
import { createMyTransport } from './myTransport';

describe('myTransport', () => {
  runTransportContractTests({
    name: 'myTransport',
    factory: () => createMyTransport({ /* ... */ }),
  });
});

The harness validates: connect() accepts a session id and resolves; capability class advertised matches actual behaviour; protocolVersion is set; stream() returns an AsyncIterable<TransportEvent> that respects AbortSignal; close() resolves; and (for tier-2 transports) conditional reconnect/resume behaviour matches the contract. The bundled mockTransport is itself validated by this harness, so the bar is "behave exactly like the mock for the parts of the contract you implement." See Testing.

Source: docs/examples/custom-transport.md