From ac8e686f33aa5bdf86f722310ebba148bdc85892 Mon Sep 17 00:00:00 2001 From: Frank Date: Sat, 30 May 2026 10:53:23 -0400 Subject: [PATCH] zen: batch balance calculation --- .../app/src/routes/zen/util/handler.ts | 34 ++++++++++++++----- .../app/src/routes/zen/util/usageBatcher.ts | 31 +++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) create mode 100644 packages/console/app/src/routes/zen/util/usageBatcher.ts diff --git a/packages/console/app/src/routes/zen/util/handler.ts b/packages/console/app/src/routes/zen/util/handler.ts index ed76c16c1a..4438688c22 100644 --- a/packages/console/app/src/routes/zen/util/handler.ts +++ b/packages/console/app/src/routes/zen/util/handler.ts @@ -47,6 +47,7 @@ import { i18n, type Key } from "~/i18n" import { localeFromRequest } from "~/lib/language" import { createModelTpmLimiter } from "./modelTpmLimiter" import { createModelTpsLimiter } from "./modelTpsLimiter" +import { accumulateUsage, HOT_WORKSPACES } from "./usageBatcher" type ZenData = Awaited> type RetryOptions = { @@ -981,6 +982,19 @@ export async function handler( authInfo = authInfo! const cost = centsToMicroCents(totalCostInCent) + + // For hot workspaces, batch balance/usage updates through Redis to avoid + // row-level lock contention on BillingTable/UserTable. Returns the amount + // to flush this request, or null to skip the DB writes entirely. + const balanceFlush = await (async () => { + if (billingSource !== "subscription" && billingSource !== "lite" && HOT_WORKSPACES.has(authInfo.workspaceID)) { + const workspaceCost = billingSource === "free" || billingSource === "byok" ? 0 : cost + const flush = await accumulateUsage(authInfo.workspaceID, authInfo.user.id, workspaceCost, cost) + return { batched: true as const, flush } + } + return { batched: false as const, flush: null } + })() + await Database.use((db) => Promise.all([ db.insert(UsageTable).values({ @@ -1082,18 +1096,22 @@ export async function handler( ] } + // Batched hot workspace: skip DB writes unless this request is the flush. + if (balanceFlush.batched && !balanceFlush.flush) return [] + + const workspaceDelta = balanceFlush.flush?.workspaceCost ?? cost + const userDelta = balanceFlush.flush?.userCost ?? cost + const balanceDelta = billingSource === "free" || billingSource === "byok" ? 0 : workspaceDelta + return [ db .update(BillingTable) .set({ - balance: - billingSource === "free" || billingSource === "byok" - ? sql`${BillingTable.balance} - ${0}` - : sql`${BillingTable.balance} - ${cost}`, + balance: sql`${BillingTable.balance} - ${balanceDelta}`, monthlyUsage: sql` CASE - WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${cost} - ELSE ${cost} + WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${workspaceDelta} + ELSE ${workspaceDelta} END `, timeMonthlyUsageUpdated: sql`now()`, @@ -1104,8 +1122,8 @@ export async function handler( .set({ monthlyUsage: sql` CASE - WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${cost} - ELSE ${cost} + WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${userDelta} + ELSE ${userDelta} END `, timeMonthlyUsageUpdated: sql`now()`, diff --git a/packages/console/app/src/routes/zen/util/usageBatcher.ts b/packages/console/app/src/routes/zen/util/usageBatcher.ts new file mode 100644 index 0000000000..315ce0a2eb --- /dev/null +++ b/packages/console/app/src/routes/zen/util/usageBatcher.ts @@ -0,0 +1,31 @@ +import { Resource } from "@opencode-ai/console-resource" +import { getRedis } from "./redis" + +// Workspaces whose balance/usage updates should be batched in Redis to avoid +// row-level lock contention on BillingTable / UserTable. +export const HOT_WORKSPACES = new Set([ + "wrk_01KJ8PX5CH50Y4YNGNS9ZR8YDC", // invoice +]) + +// Probability that a given request flushes the accumulated totals to the DB. +// Lower = fewer DB writes, more staleness. ~1 in 100 -> ~1% of requests write. +const FLUSH_PROBABILITY = 1 / 100 + +export async function accumulateUsage(workspaceID: string, userID: string, workspaceCost: number, userCost: number) { + const redis = getRedis() + const wKey = `${Resource.App.stage}:usage:wrk:${workspaceID}` + const uKey = `${Resource.App.stage}:usage:usr:${workspaceID}:${userID}` + + await Promise.all([redis.incrby(wKey, workspaceCost), redis.incrby(uKey, userCost)]) + + if (Math.random() > FLUSH_PROBABILITY) return null + + // Atomically take the current totals and reset to 0 + const [workspaceTotal, userTotal] = await Promise.all([redis.getdel(wKey), redis.getdel(uKey)]) + + const workspaceFlush = Number(workspaceTotal ?? 0) + const userFlush = Number(userTotal ?? 0) + if (workspaceFlush === 0 && userFlush === 0) return null + + return { workspaceCost: workspaceFlush, userCost: userFlush } +}