diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 02afad88e9..8c808a8d0c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1239,6 +1239,9 @@ importers: nodemon: specifier: ^3.0.1 version: 3.1.0 + pg-promise: + specifier: ^11.4.3 + version: 11.6.0 services/apps/organizations_enrichment_worker: dependencies: @@ -6624,6 +6627,7 @@ packages: dottie@2.0.6: resolution: {integrity: sha512-iGCHkfUc5kFekGiqhe8B/mdaurD+lakO9txNnTvKtA6PISrw86LgqHvRzWYPyoE2Ph5aMIrCw9/uko6XHTKCwA==} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dtrace-provider@0.8.8: resolution: {integrity: sha512-b7Z7cNtHPhH9EJhNNbbeqTcXB8LGFFZhq1PGgEvpeHlzd36bhbdTWoE/Ba/YguqpBSlAPKnARWhVlhunCMwfxg==} @@ -10859,8 +10863,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11054,11 +11058,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': + '@aws-sdk/client-sso-oidc@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11097,7 +11101,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso@3.556.0': @@ -11273,11 +11276,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0': + '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11316,6 +11319,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.985.0': @@ -11481,7 +11485,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -11658,7 +11662,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -11970,7 +11974,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 diff --git a/services/apps/nango_worker/package.json b/services/apps/nango_worker/package.json index fcfd97a680..4f9b58c450 100644 --- a/services/apps/nango_worker/package.json +++ b/services/apps/nango_worker/package.json @@ -18,8 +18,8 @@ "@crowd/archetype-standard": "workspace:*", "@crowd/archetype-worker": "workspace:*", "@crowd/common": "workspace:*", - "@crowd/data-access-layer": "workspace:*", "@crowd/common_services": "workspace:*", + "@crowd/data-access-layer": "workspace:*", "@crowd/logging": "workspace:*", "@crowd/nango": "workspace:*", "@crowd/redis": "workspace:*", @@ -34,6 +34,7 @@ }, "devDependencies": { "@types/node": "^20.8.2", - "nodemon": "^3.0.1" + "nodemon": "^3.0.1", + "pg-promise": "^11.4.3" } } diff --git a/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts new file mode 100644 index 0000000000..0cd8deb839 --- /dev/null +++ b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts @@ -0,0 +1,597 @@ +/** + * Gerrit Changeset Count Validator + * + * Compares changeset activity counts for a Gerrit repo between the upstream + * Gerrit REST API and local Tinybird storage, with set-diff to show exactly + * which change numbers are missing from each type. + * + * Usage: + * pnpm tsx src/bin/validate-gerrit-counts.ts [--since=] [--include-patchsets] + * + * Edit the CONFIG block below before running. + */ + +import { mkdir, writeFile } from 'node:fs/promises' +import pgPromise from 'pg-promise' + +// ─── config ─────────────────────────────────────────────────────────────────── + +const CONFIG = { + pg: { + host: process.env.CROWD_DB_READ_HOST || '', + port: parseInt(process.env.CROWD_DB_PORT || '5432'), + user: process.env.CROWD_DB_USERNAME || '', + password: process.env.CROWD_DB_PASSWORD || '', + database: process.env.CROWD_DB_DATABASE || '', + ssl: (process.env.CROWD_DB_SSL || 'true') !== 'false', + }, + tinybird: { + baseUrl: process.env.CROWD_TINYBIRD_BASE_URL || '', + token: process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN || '', + }, + gerritPageSize: 500, + gerritRequestDelayMs: 100, // delay between Gerrit pages to avoid rate-limiting + gerritMaxRetries: 3, // per-page retry attempts before giving up + // Optional ISO date string — when set, only changes updated on/after this date are compared. + // Gerrit uses `after:""`, Tinybird filters `timestamp >= ''`. + // Leave empty ('') for a full historical comparison. + since: '', + // When true, also validate patchset activity types (requires a second Gerrit walk with extra options). + includePatchsets: true, + verbose: true, + outputDir: './validation-reports', +} + +// ─── types ──────────────────────────────────────────────────────────────────── + +interface GerritChanges { + all: Set // all change numbers (→ changeset-created) + merged: Set // status=MERGED (→ changeset-merged) + abandoned: Set // status=ABANDONED (→ changeset-abandoned) + pagesWalked: number + skipErrors: number +} + +interface GerritPatchsets { + created: Set // sourceId = `${change._number}-${revision._number}` + approvals: Set // sourceId = `${change.id}-${vote._account_id}` + pagesWalked: number + skipErrors: number +} + +interface TypeDiff { + type: string + gerrit: number // -1 means "not countable from Gerrit API without expensive per-change calls" + tinybird: number + diff: number + missingFromTinybird: string[] // in Gerrit but not in Tinybird + extraInTinybird: string[] // in Tinybird but not in Gerrit + sourceIdDiffSkipped: boolean // true when counts matched — sourceId fetch was skipped + note?: string +} + +interface Report { + url: string + channelUrl: string + integrationId: string + repoName: string + runAt: string + durationMs: number + changeset: TypeDiff[] + patchset?: TypeDiff[] + gerritStats: { pagesWalked: number; changesProcessed: number; skipErrors: number } + gerritPatchsetStats?: { pagesWalked: number; skipErrors: number } +} + +// ─── validation ─────────────────────────────────────────────────────────────── + +function mustHave(value: string, label: string): string { + if (!value) throw new Error(`Missing required config: ${label}`) + return value +} + +function validateConfig(): void { + mustHave(CONFIG.pg.host, 'pg.host') + mustHave(CONFIG.pg.user, 'pg.user') + mustHave(CONFIG.pg.password, 'pg.password') + mustHave(CONFIG.pg.database, 'pg.database') + mustHave(CONFIG.tinybird.baseUrl, 'tinybird.baseUrl') + mustHave(CONFIG.tinybird.token, 'tinybird.token') +} + +// ─── url utils ──────────────────────────────────────────────────────────────── + +function parseRepoUrl(rawUrl: string): { plainUrl: string; channelUrl: string } { + let plain = rawUrl.trim().replace(/\/$/, '') + const qMatch = plain.match(/^(https?:\/\/[^/]+(?:\/[^/]+)*)\/q\/project:(.+)$/) + if (qMatch) plain = `${qMatch[1]}/${qMatch[2]}` + + const { origin, pathname } = new URL(plain) + let channelUrl: string + if (pathname.startsWith('/r/')) { + channelUrl = `${origin}/r/q/project:${pathname.slice(3)}` + } else if (pathname.startsWith('/gerrit/')) { + channelUrl = `${origin}/gerrit/q/project:${pathname.slice(8)}` + } else { + channelUrl = `${origin}/q/project:${pathname.slice(1)}` + } + + return { plainUrl: plain, channelUrl } +} + +// ─── db ─────────────────────────────────────────────────────────────────────── + +async function validateInDb( + db: pgPromise.IDatabase, + plainUrl: string, +): Promise<{ integrationId: string; segmentId: string; orgUrl: string; repoName: string }> { + const repo = await db.oneOrNone<{ sourceIntegrationId: string }>( + `SELECT "sourceIntegrationId" FROM repositories WHERE url = $1 AND "deletedAt" IS NULL`, + [plainUrl], + ) + if (!repo) + throw new Error( + `No repositories row found for: ${plainUrl}\n` + + `Tip: use the plain form without /q/project:`, + ) + + const integration = await db.oneOrNone<{ + id: string + segmentId: string + settings: { remote: { orgURL: string; repoNames: string[] } } + }>( + `SELECT id, "segmentId", settings FROM integrations WHERE id = $1 AND platform = 'gerrit' AND "deletedAt" IS NULL`, + [repo.sourceIntegrationId], + ) + if (!integration) + throw new Error(`Integration ${repo.sourceIntegrationId} not found or not platform='gerrit'.`) + + const { orgURL, repoNames } = integration.settings.remote + const repoName = plainUrl.replace(`${orgURL.replace(/\/$/, '')}/`, '') + if (!repoNames.includes(repoName)) + throw new Error( + `Repo '${repoName}' not in integration repoNames.\n` + + `Available: [${repoNames.slice(0, 10).join(', ')}${repoNames.length > 10 ? ', …' : ''}]`, + ) + + return { integrationId: integration.id, segmentId: integration.segmentId, orgUrl: orgURL, repoName } +} + +// ─── gerrit rest ────────────────────────────────────────────────────────────── + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + +function stripXssi(text: string): string { + return text.startsWith(")]}'") ? text.slice(5) : text +} + +async function fetchChangePage(url: string): Promise { + const resp = await fetch(url) + if (!resp.ok) throw new Error(`Gerrit ${resp.status}: GET ${url}`) + return JSON.parse(stripXssi(await resp.text())) as unknown[] +} + +async function fetchPageWithRetry(url: string, label: string): Promise { + let attempt = 0 + while (attempt < CONFIG.gerritMaxRetries) { + try { + return await fetchChangePage(url) + } catch (err) { + attempt++ + const delay = CONFIG.gerritRequestDelayMs * Math.pow(2, attempt) + console.error(` [gerrit] error ${label} (attempt ${attempt}/${CONFIG.gerritMaxRetries}): ${err} — retrying in ${delay}ms`) + await sleep(delay) + } + } + console.error(` [gerrit] giving up ${label} after ${CONFIG.gerritMaxRetries} attempts`) + return null +} + +// Changeset walk — lightweight, no extra options needed +async function fetchGerritChanges(orgUrl: string, repoName: string): Promise { + const result: GerritChanges = { all: new Set(), merged: new Set(), abandoned: new Set(), pagesWalked: 0, skipErrors: 0 } + const base = orgUrl.replace(/\/$/, '') + const sinceFilter = CONFIG.since ? `+after:"${CONFIG.since}"` : '' + let start = 0 + + while (true) { + const url = `${base}/changes/?q=project:${encodeURIComponent(repoName)}${sinceFilter}&n=${CONFIG.gerritPageSize}&S=${start}` + if (CONFIG.verbose) console.log(` [gerrit] changeset page S=${start}`) + + const page = await fetchPageWithRetry(url, `S=${start}`) + if (!page) { result.skipErrors++; break } + if (!Array.isArray(page) || page.length === 0) break + + result.pagesWalked++ + const hasMore = (page[page.length - 1] as Record)._more_changes === true + + for (const raw of page) { + const c = raw as Record + const num = String(c['_number'] ?? c['number'] ?? '') + if (!num) { result.skipErrors++; continue } + result.all.add(num) + if (c['status'] === 'MERGED') result.merged.add(num) + if (c['status'] === 'ABANDONED') result.abandoned.add(num) + } + + if (!hasMore) break + start += CONFIG.gerritPageSize + await sleep(CONFIG.gerritRequestDelayMs) + } + + return result +} + +// Patchset walk — separate, fetches ALL_REVISIONS + DETAILED_LABELS +async function fetchGerritPatchsets(orgUrl: string, repoName: string): Promise { + const result: GerritPatchsets = { created: new Set(), approvals: new Set(), pagesWalked: 0, skipErrors: 0 } + const base = orgUrl.replace(/\/$/, '') + const sinceFilter = CONFIG.since ? `+after:"${CONFIG.since}"` : '' + let start = 0 + + while (true) { + const url = + `${base}/changes/?q=project:${encodeURIComponent(repoName)}${sinceFilter}` + + `&o=ALL_REVISIONS&o=DETAILED_LABELS&n=${CONFIG.gerritPageSize}&S=${start}` + if (CONFIG.verbose) console.log(` [gerrit] patchset page S=${start}`) + + const page = await fetchPageWithRetry(url, `patchset S=${start}`) + if (!page) { result.skipErrors++; break } + if (!Array.isArray(page) || page.length === 0) break + + result.pagesWalked++ + const hasMore = (page[page.length - 1] as Record)._more_changes === true + + for (const raw of page) { + const c = raw as Record + const changeNum = String(c['_number'] ?? '') + if (!changeNum) { result.skipErrors++; continue } + + // patchset-created: one per revision + const revisions = c['revisions'] as Record> | undefined + if (revisions) { + for (const revDetail of Object.values(revisions)) { + const revNum = revDetail['_number'] + if (revNum != null) result.created.add(`${changeNum}-${revNum}`) + } + } + + // patchset_approval-created: sourceId = `${change.id}-${vote._account_id}` (mirrors Nango mapper) + const changeId = String(c['id'] ?? '') + const labels = c['labels'] as Record }> | undefined + const codeReview = labels?.['Code-Review']?.all ?? [] + for (const vote of codeReview) { + if (vote.value != null && vote.value !== 0 && vote._account_id != null) { + result.approvals.add(`${changeId}-${vote._account_id}`) + } + } + } + + if (!hasMore) break + start += CONFIG.gerritPageSize + await sleep(CONFIG.gerritRequestDelayMs) + } + + return result +} + +// ─── tinybird ───────────────────────────────────────────────────────────────── + +async function tinybirdSql(sql: string): Promise { + const resp = await fetch(`${CONFIG.tinybird.baseUrl}/v0/sql`, { + method: 'POST', + headers: { Authorization: `Bearer ${CONFIG.tinybird.token}`, 'Content-Type': 'application/json' }, + body: JSON.stringify({ q: sql }), + }) + if (!resp.ok) throw new Error(`Tinybird ${resp.status}: ${await resp.text()}`) + return resp.json() as Promise +} + +function tinybirdWhereClause(channelUrl: string, segmentId: string, types: string[]): string { + const ch = channelUrl.replace(/'/g, "\\'") + const seg = segmentId.replace(/'/g, "\\'") + const sinceClause = CONFIG.since ? `AND timestamp >= '${CONFIG.since}' ` : '' + const typeList = types.map((t) => `'${t}'`).join(', ') + return ( + `segmentId = '${seg}' ` + + `AND platform = 'gerrit' ` + + `AND channel = '${ch}' ` + + sinceClause + + `AND type IN (${typeList})` + ) +} + +// Phase 1: one GROUP BY query for all requested types +async function fetchTinybirdCounts( + channelUrl: string, + segmentId: string, + types: readonly string[], +): Promise> { + const sql = + `SELECT type, COUNT(DISTINCT sourceId) AS cnt ` + + `FROM activityRelations FINAL ` + + `WHERE ${tinybirdWhereClause(channelUrl, segmentId, [...types])} ` + + `GROUP BY type ` + + `FORMAT JSON` + + const result = await tinybirdSql<{ data: Array<{ type: string; cnt: string }> }>(sql) + const counts: Record = Object.fromEntries(types.map((t) => [t, 0])) + for (const row of result.data ?? []) { + if (row.type in counts) counts[row.type] = Number(row.cnt) + } + return counts +} + +// Phase 2: sourceIds for a single type — only called when count differs +async function fetchTinybirdSourceIds( + channelUrl: string, + segmentId: string, + type: string, +): Promise> { + const ch = channelUrl.replace(/'/g, "\\'") + const seg = segmentId.replace(/'/g, "\\'") + const sinceClause = CONFIG.since ? `AND timestamp >= '${CONFIG.since}' ` : '' + const sql = + `SELECT sourceId ` + + `FROM activityRelations FINAL ` + + `WHERE segmentId = '${seg}' ` + + `AND platform = 'gerrit' ` + + `AND channel = '${ch}' ` + + sinceClause + + `AND type = '${type}' ` + + `FORMAT JSON` + + const result = await tinybirdSql<{ data: Array<{ sourceId: string }> }>(sql) + return new Set((result.data ?? []).map((r) => r.sourceId)) +} + +// ─── diff ───────────────────────────────────────────────────────────────────── + +function setDiff(a: Set, b: Set): string[] { + return [...a].filter((x) => !b.has(x)).sort((x, y) => Number(x) - Number(y)) +} + +function buildDiff(type: string, gerrit: Set, tinybird: Set): TypeDiff { + return { + type, + gerrit: gerrit.size, + tinybird: tinybird.size, + diff: tinybird.size - gerrit.size, + missingFromTinybird: setDiff(gerrit, tinybird), + extraInTinybird: setDiff(tinybird, gerrit), + sourceIdDiffSkipped: false, + } +} + +function buildDiffCountsOnly(type: string, gerritCount: number, tinybirdCount: number, note?: string): TypeDiff { + return { + type, + gerrit: gerritCount, + tinybird: tinybirdCount, + diff: gerritCount === -1 ? 0 : tinybirdCount - gerritCount, + missingFromTinybird: [], + extraInTinybird: [], + sourceIdDiffSkipped: false, + note, + } +} + +async function buildTypeDiffsWithSets( + channelUrl: string, + segmentId: string, + typeConfigs: Array<{ type: string; gerritSet?: Set; gerritCount?: number; countOnly?: boolean; note?: string }>, + tinybirdCounts: Record, +): Promise { + const diffs: TypeDiff[] = [] + for (const cfg of typeConfigs) { + const tbCount = tinybirdCounts[cfg.type] ?? 0 + + if (cfg.countOnly || cfg.gerritSet == null) { + const gCount = cfg.gerritCount ?? -1 + diffs.push(buildDiffCountsOnly(cfg.type, gCount, tbCount, cfg.note)) + } else { + const gCount = cfg.gerritSet.size + if (gCount === tbCount) { + diffs.push(buildDiffCountsOnly(cfg.type, gCount, tbCount, cfg.note)) + } else { + console.log(` [tinybird] count diff on ${cfg.type} (${gCount} vs ${tbCount}) — fetching sourceIds...`) + const tbIds = await fetchTinybirdSourceIds(channelUrl, segmentId, cfg.type) + diffs.push(buildDiff(cfg.type, cfg.gerritSet, tbIds)) + } + } + } + return diffs +} + +// ─── report ─────────────────────────────────────────────────────────────────── + +function printSection(title: string, rows: TypeDiff[]): void { + const W = 30 + const N = 9 + const header = + 'Type'.padEnd(W) + + 'Gerrit'.padStart(N) + + 'Tinybird'.padStart(N) + + 'Δ'.padStart(N) + + ' Missing'.padStart(N) + + ' Extra'.padStart(N) + + console.log(`\n── ${title} ${'─'.repeat(Math.max(0, header.length - title.length - 4))}`) + console.log(header) + console.log('─'.repeat(header.length)) + + for (const r of rows) { + const gerritLabel = r.gerrit === -1 ? 'N/A' : r.gerrit.toLocaleString() + const diff = r.gerrit === -1 ? 'N/A' : `${r.diff > 0 ? '+' : ''}${r.diff.toLocaleString()}` + const missingLabel = r.sourceIdDiffSkipped ? '(skipped)' : r.missingFromTinybird.length.toLocaleString() + const extraLabel = r.sourceIdDiffSkipped ? '(skipped)' : r.extraInTinybird.length.toLocaleString() + console.log( + r.type.padEnd(W) + + gerritLabel.padStart(N) + + r.tinybird.toLocaleString().padStart(N) + + diff.padStart(N) + + missingLabel.padStart(N + 2) + + extraLabel.padStart(N + 2), + ) + if (r.note) console.log(` * ${r.note}`) + if (!r.sourceIdDiffSkipped) { + if (r.missingFromTinybird.length > 0 && r.missingFromTinybird.length <= 20) { + console.log(` missing from tinybird: ${r.missingFromTinybird.join(', ')}`) + } + if (r.extraInTinybird.length > 0 && r.extraInTinybird.length <= 20) { + console.log(` extra in tinybird: ${r.extraInTinybird.join(', ')}`) + } + } + } +} + +function printReport(report: Report): void { + console.log('') + console.log(`Repo ${report.url}`) + console.log(`Channel ${report.channelUrl}`) + console.log(`Integration ${report.integrationId}`) + console.log(`Run at ${report.runAt} (${(report.durationMs / 1000).toFixed(1)}s)`) + console.log(`Gerrit ${report.gerritStats.changesProcessed} changes / ${report.gerritStats.pagesWalked} pages`) + + printSection('Changesets', report.changeset) + if (report.patchset) printSection('Patchsets', report.patchset) + console.log('') +} + +// ─── main ───────────────────────────────────────────────────────────────────── + +const CHANGESET_TYPES = ['changeset-created', 'changeset-merged', 'changeset-abandoned'] as const +const PATCHSET_TYPES = ['patchset-created', 'patchset_approval-created', 'patchset_comment-created'] as const + +async function main(): Promise { + const args = process.argv.slice(2) + const rawUrl = args.find((a) => !a.startsWith('--')) + if (!rawUrl) { + console.error('Usage: pnpm tsx src/bin/validate-gerrit-counts.ts [--since=] [--include-patchsets]') + process.exit(1) + } + + const sinceArg = args.find((a) => a.startsWith('--since=')) + if (sinceArg) CONFIG.since = sinceArg.slice('--since='.length) + if (args.includes('--include-patchsets')) CONFIG.includePatchsets = true + + validateConfig() + const startMs = Date.now() + + console.log(`\nValidating Gerrit changesets for: ${rawUrl}`) + const { plainUrl, channelUrl } = parseRepoUrl(rawUrl) + if (plainUrl !== rawUrl.trim()) console.log(`Normalized to: ${plainUrl}`) + console.log(`Channel URL: ${channelUrl}`) + if (CONFIG.since) console.log(`Since: ${CONFIG.since} (incremental)`) + if (CONFIG.includePatchsets) console.log(`Mode: changesets + patchsets`) + + // DB lookup + console.log(`\nLooking up in CDP database...`) + const pgp = pgPromise() + const db = pgp({ + host: CONFIG.pg.host, + port: CONFIG.pg.port, + user: CONFIG.pg.user, + password: CONFIG.pg.password, + database: CONFIG.pg.database, + ssl: CONFIG.pg.ssl ? { rejectUnauthorized: false } : false, + }) + + let integrationId: string + let segmentId: string + let orgUrl: string + let repoName: string + try { + const info = await validateInDb(db, plainUrl) + integrationId = info.integrationId + segmentId = info.segmentId + orgUrl = info.orgUrl + repoName = info.repoName + console.log(` Integration: ${integrationId}`) + console.log(` Segment: ${segmentId}`) + console.log(` Repo: ${repoName}`) + } finally { + pgp.end() + } + + // ── Changesets ────────────────────────────────────────────────────────────── + + console.log(`\nFetching changeset data from Gerrit...`) + const gerrit = await fetchGerritChanges(orgUrl, repoName) + console.log(` ${gerrit.all.size} total ${gerrit.merged.size} merged ${gerrit.abandoned.size} abandoned`) + + console.log(`\nFetching changeset counts from Tinybird...`) + const tbChangesetCounts = await fetchTinybirdCounts(channelUrl, segmentId, CHANGESET_TYPES) + console.log(` ${tbChangesetCounts['changeset-created']} created ${tbChangesetCounts['changeset-merged']} merged ${tbChangesetCounts['changeset-abandoned']} abandoned`) + + const changeset = await buildTypeDiffsWithSets(channelUrl, segmentId, [ + { type: 'changeset-created', gerritSet: gerrit.all }, + { type: 'changeset-merged', gerritSet: gerrit.merged }, + { type: 'changeset-abandoned', gerritSet: gerrit.abandoned }, + ], tbChangesetCounts) + + // ── Patchsets (optional) ──────────────────────────────────────────────────── + + let patchset: TypeDiff[] | undefined + let gerritPatchsetStats: Report['gerritPatchsetStats'] + + if (CONFIG.includePatchsets) { + console.log(`\nFetching patchset data from Gerrit (ALL_REVISIONS + DETAILED_LABELS)...`) + const gerritPs = await fetchGerritPatchsets(orgUrl, repoName) + console.log(` ${gerritPs.created.size} patchsets ${gerritPs.approvals.size} approvals`) + gerritPatchsetStats = { pagesWalked: gerritPs.pagesWalked, skipErrors: gerritPs.skipErrors } + + console.log(`\nFetching patchset counts from Tinybird...`) + const tbPatchsetCounts = await fetchTinybirdCounts(channelUrl, segmentId, PATCHSET_TYPES) + console.log(` ${tbPatchsetCounts['patchset-created']} created ${tbPatchsetCounts['patchset_approval-created']} approvals ${tbPatchsetCounts['patchset_comment-created']} comments`) + + patchset = await buildTypeDiffsWithSets(channelUrl, segmentId, [ + { type: 'patchset-created', gerritSet: gerritPs.created }, + { type: 'patchset_approval-created', gerritSet: gerritPs.approvals }, + // Gerrit count requires one extra API call per change — omitted + { + type: 'patchset_comment-created', + gerritCount: -1, + countOnly: true, + note: 'Gerrit count requires per-change API calls — showing Tinybird count only', + }, + ], tbPatchsetCounts) + + } + + // ── Report ────────────────────────────────────────────────────────────────── + + const durationMs = Date.now() - startMs + const report: Report = { + url: plainUrl, + channelUrl, + integrationId, + repoName, + runAt: new Date().toISOString(), + durationMs, + changeset, + patchset, + gerritStats: { + pagesWalked: gerrit.pagesWalked, + changesProcessed: gerrit.all.size, + skipErrors: gerrit.skipErrors, + }, + gerritPatchsetStats, + } + printReport(report) + + const slug = `${new URL(plainUrl).hostname}-${repoName.replace(/\//g, '-')}` + const ts = new Date().toISOString().replace(/[:.]/g, '-') + await mkdir(CONFIG.outputDir, { recursive: true }) + const outputPath = `${CONFIG.outputDir}/validation-${slug}-${ts}.json` + await writeFile(outputPath, JSON.stringify(report, null, 2), 'utf8') + console.log(`Report saved: ${outputPath}`) +} + +main().catch((err) => { + console.error(`\nError: ${err instanceof Error ? err.message : String(err)}`) + if (err instanceof Error && err.stack) console.error(err.stack) + process.exit(1) +})