mirror of
https://github.com/anomalyco/opencode.git
synced 2026-06-02 06:16:48 +02:00
zen: batch balance calculation
This commit is contained in:
@@ -47,6 +47,7 @@ import { i18n, type Key } from "~/i18n"
|
||||
import { localeFromRequest } from "~/lib/language"
|
||||
import { createModelTpmLimiter } from "./modelTpmLimiter"
|
||||
import { createModelTpsLimiter } from "./modelTpsLimiter"
|
||||
import { accumulateUsage, HOT_WORKSPACES } from "./usageBatcher"
|
||||
|
||||
type ZenData = Awaited<ReturnType<typeof ZenData.list>>
|
||||
type RetryOptions = {
|
||||
@@ -981,6 +982,19 @@ export async function handler(
|
||||
authInfo = authInfo!
|
||||
|
||||
const cost = centsToMicroCents(totalCostInCent)
|
||||
|
||||
// For hot workspaces, batch balance/usage updates through Redis to avoid
|
||||
// row-level lock contention on BillingTable/UserTable. Returns the amount
|
||||
// to flush this request, or null to skip the DB writes entirely.
|
||||
const balanceFlush = await (async () => {
|
||||
if (billingSource !== "subscription" && billingSource !== "lite" && HOT_WORKSPACES.has(authInfo.workspaceID)) {
|
||||
const workspaceCost = billingSource === "free" || billingSource === "byok" ? 0 : cost
|
||||
const flush = await accumulateUsage(authInfo.workspaceID, authInfo.user.id, workspaceCost, cost)
|
||||
return { batched: true as const, flush }
|
||||
}
|
||||
return { batched: false as const, flush: null }
|
||||
})()
|
||||
|
||||
await Database.use((db) =>
|
||||
Promise.all([
|
||||
db.insert(UsageTable).values({
|
||||
@@ -1082,18 +1096,22 @@ export async function handler(
|
||||
]
|
||||
}
|
||||
|
||||
// Batched hot workspace: skip DB writes unless this request is the flush.
|
||||
if (balanceFlush.batched && !balanceFlush.flush) return []
|
||||
|
||||
const workspaceDelta = balanceFlush.flush?.workspaceCost ?? cost
|
||||
const userDelta = balanceFlush.flush?.userCost ?? cost
|
||||
const balanceDelta = billingSource === "free" || billingSource === "byok" ? 0 : workspaceDelta
|
||||
|
||||
return [
|
||||
db
|
||||
.update(BillingTable)
|
||||
.set({
|
||||
balance:
|
||||
billingSource === "free" || billingSource === "byok"
|
||||
? sql`${BillingTable.balance} - ${0}`
|
||||
: sql`${BillingTable.balance} - ${cost}`,
|
||||
balance: sql`${BillingTable.balance} - ${balanceDelta}`,
|
||||
monthlyUsage: sql`
|
||||
CASE
|
||||
WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${cost}
|
||||
ELSE ${cost}
|
||||
WHEN MONTH(${BillingTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${BillingTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${BillingTable.monthlyUsage} + ${workspaceDelta}
|
||||
ELSE ${workspaceDelta}
|
||||
END
|
||||
`,
|
||||
timeMonthlyUsageUpdated: sql`now()`,
|
||||
@@ -1104,8 +1122,8 @@ export async function handler(
|
||||
.set({
|
||||
monthlyUsage: sql`
|
||||
CASE
|
||||
WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${cost}
|
||||
ELSE ${cost}
|
||||
WHEN MONTH(${UserTable.timeMonthlyUsageUpdated}) = MONTH(now()) AND YEAR(${UserTable.timeMonthlyUsageUpdated}) = YEAR(now()) THEN ${UserTable.monthlyUsage} + ${userDelta}
|
||||
ELSE ${userDelta}
|
||||
END
|
||||
`,
|
||||
timeMonthlyUsageUpdated: sql`now()`,
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
import { Resource } from "@opencode-ai/console-resource"
|
||||
import { getRedis } from "./redis"
|
||||
|
||||
// Workspaces whose balance/usage updates should be batched in Redis to avoid
|
||||
// row-level lock contention on BillingTable / UserTable.
|
||||
export const HOT_WORKSPACES = new Set<string>([
|
||||
"wrk_01KJ8PX5CH50Y4YNGNS9ZR8YDC", // invoice
|
||||
])
|
||||
|
||||
// Probability that a given request flushes the accumulated totals to the DB.
|
||||
// Lower = fewer DB writes, more staleness. ~1 in 100 -> ~1% of requests write.
|
||||
const FLUSH_PROBABILITY = 1 / 100
|
||||
|
||||
export async function accumulateUsage(workspaceID: string, userID: string, workspaceCost: number, userCost: number) {
|
||||
const redis = getRedis()
|
||||
const wKey = `${Resource.App.stage}:usage:wrk:${workspaceID}`
|
||||
const uKey = `${Resource.App.stage}:usage:usr:${workspaceID}:${userID}`
|
||||
|
||||
await Promise.all([redis.incrby(wKey, workspaceCost), redis.incrby(uKey, userCost)])
|
||||
|
||||
if (Math.random() > FLUSH_PROBABILITY) return null
|
||||
|
||||
// Atomically take the current totals and reset to 0
|
||||
const [workspaceTotal, userTotal] = await Promise.all([redis.getdel<number>(wKey), redis.getdel<number>(uKey)])
|
||||
|
||||
const workspaceFlush = Number(workspaceTotal ?? 0)
|
||||
const userFlush = Number(userTotal ?? 0)
|
||||
if (workspaceFlush === 0 && userFlush === 0) return null
|
||||
|
||||
return { workspaceCost: workspaceFlush, userCost: userFlush }
|
||||
}
|
||||
Reference in New Issue
Block a user