From b4ca2de2ad9e36695bdfd319e0d40bb42ef8acd6 Mon Sep 17 00:00:00 2001 From: Shawn Tice Date: Mon, 22 Jun 2026 17:37:18 -0500 Subject: [PATCH] Pace bulk refresh against the GitHub API quota MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The refresh-* backfills and the startup open-pulls refresh share one GitHub token — and one hourly quota — with the live webhook/socket refreshes that keep Pulldasher current. A bulk run is a tight loop, so it drains the quota to zero and starves normal operation, taking Pulldasher down. The throttle plugin only reacts once the quota is already exhausted, which is too late to protect live traffic. Add a proactive pacer (lib/pacer.js). It observes x-ratelimit-* on every response (live calls included, so its view stays fresh) and gates only the bulk path: requests are spread across the rate-limit reset window, and bulk pauses entirely below a configurable reserve floor (config.github.bulkReserve, default 1000). The token-bucket cursor advances by the x-ratelimit-used delta between gates, so a pull's multi-call fan-out — and any live-traffic spike — pushes bulk's next slot out; bulk yields automatically on both signals. Gate at enqueue (pushAllOnQueue) and between fetch pages (pacedPaginate), never inside the shared queue consumer. The wait happens off the queue, so a paced or floor-paused backfill can't park the single consumer that webhook refreshes also depend on — live traffic keeps draining throughout. Co-Authored-By: Claude Opus 4.8 (1M context) --- config.example.js | 5 ++ lib/git-manager.js | 36 ++++++++++-- lib/pacer.js | 138 +++++++++++++++++++++++++++++++++++++++++++++ lib/refresh.js | 23 ++++++-- test/pacer.test.js | 94 ++++++++++++++++++++++++++++++ 5 files changed, 284 insertions(+), 12 deletions(-) create mode 100644 lib/pacer.js create mode 100644 test/pacer.test.js diff --git a/config.example.js b/config.example.js index 97789c12..de99381a 100644 --- a/config.example.js +++ b/config.example.js @@ -50,6 +50,11 @@ module.exports = { callbackURL: "http://localhost:" + port + "/auth/github/callback", // An API token for the backend to make API requests with. token: "oauth api token for server-side api calls", + // How much of the token's hourly GitHub API quota to reserve for live + // webhook/socket traffic. Bulk backfills (bin/refresh-*) pace themselves to + // stay above this floor and pause when remaining quota hits it, so a large + // backfill never starves normal operation. Defaults to 1000 if omitted. + bulkReserve: 1000, // This will need to be the same secret you use on the Webhooks page for // the repo Pulldasher is going to monitor. hook_secret: diff --git a/lib/git-manager.js b/lib/git-manager.js index 99d61223..60e9e934 100644 --- a/lib/git-manager.js +++ b/lib/git-manager.js @@ -14,6 +14,7 @@ import Label from "../models/label.js"; import Status from "../models/status.js"; import Signature from "../models/signature.js"; import getLogin from "./get-user-login.js"; +import pacer from "./pacer.js"; const MyOctokit = Octokit.plugin(throttling, retry); const gitDebug = debug("pulldasher:github"); @@ -61,6 +62,8 @@ const github = new MyOctokit({ // (5xx / network) — the throttle plugin already logs 4xx rate limits — and // rethrow untouched so plugin-retry's retry logic is unaffected. github.hook.error("request", (error, options) => { + // A failed response still reports the current quota; feed it to the pacer. + pacer.observe(error.response && error.response.headers); const status = error.status; if (status === undefined || status >= 500) { gitDebug( @@ -74,6 +77,13 @@ github.hook.error("request", (error, options) => { throw error; }); +// Track the shared quota from every response's x-ratelimit-* headers so the +// pacer (which gates only bulk requests) always has a current view — live +// webhook/socket calls update it too, which is how bulk yields to live traffic. +github.hook.after("request", (response) => { + pacer.observe(response.headers); +}); + const githubRest = github.rest; export default { @@ -100,7 +110,7 @@ export default { */ getOpenPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "open" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "open" }, repo)), "Getting open pulls in repo %s", repo ); @@ -113,7 +123,7 @@ export default { */ getAllPulls: function (repo) { return logErrors( - github.paginate(githubRest.pulls.list, params({ state: "all" }, repo)), + pacedPaginate(githubRest.pulls.list, params({ state: "all" }, repo)), "Getting all pulls in repo %s", repo ); @@ -134,8 +144,7 @@ export default { getOpenIssues: function (repo) { const searchParams = params({ state: "open" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting open issues in repo %s", @@ -151,8 +160,7 @@ export default { getAllIssues: function (repo) { const searchParams = params({ state: "all" }, repo); return logErrors( - github - .paginate(githubRest.issues.listForRepo, searchParams) + pacedPaginate(githubRest.issues.listForRepo, searchParams) .then(filterOutPulls) .then(addRepo(searchParams)), "Getting all issues in repo %s", @@ -636,3 +644,19 @@ function logErrors(promise, ...messageAndArgs) { throw err; }); } + +/** + * Paginate a bulk listing, pacing between pages so a full-repo list can't burst + * through the quota. Page one fetches un-paced and seeds the pacer's quota view + * (via the after-hook observe); each `gate()` then spaces the fetch of the next + * page. Bulk-only: the live webhook/socket path never lists whole repos, so + * gating every page is safe. + */ +async function pacedPaginate(route, parameters) { + const items = []; + for await (const response of github.paginate.iterator(route, parameters)) { + items.push(...response.data); + await pacer.gate(); + } + return items; +} diff --git a/lib/pacer.js b/lib/pacer.js new file mode 100644 index 00000000..c8f25d13 --- /dev/null +++ b/lib/pacer.js @@ -0,0 +1,138 @@ +import config from "./config-loader.js"; +import debug from "./debug.js"; +import Promise from "bluebird"; + +const pacerDebug = debug("pulldasher:pacer"); + +// Reserve a slice of the hourly quota for live (webhook/socket) traffic; bulk +// backfills pause rather than spend below it. See config.github.bulkReserve. +const reserve = config.github.bulkReserve ?? 1000; + +// Only log a paced delay once it's long enough to explain a visibly slow +// backfill — short spacing isn't worth the noise. +const LOG_DELAY_THRESHOLD_MS = 5000; + +/** + * Proactively paces bulk GitHub requests so a backfill yields to live traffic + * instead of draining the shared token's quota to zero. A single process-wide + * instance: rate state is global to the token, so every bulk call gates against + * the same view, fed by `observe` on every response (live calls included). + * + * `observe(headers)` — record the latest quota from any response (free). + * `gate()` — await this before pushing a *bulk* item; it spreads bulk work + * across the reset window and blocks entirely while below the reserve floor. + * + * Pacing is calibrated by actual consumption, not by gate count: each gate + * advances a token-bucket cursor by the number of requests spent since the last + * gate (`x-ratelimit-used` delta), so a pull whose processing fans out to a + * dozen calls correctly waits a dozen intervals — and a live-traffic spike, + * which also bumps `used`, pushes the cursor out and makes bulk yield. + */ +const pacer = { + remaining: null, + reset: null, + used: null, + lastUsed: null, + nextSlot: 0, + + observe: function (headers) { + if (!headers) { + return; + } + const remaining = headers["x-ratelimit-remaining"]; + const reset = headers["x-ratelimit-reset"]; + const used = headers["x-ratelimit-used"]; + if (remaining !== undefined) { + this.remaining = Number(remaining); + } + if (reset !== undefined) { + this.reset = Number(reset); + } + if (used !== undefined) { + this.used = Number(used); + } + }, + + gate: function () { + const now = Date.now(); + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: this.remaining, + reset: this.reset, + used: this.used, + reserve: reserve, + now: now, + nextSlot: this.nextSlot, + lastUsed: this.lastUsed, + }); + this.nextSlot = nextSlot; + this.lastUsed = lastUsed; + + // The floor-pause is the only positive delay that isn't even-spread pacing. + if (delayMs > 0 && this.remaining != null && this.remaining <= reserve) { + pacerDebug( + "bulk paused: remaining %s ≤ reserve %s, waiting %ss for quota reset", + this.remaining, + reserve, + Math.round(delayMs / 1000) + ); + } else if (delayMs >= LOG_DELAY_THRESHOLD_MS) { + pacerDebug( + "pacing bulk: waiting %sms (remaining %s)", + Math.round(delayMs), + this.remaining + ); + } + + return Promise.delay(delayMs); + }, +}; + +export default pacer; + +/** + * Pure pacing decision. Given the latest quota view and a token-bucket cursor, + * returns how long the next bulk request should wait, the advanced cursor, and + * the `used` baseline to diff against next time. + * + * - unknown quota, or a `reset` already in the past (stale window) → allow + * immediately; the next response refreshes the view. + * - `remaining ≤ reserve` → pause until `reset` (let the window refill). + * - otherwise → even-spread: the spendable budget (`remaining − reserve`) + * divided across the time left in the window gives the per-request interval. + * Advance the cursor by `spent × interval`, where `spent` is the requests + * consumed since the last gate (`used − lastUsed`), so the spacing tracks + * real consumption — a multi-call item or a live spike pushes the next slot + * further out. A shrinking `remaining` also widens the interval, so bulk + * yields on both signals. + */ +export function computePace({ + remaining, + reset, + used, + reserve, + now, + nextSlot, + lastUsed, +}) { + const baseline = used ?? lastUsed; + // No quota view, or a window that already rolled over (our `remaining` is + // stale): proceed now and let the next response refresh the view. + const allowNow = { delayMs: 0, nextSlot: now, lastUsed: baseline }; + if (remaining == null || reset == null) { + return allowNow; + } + const resetMs = reset * 1000; + if (resetMs <= now) { + return allowNow; + } + if (remaining <= reserve) { + return { delayMs: resetMs - now, nextSlot: resetMs, lastUsed: baseline }; + } + const interval = (resetMs - now) / (remaining - reserve); + // Clamp at 0: across a window rollover the server resets `used` to ~0 while + // our `lastUsed` still holds the pre-reset high, so the delta can go negative. + const spent = + used == null || lastUsed == null ? 0 : Math.max(0, used - lastUsed); + const slot = Math.max(now, nextSlot + spent * interval); + return { delayMs: slot - now, nextSlot: slot, lastUsed: baseline }; +} diff --git a/lib/refresh.js b/lib/refresh.js index 16b64b90..9d19faee 100644 --- a/lib/refresh.js +++ b/lib/refresh.js @@ -4,6 +4,7 @@ import utils from "./utils.js"; import NotifyQueue from "notify-queue"; import debug from "./debug.js"; import Promise from "bluebird"; +import pacer from "./pacer.js"; // Queues for making all refreshes be synchronous, one at a time. var issueQueue = new NotifyQueue(); @@ -61,7 +62,8 @@ export default { * and return a promise that is fulfilled when the item is fully processed. * * Single-item refreshes (webhooks, socket) have no end-of-run report, so no - * failure collector is attached. + * failure collector is attached — and they aren't quota-paced, so live traffic + * is never delayed. */ function pushOnQueue(queue) { return function (githubResponse) { @@ -81,13 +83,20 @@ function pushOnQueue(queue) { * Each item carries the run's `onFailure` collector (null for single-item * webhook/socket refreshes), so a failed parse is recorded for that run's * end-of-run report. + * + * This is the bulk path, so it paces enqueueing against the GitHub quota: + * `pacer.gate()` waits for each item's slot *before* pushing it, keeping the + * shared queue shallow. The wait happens here, off the queue — so a paced or + * floor-paused backfill never blocks the single consumer, and live + * webhook/socket refreshes keep draining promptly. */ function pushAllOnQueue(queue, onFailure) { - return function (githubResponses) { + return async function (githubResponses) { + for (const githubResponse of githubResponses) { + await pacer.gate(); + queue.push({ response: githubResponse, onFailure: onFailure }); + } return new Promise(function (resolve) { - githubResponses.forEach(function (githubResponse) { - queue.push({ response: githubResponse, onFailure: onFailure }); - }); queue.push(resolve); }); }; @@ -212,7 +221,9 @@ export function processPullItem( // queue or the fixed pop deps, so it stays scoped to the run that enqueued it — // a bulk run collects its own failures, while webhook/socket refreshes (which // push onFailure null) don't accumulate. Unwrap so the process functions see -// one response plus their injected collaborators. +// one response plus their injected collaborators. (Quota pacing happens at +// enqueue time, in pushAllOnQueue, not here — so a paused backfill can't block +// the single consumer that webhook refreshes also depend on.) issueQueue.pop(function (item, next) { if (typeof item === "function") { item(); diff --git a/test/pacer.test.js b/test/pacer.test.js new file mode 100644 index 00000000..f9e449c2 --- /dev/null +++ b/test/pacer.test.js @@ -0,0 +1,94 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { computePace } from "../lib/pacer.js"; + +const NOW = 1_700_000_000_000; // fixed clock (ms); reset values are NOW-relative +const RESERVE = 1000; +const resetIn = (seconds) => NOW / 1000 + seconds; + +// Before the first response is observed there's no quota view, so a bulk +// request must proceed immediately rather than stall forever. +test("computePace allows the request when quota is unknown", () => { + const { delayMs } = computePace({ + remaining: null, + reset: null, + used: null, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); +}); + +// A reset timestamp in the past means the window already rolled over and our +// remaining is stale — allow the request and let the next response refresh it. +test("computePace allows the request when the reset window is stale", () => { + const { delayMs } = computePace({ + remaining: 0, + reset: resetIn(-10), + used: 5000, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4000, + }); + assert.equal(delayMs, 0); +}); + +// At or below the reserve floor, bulk must pause until the quota refills so it +// stops competing with live traffic. +test("computePace pauses until reset when remaining is at or below the reserve", () => { + const { delayMs, nextSlot } = computePace({ + remaining: 500, + reset: resetIn(600), + used: 4500, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: 4490, + }); + assert.equal(delayMs, 600_000); + // The cursor jumps to the reset so post-refill pacing starts from the window edge. + assert.equal(nextSlot, NOW + 600_000); +}); + +// The real first-gate state: page one was fetched un-paced, so the after-hook +// `observe` has already populated the quota view (remaining/reset/used) — but no +// prior gate has set a `used` baseline. With nothing to diff against, the gate +// can't know how much was spent yet, so it proceeds immediately and just records +// the baseline (returned as lastUsed) for the next gate. +test("computePace does not delay the first gate, but records the usage baseline", () => { + const { delayMs, nextSlot, lastUsed } = computePace({ + remaining: 5000, + reset: resetIn(3600), + used: 100, + reserve: RESERVE, + now: NOW, + nextSlot: 0, + lastUsed: null, + }); + assert.equal(delayMs, 0); + assert.equal(nextSlot, NOW); + assert.equal(lastUsed, 100); +}); + +// The cursor advances by the requests actually spent since the last gate, not +// by one per gate — so a multi-call item waits proportionally longer. +test("computePace advances the cursor by the requests spent since the last gate", () => { + const params = { + remaining: 1100, // budget 100 + reset: resetIn(100), // window 100_000ms → interval = 1000ms / request + used: undefined, // set per case + reserve: RESERVE, + now: NOW, + nextSlot: NOW, + lastUsed: 1000, + }; + + const oneCall = computePace({ ...params, used: 1001 }); + assert.equal(oneCall.delayMs, 1000); + + const eightCalls = computePace({ ...params, used: 1008 }); + assert.equal(eightCalls.delayMs, 8000); +});