Serialize concurrent token refreshes

This commit is contained in:
lemon07r
2026-04-18 23:31:18 -04:00
parent 0d22121a01
commit 038e9d67ca
5 changed files with 105 additions and 13 deletions
+2 -1
View File
@@ -43,7 +43,7 @@ Data flow on a chat request:
1. opencode asks the `@ai-sdk/openai-compatible` provider for a language model.
2. Before instantiating it, opencode calls our `auth.loader`. We return `{ apiKey, fetch }`.
3. The SDK uses our `fetch` for every HTTP call (models, chat, whatever).
4. Our `fetch` calls `ensureFresh()` → maybe refreshes → lazily discovers `/coding/v1/models` when needed → sets Authorization + the seven `X-Msh-*` headers → on 401 refreshes once and retries.
4. Our `fetch` calls `ensureFresh()` → maybe refreshes (sharing one in-flight refresh across concurrent callers so they don't race the same refresh token) → lazily discovers `/coding/v1/models` when needed → sets Authorization + the seven `X-Msh-*` headers → on 401 refreshes once and retries.
5. Separately, opencode runs `chat.headers` and `chat.params`. `chat.headers` computes `thinking`, `reasoning_effort`, and `prompt_cache_key` from `input.model.options` plus the selected `input.message.model.variant`, then passes them to `loader.fetch` via private `x-opencode-kimi-*` headers. `loader.fetch` strips those headers and injects the wire fields into the JSON body. `chat.params` mirrors the same keys into `output.options` only as a forward-compat fallback if opencode later fixes its openai-compatible providerOptions namespace mismatch.
### Contracts to keep intact
@@ -71,6 +71,7 @@ These are the invariants that, if broken, silently degrade K2.6 → K2.5 or prod
8. **Provider id must not collide with any id in the [models.dev](https://models.dev) catalog.** models.dev publishes `kimi-for-coding` (static `KIMI_API_KEY``@ai-sdk/anthropic` → K2.5). If we registered under that same id, `opencode auth login kimi-for-coding` would surface two methods under one entry and users picking the API-key one would silently land on K2.5. We deliberately use `kimi-for-coding-oauth` instead; `MODEL_ID` on the wire stays `kimi-for-coding` (rule 6).
9. **`src/index.ts` must have exactly one export — the default plugin function.** opencode's plugin loader (`research/opencode/packages/opencode/src/plugin/index.ts``getLegacyPlugins`) iterates every export of the plugin module and throws `Plugin export is not a function` if any named export is not callable. The failure mode is silent in the CLI (the provider just doesn't appear in `opencode auth login`); the error only surfaces in `~/.local/share/opencode/log/*.log`. Keep constants in `src/constants.ts` and import them in `src/index.ts` rather than re-exporting. `test/exports.test.ts` guards this.
10. **The post-login config hint must not emit a partial `limit` object.** opencode's live config schema at `https://opencode.ai/config.json` requires both `limit.context` and `limit.output` whenever `limit` is present, while Kimi's `GET /coding/v1/models` only gives us `context_length`. Therefore `buildConfigBlock()` omits `limit` entirely and leaves `provider.models` to backfill `limit.context` at runtime. Do not invent `output` or set `input` heuristically; opencode's overflow logic treats `limit.input` as authoritative (`research/opencode/packages/opencode/src/session/overflow.ts`).
11. **Concurrent refreshes must collapse to one in-flight OAuth exchange.** `provider.models` and `auth.loader` can both notice an expiring token at about the same time. `refreshAuth()` in `src/index.ts` therefore shares one promise across overlapping callers so they do not race the same refresh token or double-write auth state. `test/plugin.test.ts` covers both loader-vs-loader and provider.models-vs-loader overlap.
### Working on this repo
+1 -1
View File
@@ -1,6 +1,6 @@
{
"name": "opencode-kimi-full",
"version": "1.2.3",
"version": "1.2.4",
"description": "OpenCode plugin that adds first-class support for Kimi K2.6 (kimi-for-coding) via the official Kimi OAuth device flow, matching the upstream kimi-cli 1:1.",
"license": "MIT",
"repository": {
+21 -9
View File
@@ -243,6 +243,7 @@ const plugin: Plugin = async ({ client }) => {
// --- helpers ---------------------------------------------------------------
let cachedDiscovery: ModelDiscovery = {}
let refreshPromise: Promise<OAuthAuth> | undefined
const persistAuth = async (auth: OAuthAuth) => {
await client.auth.set({ path: { id: PROVIDER_ID }, body: auth })
@@ -256,15 +257,26 @@ const plugin: Plugin = async ({ client }) => {
}
const refreshAuth = async (auth: OAuthAuth) => {
const tokens = await refreshToken(auth.refresh)
const next: OAuthAuth = {
type: "oauth",
refresh: tokens.refresh_token,
access: tokens.access_token,
expires: Date.now() + tokens.expires_in * 1000,
}
await persistAuth(next)
return next
// opencode can ask both `provider.models` and `loader.fetch` to refresh
// around the same time. Serialize those calls so one refresh token does
// not fan out into multiple concurrent refresh exchanges.
if (refreshPromise) return refreshPromise
refreshPromise = (async () => {
try {
const tokens = await refreshToken(auth.refresh)
const next: OAuthAuth = {
type: "oauth",
refresh: tokens.refresh_token,
access: tokens.access_token,
expires: Date.now() + tokens.expires_in * 1000,
}
await persistAuth(next)
return next
} finally {
refreshPromise = undefined
}
})()
return refreshPromise
}
// --- return hooks ----------------------------------------------------------
+4 -2
View File
@@ -9,10 +9,12 @@ export type FetchCall = {
body: string | undefined
}
type MaybePromise<T> = T | Promise<T>
export type Responder = (
call: FetchCall,
callIndex: number,
) => { status?: number; body?: unknown; bodyText?: string }
) => MaybePromise<{ status?: number; body?: unknown; bodyText?: string }>
export function installFetchMock(responder: Responder) {
const calls: FetchCall[] = []
@@ -41,7 +43,7 @@ export function installFetchMock(responder: Responder) {
: String(init.body)
const call: FetchCall = { url, method: (init?.method ?? request?.method ?? "GET").toUpperCase(), headers, body }
calls.push(call)
const r = responder(call, calls.length - 1)
const r = await responder(call, calls.length - 1)
const status = r.status ?? 200
const text = r.bodyText ?? (r.body === undefined ? "" : JSON.stringify(r.body))
return new Response(text, {
+77
View File
@@ -239,6 +239,16 @@ async function getLoaderFetch(readAuth: () => Promise<unknown>) {
return { fetch: (res as { fetch: typeof fetch }).fetch, apiKey: (res as { apiKey: string }).apiKey, writes }
}
function deferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void
let reject!: (reason?: unknown) => void
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return { promise, resolve, reject }
}
function makeProviderState(context = 0) {
return {
id: PROVIDER_ID,
@@ -484,6 +494,73 @@ test("auth.loader: refreshes when expiry is within safety window", async () => {
expect(reads).toBeGreaterThan(0)
})
test("auth.loader: concurrent expiring requests share one refresh exchange", async () => {
const gate = deferred<void>()
mock = installFetchMock(async (call) => {
if (call.url.includes("/oauth/token")) {
await gate.promise
return { body: { access_token: "access-2", refresh_token: "refresh-2", token_type: "Bearer", expires_in: 900 } }
}
if (call.url.endsWith("/coding/v1/models")) {
return { body: { data: [{ id: MODEL_ID, context_length: 262144 }] } }
}
return { body: { ok: true } }
})
const expiring = validAuth({ access: "stale", expires: Date.now() + REFRESH_SAFETY_WINDOW_MS / 2 })
const { fetch: f, writes } = await getLoaderFetch(async () => expiring)
const request = {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ model: MODEL_ID, messages: [] }),
}
const p1 = f("https://api.kimi.com/coding/v1/chat/completions", request)
const p2 = f("https://api.kimi.com/coding/v1/chat/completions", request)
await new Promise((r) => setTimeout(r, 0))
gate.resolve()
await Promise.all([p1, p2])
expect(mock.calls.filter((c) => c.url.includes("/oauth/token"))).toHaveLength(1)
expect(mock.calls.filter((c) => c.url.endsWith("/coding/v1/chat/completions"))).toHaveLength(2)
expect(mock.calls.filter((c) => c.url.endsWith("/coding/v1/chat/completions")).map((c) => c.headers["authorization"])).toEqual([
"Bearer access-2",
"Bearer access-2",
])
expect(writes).toHaveLength(1)
})
test("provider.models and auth.loader share one in-flight refresh exchange", async () => {
const gate = deferred<void>()
mock = installFetchMock(async (call) => {
if (call.url.includes("/oauth/token")) {
await gate.promise
return { body: { access_token: "fresh", refresh_token: "refresh-2", token_type: "Bearer", expires_in: 900 } }
}
if (call.url.endsWith("/coding/v1/models")) {
return { body: { data: [{ id: MODEL_ID, context_length: 131072 }] } }
}
return { body: { ok: true } }
})
const { hooks, writes } = await getHooks()
const expiring = validAuth({ access: "stale", expires: Date.now() + REFRESH_SAFETY_WINDOW_MS / 2 })
const provider = makeProviderState()
const loader = (await hooks.auth!.loader!(async () => expiring, {} as any)) as { fetch: typeof fetch }
const modelsPromise = hooks.provider!.models!(provider as any, { auth: expiring } as any)
const fetchPromise = loader.fetch("https://api.kimi.com/coding/v1/chat/completions", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify({ model: MODEL_ID, messages: [] }),
})
await new Promise((r) => setTimeout(r, 0))
gate.resolve()
const [models] = await Promise.all([modelsPromise, fetchPromise])
expect(mock.calls.filter((c) => c.url.includes("/oauth/token"))).toHaveLength(1)
expect((models as Record<string, { limit?: { context?: number } }>)[MODEL_ID]!.limit?.context).toBe(131072)
expect(writes).toHaveLength(1)
})
test("auth.loader: prefers the canonical MODEL_ID slug when /models returns multiple", async () => {
// Server returns several entries; the canonical `kimi-for-coding` is not first.
// Selection must still prefer it over the first element.