From 464167650a3b4b85fa7c2f8536e0468d46f84930 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Wed, 22 Apr 2026 11:00:41 +0100 Subject: [PATCH 1/2] feat: validation script WIP Signed-off-by: Mouad BANI --- pnpm-lock.yaml | 24 +- services/apps/nango_worker/package.json | 5 +- .../src/bin/validate-gerrit-counts.ts | 459 ++++++++++++++++++ 3 files changed, 476 insertions(+), 12 deletions(-) create mode 100644 services/apps/nango_worker/src/bin/validate-gerrit-counts.ts diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 02afad88e9..8c808a8d0c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1239,6 +1239,9 @@ importers: nodemon: specifier: ^3.0.1 version: 3.1.0 + pg-promise: + specifier: ^11.4.3 + version: 11.6.0 services/apps/organizations_enrichment_worker: dependencies: @@ -6624,6 +6627,7 @@ packages: dottie@2.0.6: resolution: {integrity: sha512-iGCHkfUc5kFekGiqhe8B/mdaurD+lakO9txNnTvKtA6PISrw86LgqHvRzWYPyoE2Ph5aMIrCw9/uko6XHTKCwA==} + deprecated: Package no longer supported. Contact Support at https://www.npmjs.com/support for more info. dtrace-provider@0.8.8: resolution: {integrity: sha512-b7Z7cNtHPhH9EJhNNbbeqTcXB8LGFFZhq1PGgEvpeHlzd36bhbdTWoE/Ba/YguqpBSlAPKnARWhVlhunCMwfxg==} @@ -10859,8 +10863,8 @@ snapshots: dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sso-oidc': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11054,11 +11058,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sso-oidc@3.572.0(@aws-sdk/client-sts@3.572.0)': + '@aws-sdk/client-sso-oidc@3.572.0': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11097,7 +11101,6 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: - - '@aws-sdk/client-sts' - aws-crt '@aws-sdk/client-sso@3.556.0': @@ -11273,11 +11276,11 @@ snapshots: transitivePeerDependencies: - aws-crt - '@aws-sdk/client-sts@3.572.0': + '@aws-sdk/client-sts@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: '@aws-crypto/sha256-browser': 3.0.0 '@aws-crypto/sha256-js': 3.0.0 - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/core': 3.572.0 '@aws-sdk/credential-provider-node': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0) '@aws-sdk/middleware-host-header': 3.567.0 @@ -11316,6 +11319,7 @@ snapshots: '@smithy/util-utf8': 2.3.0 tslib: 2.6.2 transitivePeerDependencies: + - '@aws-sdk/client-sso-oidc' - aws-crt '@aws-sdk/client-sts@3.985.0': @@ -11481,7 +11485,7 @@ snapshots: '@aws-sdk/credential-provider-ini@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/credential-provider-env': 3.568.0 '@aws-sdk/credential-provider-process': 3.572.0 '@aws-sdk/credential-provider-sso': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) @@ -11658,7 +11662,7 @@ snapshots: '@aws-sdk/credential-provider-web-identity@3.568.0(@aws-sdk/client-sts@3.572.0)': dependencies: - '@aws-sdk/client-sts': 3.572.0 + '@aws-sdk/client-sts': 3.572.0(@aws-sdk/client-sso-oidc@3.572.0) '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/types': 2.12.0 @@ -11970,7 +11974,7 @@ snapshots: '@aws-sdk/token-providers@3.572.0(@aws-sdk/client-sso-oidc@3.572.0)': dependencies: - '@aws-sdk/client-sso-oidc': 3.572.0(@aws-sdk/client-sts@3.572.0) + '@aws-sdk/client-sso-oidc': 3.572.0 '@aws-sdk/types': 3.567.0 '@smithy/property-provider': 2.2.0 '@smithy/shared-ini-file-loader': 2.4.0 diff --git a/services/apps/nango_worker/package.json b/services/apps/nango_worker/package.json index fcfd97a680..4f9b58c450 100644 --- a/services/apps/nango_worker/package.json +++ b/services/apps/nango_worker/package.json @@ -18,8 +18,8 @@ "@crowd/archetype-standard": "workspace:*", "@crowd/archetype-worker": "workspace:*", "@crowd/common": "workspace:*", - "@crowd/data-access-layer": "workspace:*", "@crowd/common_services": "workspace:*", + "@crowd/data-access-layer": "workspace:*", "@crowd/logging": "workspace:*", "@crowd/nango": "workspace:*", "@crowd/redis": "workspace:*", @@ -34,6 +34,7 @@ }, "devDependencies": { "@types/node": "^20.8.2", - "nodemon": "^3.0.1" + "nodemon": "^3.0.1", + "pg-promise": "^11.4.3" } } diff --git a/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts new file mode 100644 index 0000000000..c47fba98ec --- /dev/null +++ b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts @@ -0,0 +1,459 @@ +/** + * Gerrit Remote vs Local Count Validator + * + * Compares per-activity-type counts for a Gerrit repo between the upstream + * Gerrit REST API (source of truth) and local Tinybird storage. + * + * Usage: + * pnpm tsx src/bin/validate-gerrit-counts.ts + * + * Edit the CONFIG block below before running. + * Every value falls back to the corresponding env var if left empty. + */ + +import { writeFile } from 'node:fs/promises' +import pgPromise from 'pg-promise' + +// ─── config ─────────────────────────────────────────────────────────────────── +// Edit these values or export the corresponding env vars before running. + +const CONFIG = { + pg: { + host: process.env.CROWD_DB_READ_HOST || '', + port: parseInt(process.env.CROWD_DB_PORT || '5432'), + user: process.env.CROWD_DB_USERNAME || '', + password: process.env.CROWD_DB_PASSWORD || '', + database: process.env.CROWD_DB_DATABASE || '', + ssl: (process.env.CROWD_DB_SSL || 'true') !== 'false', + }, + tinybird: { + baseUrl: (process.env.CROWD_TINYBIRD_BASE_URL || '').replace(/\/$/, ''), + token: process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN || '', + }, + gerritPageSize: 500, + verbose: false, + outputDir: '.', // directory for the JSON artifact +} + +// ─── types ──────────────────────────────────────────────────────────────────── + +interface RemoteCounts { + 'changeset-created': number + 'changeset-merged': number + 'changeset-abandoned': number + 'changeset_comment-created': number + 'patchset-created': number + 'patchset_approval-created': number + pagesWalked: number + changesProcessed: number + skipErrors: number +} + +interface TypeResult { + type: string + remote: number | 'n/a' + local: number + diff: number | 'n/a' + note?: string +} + +interface Report { + url: string + channelUrl: string + integrationId: string + repoName: string + runAt: string + durationMs: number + results: TypeResult[] + gerritStats: { pagesWalked: number; changesProcessed: number; skipErrors: number } +} + +// ─── startup ────────────────────────────────────────────────────────────────── + +function mustHave(value: string, label: string): string { + if (!value) throw new Error(`Missing required config: ${label}`) + return value +} + +function validateConfig(): void { + mustHave(CONFIG.pg.host, 'pg.host / CROWD_DB_READ_HOST') + mustHave(CONFIG.pg.user, 'pg.user / CROWD_DB_USERNAME') + mustHave(CONFIG.pg.password, 'pg.password / CROWD_DB_PASSWORD') + mustHave(CONFIG.pg.database, 'pg.database / CROWD_DB_DATABASE') + mustHave(CONFIG.tinybird.baseUrl, 'tinybird.baseUrl / CROWD_TINYBIRD_BASE_URL') + mustHave(CONFIG.tinybird.token, 'tinybird.token / CROWD_TINYBIRD_ACTIVITIES_TOKEN') +} + +// ─── url utils ──────────────────────────────────────────────────────────────── + +function parseRepoUrl(rawUrl: string): { plainUrl: string; channelUrl: string } { + let plain = rawUrl.trim().replace(/\/$/, '') + + // Normalise /q/project: form → plain form + // e.g. https://review.opendev.org/q/project:openstack/nova + // → https://review.opendev.org/openstack/nova + const qMatch = plain.match(/^(https?:\/\/[^/]+(?:\/[^/]+)*)\/q\/project:(.+)$/) + if (qMatch) plain = `${qMatch[1]}/${qMatch[2]}` + + // Derive channel URL following repos_to_channels.pipe logic: + // /r/{project} → /r/q/project:{project} + // /gerrit/{project} → /gerrit/q/project:{project} + // /{project} → /q/project:{project} + const { origin, pathname } = new URL(plain) + let channelUrl: string + if (pathname.startsWith('/r/')) { + channelUrl = `${origin}/r/q/project:${pathname.slice(3)}` + } else if (pathname.startsWith('/gerrit/')) { + channelUrl = `${origin}/gerrit/q/project:${pathname.slice(8)}` + } else { + channelUrl = `${origin}/q/project:${pathname.slice(1)}` + } + + return { plainUrl: plain, channelUrl } +} + +// ─── db validation ──────────────────────────────────────────────────────────── + +async function validateInDb( + db: pgPromise.IDatabase, + plainUrl: string, +): Promise<{ integrationId: string; orgUrl: string; repoName: string }> { + const repo = await db.oneOrNone<{ sourceIntegrationId: string }>( + `SELECT "sourceIntegrationId" FROM repositories WHERE url = $1 AND "deletedAt" IS NULL`, + [plainUrl], + ) + if (!repo) { + throw new Error( + `No repositories row found for: ${plainUrl}\n` + + `Tip: use the plain form without /q/project: (e.g. https://host/org/repo)`, + ) + } + + const integration = await db.oneOrNone<{ + id: string + settings: { remote: { orgURL: string; repoNames: string[] } } + }>( + `SELECT id, settings FROM integrations WHERE id = $1 AND platform = 'gerrit' AND "deletedAt" IS NULL`, + [repo.sourceIntegrationId], + ) + if (!integration) { + throw new Error( + `Integration ${repo.sourceIntegrationId} not found or is not platform='gerrit'.`, + ) + } + + const { orgURL, repoNames } = integration.settings.remote + const repoName = plainUrl.replace(`${orgURL.replace(/\/$/, '')}/`, '') + + if (!repoNames.includes(repoName)) { + throw new Error( + `Repo '${repoName}' not found in integration repoNames.\n` + + `Available: [${repoNames.slice(0, 10).join(', ')}${repoNames.length > 10 ? ', …' : ''}]`, + ) + } + + return { integrationId: integration.id, orgUrl: orgURL, repoName } +} + +// ─── gerrit rest ────────────────────────────────────────────────────────────── + +function stripXssi(text: string): string { + // Gerrit prepends )]}\' + newline to prevent XSSI + return text.startsWith(")]}'") ? text.slice(5) : text +} + +async function fetchChangePage(url: string): Promise { + const resp = await fetch(url) + if (!resp.ok) throw new Error(`Gerrit ${resp.status}: GET ${url}`) + return JSON.parse(stripXssi(await resp.text())) as unknown[] +} + +async function walkGerritChanges(orgUrl: string, repoName: string): Promise { + const counts: RemoteCounts = { + 'changeset-created': 0, + 'changeset-merged': 0, + 'changeset-abandoned': 0, + 'changeset_comment-created': 0, + 'patchset-created': 0, + 'patchset_approval-created': 0, + pagesWalked: 0, + changesProcessed: 0, + skipErrors: 0, + } + + const base = orgUrl.replace(/\/$/, '') + const options = 'o=MESSAGES&o=ALL_REVISIONS&o=DETAILED_LABELS' + let start = 0 + + // eslint-disable-next-line no-constant-condition + while (true) { + const url = `${base}/changes/?q=project:${encodeURIComponent(repoName)}&n=${CONFIG.gerritPageSize}&S=${start}&${options}` + if (CONFIG.verbose) console.log(` [gerrit] page S=${start}`) + + let page: unknown[] + try { + page = await fetchChangePage(url) + } catch (err) { + console.error(` [gerrit] error at S=${start}: ${err}`) + counts.skipErrors++ + break + } + + if (!Array.isArray(page) || page.length === 0) break + + counts.pagesWalked++ + const hasMore = (page[page.length - 1] as Record)._more_changes === true + + for (const raw of page) { + const change = raw as Record + try { + counts['changeset-created']++ + if (change['status'] === 'MERGED') counts['changeset-merged']++ + if (change['status'] === 'ABANDONED') counts['changeset-abandoned']++ + + counts['changeset_comment-created'] += ((change['messages'] as unknown[]) ?? []).length + + counts['patchset-created'] += Object.keys( + (change['revisions'] as Record) ?? {}, + ).length + + for (const labelData of Object.values( + (change['labels'] as Record) ?? {}, + )) { + for (const vote of labelData.all ?? []) { + if (vote.value !== undefined && vote.value !== 0) { + counts['patchset_approval-created']++ + } + } + } + + counts.changesProcessed++ + } catch { + counts.skipErrors++ + } + } + + if (CONFIG.verbose) console.log(` [gerrit] cumulative: ${counts.changesProcessed} changes`) + if (!hasMore) break + start += CONFIG.gerritPageSize + } + + return counts +} + +// ─── tinybird ───────────────────────────────────────────────────────────────── + +async function tinybirdSql(sql: string): Promise { + const resp = await fetch(`${CONFIG.tinybird.baseUrl}/v0/sql`, { + method: 'POST', + headers: { + Authorization: `Bearer ${CONFIG.tinybird.token}`, + 'Content-Type': 'application/json', + }, + body: JSON.stringify({ q: sql }), + }) + if (!resp.ok) { + const body = await resp.text() + throw new Error(`Tinybird ${resp.status}: ${body}`) + } + return resp.json() as Promise +} + +async function getLocalCounts(channelUrl: string): Promise> { + const escaped = channelUrl.replace(/'/g, "\\'") + const sql = + `SELECT type, count() AS cnt ` + + `FROM activityRelations_deduplicated_cleaned_bucket_union ` + + `WHERE platform = 'gerrit' AND channel = '${escaped}' ` + + `GROUP BY type FORMAT JSON` + + const result = await tinybirdSql<{ data: Array<{ type: string; cnt: number }> }>(sql) + const counts: Record = {} + for (const row of result.data ?? []) { + counts[row.type] = Number(row.cnt) + } + return counts +} + +// ─── report ─────────────────────────────────────────────────────────────────── + +const IN_SCOPE: Array> = + [ + 'changeset-created', + 'changeset-merged', + 'changeset-abandoned', + 'changeset_comment-created', + 'patchset-created', + 'patchset_approval-created', + ] + +const EXCLUDED = new Set(['changeset-new', 'changeset-closed', 'patchset_comment-created']) + +function buildReport( + url: string, + channelUrl: string, + integrationId: string, + repoName: string, + remote: RemoteCounts, + local: Record, + durationMs: number, +): Report { + const results: TypeResult[] = [] + + for (const type of IN_SCOPE) { + const r = remote[type] + const l = local[type] ?? 0 + results.push({ + type, + remote: r, + local: l, + diff: l - r, + note: + type === 'patchset_approval-created' + ? 'remote = current-state votes only (non-historical estimate)' + : undefined, + }) + } + + // Surface excluded types that unexpectedly have local rows + for (const type of EXCLUDED) { + const l = local[type] ?? 0 + if (l > 0) { + const reason = + type === 'changeset-new' + ? 'dead type (registry-deleted by migration V1759325136)' + : type === 'changeset-closed' + ? 'derivation logic lives in external Nango sync only' + : 'out of scope (Tier 3)' + results.push({ type, remote: 'n/a', local: l, diff: 'n/a', note: reason }) + } + } + + // Any other unexpected local types + for (const type of Object.keys(local)) { + if (!(IN_SCOPE as string[]).includes(type) && !EXCLUDED.has(type)) { + results.push({ type, remote: 'n/a', local: local[type], diff: 'n/a', note: 'unknown type' }) + } + } + + return { + url, + channelUrl, + integrationId, + repoName, + runAt: new Date().toISOString(), + durationMs, + results, + gerritStats: { + pagesWalked: remote.pagesWalked, + changesProcessed: remote.changesProcessed, + skipErrors: remote.skipErrors, + }, + } +} + +function printReport(report: Report): void { + const W = 38 + const N = 11 + const header = 'Type'.padEnd(W) + 'Remote'.padStart(N) + 'Local'.padStart(N) + 'Δ'.padStart(N) + + console.log('') + console.log(`Repo ${report.url}`) + console.log(`Channel ${report.channelUrl}`) + console.log(`Integration ${report.integrationId}`) + console.log(`Run at ${report.runAt} (${(report.durationMs / 1000).toFixed(1)}s)`) + console.log( + `Gerrit ${report.gerritStats.changesProcessed} changes / ` + + `${report.gerritStats.pagesWalked} pages` + + (report.gerritStats.skipErrors ? ` / ${report.gerritStats.skipErrors} skip errors` : ''), + ) + console.log('') + console.log(header) + console.log('─'.repeat(header.length)) + + for (const r of report.results) { + const remote = r.remote === 'n/a' ? 'n/a' : (r.remote as number).toLocaleString() + const local = r.local.toLocaleString() + const diff = + r.diff === 'n/a' + ? 'n/a' + : `${(r.diff as number) > 0 ? '+' : ''}${(r.diff as number).toLocaleString()}` + console.log(r.type.padEnd(W) + remote.padStart(N) + local.padStart(N) + diff.padStart(N)) + if (r.note) console.log(`${''.padEnd(W + 1)}* ${r.note}`) + } + console.log('') +} + +// ─── main ───────────────────────────────────────────────────────────────────── + +async function main(): Promise { + const rawUrl = process.argv[2] + if (!rawUrl) { + console.error('Usage: pnpm tsx src/bin/validate-gerrit-counts.ts ') + process.exit(1) + } + + validateConfig() + const startMs = Date.now() + + console.log(`\nValidating Gerrit counts for: ${rawUrl}`) + + const { plainUrl, channelUrl } = parseRepoUrl(rawUrl) + if (plainUrl !== rawUrl.trim()) console.log(`Normalized to: ${plainUrl}`) + console.log(`Channel URL: ${channelUrl}`) + + // ── DB lookup ── + console.log(`\nLooking up in CDP database...`) + const pgp = pgPromise() + const db = pgp({ + host: CONFIG.pg.host, + port: CONFIG.pg.port, + user: CONFIG.pg.user, + password: CONFIG.pg.password, + database: CONFIG.pg.database, + ssl: CONFIG.pg.ssl ? { rejectUnauthorized: false } : false, + }) + + let integrationId: string + let orgUrl: string + let repoName: string + + try { + const info = await validateInDb(db, plainUrl) + integrationId = info.integrationId + orgUrl = info.orgUrl + repoName = info.repoName + console.log(` Integration: ${integrationId}`) + console.log(` Org URL: ${orgUrl}`) + console.log(` Repo: ${repoName}`) + } finally { + pgp.end() + } + + // ── Remote counts ── + console.log(`\nFetching remote counts from Gerrit (page size: ${CONFIG.gerritPageSize})...`) + const remote = await walkGerritChanges(orgUrl, repoName) + console.log(` ${remote.changesProcessed} changes in ${remote.pagesWalked} pages`) + if (remote.skipErrors > 0) console.warn(` ⚠ ${remote.skipErrors} skip errors`) + + // ── Local counts ── + console.log(`\nFetching local counts from Tinybird...`) + const local = await getLocalCounts(channelUrl) + console.log(` ${Object.keys(local).length} activity types found locally`) + + // ── Report ── + const durationMs = Date.now() - startMs + const report = buildReport(plainUrl, channelUrl, integrationId, repoName, remote, local, durationMs) + printReport(report) + + const slug = `${new URL(plainUrl).hostname}-${repoName.replace(/\//g, '-')}` + const ts = new Date().toISOString().replace(/[:.]/g, '-') + const outputPath = `${CONFIG.outputDir}/validation-${slug}-${ts}.json` + await writeFile(outputPath, JSON.stringify(report, null, 2), 'utf8') + console.log(`Report saved: ${outputPath}`) +} + +main().catch((err) => { + console.error(`\nError: ${err instanceof Error ? err.message : String(err)}`) + process.exit(1) +}) From 8cb417fdbd013186b662600555ab60da63093752 Mon Sep 17 00:00:00 2001 From: Mouad BANI Date: Thu, 23 Apr 2026 17:54:13 +0100 Subject: [PATCH 2/2] feat: validation script for changesets and patchsets Signed-off-by: Mouad BANI --- .../src/bin/validate-gerrit-counts.ts | 592 +++++++++++------- 1 file changed, 365 insertions(+), 227 deletions(-) diff --git a/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts index c47fba98ec..0cd8deb839 100644 --- a/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts +++ b/services/apps/nango_worker/src/bin/validate-gerrit-counts.ts @@ -1,21 +1,20 @@ /** - * Gerrit Remote vs Local Count Validator + * Gerrit Changeset Count Validator * - * Compares per-activity-type counts for a Gerrit repo between the upstream - * Gerrit REST API (source of truth) and local Tinybird storage. + * Compares changeset activity counts for a Gerrit repo between the upstream + * Gerrit REST API and local Tinybird storage, with set-diff to show exactly + * which change numbers are missing from each type. * * Usage: - * pnpm tsx src/bin/validate-gerrit-counts.ts + * pnpm tsx src/bin/validate-gerrit-counts.ts [--since=] [--include-patchsets] * * Edit the CONFIG block below before running. - * Every value falls back to the corresponding env var if left empty. */ -import { writeFile } from 'node:fs/promises' +import { mkdir, writeFile } from 'node:fs/promises' import pgPromise from 'pg-promise' // ─── config ─────────────────────────────────────────────────────────────────── -// Edit these values or export the corresponding env vars before running. const CONFIG = { pg: { @@ -27,33 +26,47 @@ const CONFIG = { ssl: (process.env.CROWD_DB_SSL || 'true') !== 'false', }, tinybird: { - baseUrl: (process.env.CROWD_TINYBIRD_BASE_URL || '').replace(/\/$/, ''), + baseUrl: process.env.CROWD_TINYBIRD_BASE_URL || '', token: process.env.CROWD_TINYBIRD_ACTIVITIES_TOKEN || '', }, gerritPageSize: 500, - verbose: false, - outputDir: '.', // directory for the JSON artifact + gerritRequestDelayMs: 100, // delay between Gerrit pages to avoid rate-limiting + gerritMaxRetries: 3, // per-page retry attempts before giving up + // Optional ISO date string — when set, only changes updated on/after this date are compared. + // Gerrit uses `after:""`, Tinybird filters `timestamp >= ''`. + // Leave empty ('') for a full historical comparison. + since: '', + // When true, also validate patchset activity types (requires a second Gerrit walk with extra options). + includePatchsets: true, + verbose: true, + outputDir: './validation-reports', } // ─── types ──────────────────────────────────────────────────────────────────── -interface RemoteCounts { - 'changeset-created': number - 'changeset-merged': number - 'changeset-abandoned': number - 'changeset_comment-created': number - 'patchset-created': number - 'patchset_approval-created': number +interface GerritChanges { + all: Set // all change numbers (→ changeset-created) + merged: Set // status=MERGED (→ changeset-merged) + abandoned: Set // status=ABANDONED (→ changeset-abandoned) pagesWalked: number - changesProcessed: number skipErrors: number } -interface TypeResult { +interface GerritPatchsets { + created: Set // sourceId = `${change._number}-${revision._number}` + approvals: Set // sourceId = `${change.id}-${vote._account_id}` + pagesWalked: number + skipErrors: number +} + +interface TypeDiff { type: string - remote: number | 'n/a' - local: number - diff: number | 'n/a' + gerrit: number // -1 means "not countable from Gerrit API without expensive per-change calls" + tinybird: number + diff: number + missingFromTinybird: string[] // in Gerrit but not in Tinybird + extraInTinybird: string[] // in Tinybird but not in Gerrit + sourceIdDiffSkipped: boolean // true when counts matched — sourceId fetch was skipped note?: string } @@ -64,11 +77,13 @@ interface Report { repoName: string runAt: string durationMs: number - results: TypeResult[] + changeset: TypeDiff[] + patchset?: TypeDiff[] gerritStats: { pagesWalked: number; changesProcessed: number; skipErrors: number } + gerritPatchsetStats?: { pagesWalked: number; skipErrors: number } } -// ─── startup ────────────────────────────────────────────────────────────────── +// ─── validation ─────────────────────────────────────────────────────────────── function mustHave(value: string, label: string): string { if (!value) throw new Error(`Missing required config: ${label}`) @@ -76,29 +91,21 @@ function mustHave(value: string, label: string): string { } function validateConfig(): void { - mustHave(CONFIG.pg.host, 'pg.host / CROWD_DB_READ_HOST') - mustHave(CONFIG.pg.user, 'pg.user / CROWD_DB_USERNAME') - mustHave(CONFIG.pg.password, 'pg.password / CROWD_DB_PASSWORD') - mustHave(CONFIG.pg.database, 'pg.database / CROWD_DB_DATABASE') - mustHave(CONFIG.tinybird.baseUrl, 'tinybird.baseUrl / CROWD_TINYBIRD_BASE_URL') - mustHave(CONFIG.tinybird.token, 'tinybird.token / CROWD_TINYBIRD_ACTIVITIES_TOKEN') + mustHave(CONFIG.pg.host, 'pg.host') + mustHave(CONFIG.pg.user, 'pg.user') + mustHave(CONFIG.pg.password, 'pg.password') + mustHave(CONFIG.pg.database, 'pg.database') + mustHave(CONFIG.tinybird.baseUrl, 'tinybird.baseUrl') + mustHave(CONFIG.tinybird.token, 'tinybird.token') } // ─── url utils ──────────────────────────────────────────────────────────────── function parseRepoUrl(rawUrl: string): { plainUrl: string; channelUrl: string } { let plain = rawUrl.trim().replace(/\/$/, '') - - // Normalise /q/project: form → plain form - // e.g. https://review.opendev.org/q/project:openstack/nova - // → https://review.opendev.org/openstack/nova const qMatch = plain.match(/^(https?:\/\/[^/]+(?:\/[^/]+)*)\/q\/project:(.+)$/) if (qMatch) plain = `${qMatch[1]}/${qMatch[2]}` - // Derive channel URL following repos_to_channels.pipe logic: - // /r/{project} → /r/q/project:{project} - // /gerrit/{project} → /gerrit/q/project:{project} - // /{project} → /q/project:{project} const { origin, pathname } = new URL(plain) let channelUrl: string if (pathname.startsWith('/r/')) { @@ -112,53 +119,51 @@ function parseRepoUrl(rawUrl: string): { plainUrl: string; channelUrl: string } return { plainUrl: plain, channelUrl } } -// ─── db validation ──────────────────────────────────────────────────────────── +// ─── db ─────────────────────────────────────────────────────────────────────── async function validateInDb( db: pgPromise.IDatabase, plainUrl: string, -): Promise<{ integrationId: string; orgUrl: string; repoName: string }> { +): Promise<{ integrationId: string; segmentId: string; orgUrl: string; repoName: string }> { const repo = await db.oneOrNone<{ sourceIntegrationId: string }>( `SELECT "sourceIntegrationId" FROM repositories WHERE url = $1 AND "deletedAt" IS NULL`, [plainUrl], ) - if (!repo) { + if (!repo) throw new Error( `No repositories row found for: ${plainUrl}\n` + - `Tip: use the plain form without /q/project: (e.g. https://host/org/repo)`, + `Tip: use the plain form without /q/project:`, ) - } const integration = await db.oneOrNone<{ id: string + segmentId: string settings: { remote: { orgURL: string; repoNames: string[] } } }>( - `SELECT id, settings FROM integrations WHERE id = $1 AND platform = 'gerrit' AND "deletedAt" IS NULL`, + `SELECT id, "segmentId", settings FROM integrations WHERE id = $1 AND platform = 'gerrit' AND "deletedAt" IS NULL`, [repo.sourceIntegrationId], ) - if (!integration) { - throw new Error( - `Integration ${repo.sourceIntegrationId} not found or is not platform='gerrit'.`, - ) - } + if (!integration) + throw new Error(`Integration ${repo.sourceIntegrationId} not found or not platform='gerrit'.`) const { orgURL, repoNames } = integration.settings.remote const repoName = plainUrl.replace(`${orgURL.replace(/\/$/, '')}/`, '') - - if (!repoNames.includes(repoName)) { + if (!repoNames.includes(repoName)) throw new Error( - `Repo '${repoName}' not found in integration repoNames.\n` + + `Repo '${repoName}' not in integration repoNames.\n` + `Available: [${repoNames.slice(0, 10).join(', ')}${repoNames.length > 10 ? ', …' : ''}]`, ) - } - return { integrationId: integration.id, orgUrl: orgURL, repoName } + return { integrationId: integration.id, segmentId: integration.segmentId, orgUrl: orgURL, repoName } } // ─── gerrit rest ────────────────────────────────────────────────────────────── +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)) +} + function stripXssi(text: string): string { - // Gerrit prepends )]}\' + newline to prevent XSSI return text.startsWith(")]}'") ? text.slice(5) : text } @@ -168,77 +173,108 @@ async function fetchChangePage(url: string): Promise { return JSON.parse(stripXssi(await resp.text())) as unknown[] } -async function walkGerritChanges(orgUrl: string, repoName: string): Promise { - const counts: RemoteCounts = { - 'changeset-created': 0, - 'changeset-merged': 0, - 'changeset-abandoned': 0, - 'changeset_comment-created': 0, - 'patchset-created': 0, - 'patchset_approval-created': 0, - pagesWalked: 0, - changesProcessed: 0, - skipErrors: 0, +async function fetchPageWithRetry(url: string, label: string): Promise { + let attempt = 0 + while (attempt < CONFIG.gerritMaxRetries) { + try { + return await fetchChangePage(url) + } catch (err) { + attempt++ + const delay = CONFIG.gerritRequestDelayMs * Math.pow(2, attempt) + console.error(` [gerrit] error ${label} (attempt ${attempt}/${CONFIG.gerritMaxRetries}): ${err} — retrying in ${delay}ms`) + await sleep(delay) + } } + console.error(` [gerrit] giving up ${label} after ${CONFIG.gerritMaxRetries} attempts`) + return null +} +// Changeset walk — lightweight, no extra options needed +async function fetchGerritChanges(orgUrl: string, repoName: string): Promise { + const result: GerritChanges = { all: new Set(), merged: new Set(), abandoned: new Set(), pagesWalked: 0, skipErrors: 0 } const base = orgUrl.replace(/\/$/, '') - const options = 'o=MESSAGES&o=ALL_REVISIONS&o=DETAILED_LABELS' + const sinceFilter = CONFIG.since ? `+after:"${CONFIG.since}"` : '' let start = 0 - // eslint-disable-next-line no-constant-condition while (true) { - const url = `${base}/changes/?q=project:${encodeURIComponent(repoName)}&n=${CONFIG.gerritPageSize}&S=${start}&${options}` - if (CONFIG.verbose) console.log(` [gerrit] page S=${start}`) + const url = `${base}/changes/?q=project:${encodeURIComponent(repoName)}${sinceFilter}&n=${CONFIG.gerritPageSize}&S=${start}` + if (CONFIG.verbose) console.log(` [gerrit] changeset page S=${start}`) - let page: unknown[] - try { - page = await fetchChangePage(url) - } catch (err) { - console.error(` [gerrit] error at S=${start}: ${err}`) - counts.skipErrors++ - break + const page = await fetchPageWithRetry(url, `S=${start}`) + if (!page) { result.skipErrors++; break } + if (!Array.isArray(page) || page.length === 0) break + + result.pagesWalked++ + const hasMore = (page[page.length - 1] as Record)._more_changes === true + + for (const raw of page) { + const c = raw as Record + const num = String(c['_number'] ?? c['number'] ?? '') + if (!num) { result.skipErrors++; continue } + result.all.add(num) + if (c['status'] === 'MERGED') result.merged.add(num) + if (c['status'] === 'ABANDONED') result.abandoned.add(num) } + if (!hasMore) break + start += CONFIG.gerritPageSize + await sleep(CONFIG.gerritRequestDelayMs) + } + + return result +} + +// Patchset walk — separate, fetches ALL_REVISIONS + DETAILED_LABELS +async function fetchGerritPatchsets(orgUrl: string, repoName: string): Promise { + const result: GerritPatchsets = { created: new Set(), approvals: new Set(), pagesWalked: 0, skipErrors: 0 } + const base = orgUrl.replace(/\/$/, '') + const sinceFilter = CONFIG.since ? `+after:"${CONFIG.since}"` : '' + let start = 0 + + while (true) { + const url = + `${base}/changes/?q=project:${encodeURIComponent(repoName)}${sinceFilter}` + + `&o=ALL_REVISIONS&o=DETAILED_LABELS&n=${CONFIG.gerritPageSize}&S=${start}` + if (CONFIG.verbose) console.log(` [gerrit] patchset page S=${start}`) + + const page = await fetchPageWithRetry(url, `patchset S=${start}`) + if (!page) { result.skipErrors++; break } if (!Array.isArray(page) || page.length === 0) break - counts.pagesWalked++ + result.pagesWalked++ const hasMore = (page[page.length - 1] as Record)._more_changes === true for (const raw of page) { - const change = raw as Record - try { - counts['changeset-created']++ - if (change['status'] === 'MERGED') counts['changeset-merged']++ - if (change['status'] === 'ABANDONED') counts['changeset-abandoned']++ - - counts['changeset_comment-created'] += ((change['messages'] as unknown[]) ?? []).length - - counts['patchset-created'] += Object.keys( - (change['revisions'] as Record) ?? {}, - ).length - - for (const labelData of Object.values( - (change['labels'] as Record) ?? {}, - )) { - for (const vote of labelData.all ?? []) { - if (vote.value !== undefined && vote.value !== 0) { - counts['patchset_approval-created']++ - } - } + const c = raw as Record + const changeNum = String(c['_number'] ?? '') + if (!changeNum) { result.skipErrors++; continue } + + // patchset-created: one per revision + const revisions = c['revisions'] as Record> | undefined + if (revisions) { + for (const revDetail of Object.values(revisions)) { + const revNum = revDetail['_number'] + if (revNum != null) result.created.add(`${changeNum}-${revNum}`) } + } - counts.changesProcessed++ - } catch { - counts.skipErrors++ + // patchset_approval-created: sourceId = `${change.id}-${vote._account_id}` (mirrors Nango mapper) + const changeId = String(c['id'] ?? '') + const labels = c['labels'] as Record }> | undefined + const codeReview = labels?.['Code-Review']?.all ?? [] + for (const vote of codeReview) { + if (vote.value != null && vote.value !== 0 && vote._account_id != null) { + result.approvals.add(`${changeId}-${vote._account_id}`) + } } } - if (CONFIG.verbose) console.log(` [gerrit] cumulative: ${counts.changesProcessed} changes`) if (!hasMore) break start += CONFIG.gerritPageSize + await sleep(CONFIG.gerritRequestDelayMs) } - return counts + return result } // ─── tinybird ───────────────────────────────────────────────────────────────── @@ -246,163 +282,212 @@ async function walkGerritChanges(orgUrl: string, repoName: string): Promise(sql: string): Promise { const resp = await fetch(`${CONFIG.tinybird.baseUrl}/v0/sql`, { method: 'POST', - headers: { - Authorization: `Bearer ${CONFIG.tinybird.token}`, - 'Content-Type': 'application/json', - }, + headers: { Authorization: `Bearer ${CONFIG.tinybird.token}`, 'Content-Type': 'application/json' }, body: JSON.stringify({ q: sql }), }) - if (!resp.ok) { - const body = await resp.text() - throw new Error(`Tinybird ${resp.status}: ${body}`) - } + if (!resp.ok) throw new Error(`Tinybird ${resp.status}: ${await resp.text()}`) return resp.json() as Promise } -async function getLocalCounts(channelUrl: string): Promise> { - const escaped = channelUrl.replace(/'/g, "\\'") - const sql = - `SELECT type, count() AS cnt ` + - `FROM activityRelations_deduplicated_cleaned_bucket_union ` + - `WHERE platform = 'gerrit' AND channel = '${escaped}' ` + - `GROUP BY type FORMAT JSON` +function tinybirdWhereClause(channelUrl: string, segmentId: string, types: string[]): string { + const ch = channelUrl.replace(/'/g, "\\'") + const seg = segmentId.replace(/'/g, "\\'") + const sinceClause = CONFIG.since ? `AND timestamp >= '${CONFIG.since}' ` : '' + const typeList = types.map((t) => `'${t}'`).join(', ') + return ( + `segmentId = '${seg}' ` + + `AND platform = 'gerrit' ` + + `AND channel = '${ch}' ` + + sinceClause + + `AND type IN (${typeList})` + ) +} - const result = await tinybirdSql<{ data: Array<{ type: string; cnt: number }> }>(sql) - const counts: Record = {} +// Phase 1: one GROUP BY query for all requested types +async function fetchTinybirdCounts( + channelUrl: string, + segmentId: string, + types: readonly string[], +): Promise> { + const sql = + `SELECT type, COUNT(DISTINCT sourceId) AS cnt ` + + `FROM activityRelations FINAL ` + + `WHERE ${tinybirdWhereClause(channelUrl, segmentId, [...types])} ` + + `GROUP BY type ` + + `FORMAT JSON` + + const result = await tinybirdSql<{ data: Array<{ type: string; cnt: string }> }>(sql) + const counts: Record = Object.fromEntries(types.map((t) => [t, 0])) for (const row of result.data ?? []) { - counts[row.type] = Number(row.cnt) + if (row.type in counts) counts[row.type] = Number(row.cnt) } return counts } -// ─── report ─────────────────────────────────────────────────────────────────── +// Phase 2: sourceIds for a single type — only called when count differs +async function fetchTinybirdSourceIds( + channelUrl: string, + segmentId: string, + type: string, +): Promise> { + const ch = channelUrl.replace(/'/g, "\\'") + const seg = segmentId.replace(/'/g, "\\'") + const sinceClause = CONFIG.since ? `AND timestamp >= '${CONFIG.since}' ` : '' + const sql = + `SELECT sourceId ` + + `FROM activityRelations FINAL ` + + `WHERE segmentId = '${seg}' ` + + `AND platform = 'gerrit' ` + + `AND channel = '${ch}' ` + + sinceClause + + `AND type = '${type}' ` + + `FORMAT JSON` + + const result = await tinybirdSql<{ data: Array<{ sourceId: string }> }>(sql) + return new Set((result.data ?? []).map((r) => r.sourceId)) +} -const IN_SCOPE: Array> = - [ - 'changeset-created', - 'changeset-merged', - 'changeset-abandoned', - 'changeset_comment-created', - 'patchset-created', - 'patchset_approval-created', - ] +// ─── diff ───────────────────────────────────────────────────────────────────── -const EXCLUDED = new Set(['changeset-new', 'changeset-closed', 'patchset_comment-created']) +function setDiff(a: Set, b: Set): string[] { + return [...a].filter((x) => !b.has(x)).sort((x, y) => Number(x) - Number(y)) +} -function buildReport( - url: string, - channelUrl: string, - integrationId: string, - repoName: string, - remote: RemoteCounts, - local: Record, - durationMs: number, -): Report { - const results: TypeResult[] = [] - - for (const type of IN_SCOPE) { - const r = remote[type] - const l = local[type] ?? 0 - results.push({ - type, - remote: r, - local: l, - diff: l - r, - note: - type === 'patchset_approval-created' - ? 'remote = current-state votes only (non-historical estimate)' - : undefined, - }) +function buildDiff(type: string, gerrit: Set, tinybird: Set): TypeDiff { + return { + type, + gerrit: gerrit.size, + tinybird: tinybird.size, + diff: tinybird.size - gerrit.size, + missingFromTinybird: setDiff(gerrit, tinybird), + extraInTinybird: setDiff(tinybird, gerrit), + sourceIdDiffSkipped: false, } +} - // Surface excluded types that unexpectedly have local rows - for (const type of EXCLUDED) { - const l = local[type] ?? 0 - if (l > 0) { - const reason = - type === 'changeset-new' - ? 'dead type (registry-deleted by migration V1759325136)' - : type === 'changeset-closed' - ? 'derivation logic lives in external Nango sync only' - : 'out of scope (Tier 3)' - results.push({ type, remote: 'n/a', local: l, diff: 'n/a', note: reason }) - } +function buildDiffCountsOnly(type: string, gerritCount: number, tinybirdCount: number, note?: string): TypeDiff { + return { + type, + gerrit: gerritCount, + tinybird: tinybirdCount, + diff: gerritCount === -1 ? 0 : tinybirdCount - gerritCount, + missingFromTinybird: [], + extraInTinybird: [], + sourceIdDiffSkipped: false, + note, } +} - // Any other unexpected local types - for (const type of Object.keys(local)) { - if (!(IN_SCOPE as string[]).includes(type) && !EXCLUDED.has(type)) { - results.push({ type, remote: 'n/a', local: local[type], diff: 'n/a', note: 'unknown type' }) +async function buildTypeDiffsWithSets( + channelUrl: string, + segmentId: string, + typeConfigs: Array<{ type: string; gerritSet?: Set; gerritCount?: number; countOnly?: boolean; note?: string }>, + tinybirdCounts: Record, +): Promise { + const diffs: TypeDiff[] = [] + for (const cfg of typeConfigs) { + const tbCount = tinybirdCounts[cfg.type] ?? 0 + + if (cfg.countOnly || cfg.gerritSet == null) { + const gCount = cfg.gerritCount ?? -1 + diffs.push(buildDiffCountsOnly(cfg.type, gCount, tbCount, cfg.note)) + } else { + const gCount = cfg.gerritSet.size + if (gCount === tbCount) { + diffs.push(buildDiffCountsOnly(cfg.type, gCount, tbCount, cfg.note)) + } else { + console.log(` [tinybird] count diff on ${cfg.type} (${gCount} vs ${tbCount}) — fetching sourceIds...`) + const tbIds = await fetchTinybirdSourceIds(channelUrl, segmentId, cfg.type) + diffs.push(buildDiff(cfg.type, cfg.gerritSet, tbIds)) + } } } + return diffs +} - return { - url, - channelUrl, - integrationId, - repoName, - runAt: new Date().toISOString(), - durationMs, - results, - gerritStats: { - pagesWalked: remote.pagesWalked, - changesProcessed: remote.changesProcessed, - skipErrors: remote.skipErrors, - }, +// ─── report ─────────────────────────────────────────────────────────────────── + +function printSection(title: string, rows: TypeDiff[]): void { + const W = 30 + const N = 9 + const header = + 'Type'.padEnd(W) + + 'Gerrit'.padStart(N) + + 'Tinybird'.padStart(N) + + 'Δ'.padStart(N) + + ' Missing'.padStart(N) + + ' Extra'.padStart(N) + + console.log(`\n── ${title} ${'─'.repeat(Math.max(0, header.length - title.length - 4))}`) + console.log(header) + console.log('─'.repeat(header.length)) + + for (const r of rows) { + const gerritLabel = r.gerrit === -1 ? 'N/A' : r.gerrit.toLocaleString() + const diff = r.gerrit === -1 ? 'N/A' : `${r.diff > 0 ? '+' : ''}${r.diff.toLocaleString()}` + const missingLabel = r.sourceIdDiffSkipped ? '(skipped)' : r.missingFromTinybird.length.toLocaleString() + const extraLabel = r.sourceIdDiffSkipped ? '(skipped)' : r.extraInTinybird.length.toLocaleString() + console.log( + r.type.padEnd(W) + + gerritLabel.padStart(N) + + r.tinybird.toLocaleString().padStart(N) + + diff.padStart(N) + + missingLabel.padStart(N + 2) + + extraLabel.padStart(N + 2), + ) + if (r.note) console.log(` * ${r.note}`) + if (!r.sourceIdDiffSkipped) { + if (r.missingFromTinybird.length > 0 && r.missingFromTinybird.length <= 20) { + console.log(` missing from tinybird: ${r.missingFromTinybird.join(', ')}`) + } + if (r.extraInTinybird.length > 0 && r.extraInTinybird.length <= 20) { + console.log(` extra in tinybird: ${r.extraInTinybird.join(', ')}`) + } + } } } function printReport(report: Report): void { - const W = 38 - const N = 11 - const header = 'Type'.padEnd(W) + 'Remote'.padStart(N) + 'Local'.padStart(N) + 'Δ'.padStart(N) - console.log('') console.log(`Repo ${report.url}`) console.log(`Channel ${report.channelUrl}`) console.log(`Integration ${report.integrationId}`) console.log(`Run at ${report.runAt} (${(report.durationMs / 1000).toFixed(1)}s)`) - console.log( - `Gerrit ${report.gerritStats.changesProcessed} changes / ` + - `${report.gerritStats.pagesWalked} pages` + - (report.gerritStats.skipErrors ? ` / ${report.gerritStats.skipErrors} skip errors` : ''), - ) - console.log('') - console.log(header) - console.log('─'.repeat(header.length)) + console.log(`Gerrit ${report.gerritStats.changesProcessed} changes / ${report.gerritStats.pagesWalked} pages`) - for (const r of report.results) { - const remote = r.remote === 'n/a' ? 'n/a' : (r.remote as number).toLocaleString() - const local = r.local.toLocaleString() - const diff = - r.diff === 'n/a' - ? 'n/a' - : `${(r.diff as number) > 0 ? '+' : ''}${(r.diff as number).toLocaleString()}` - console.log(r.type.padEnd(W) + remote.padStart(N) + local.padStart(N) + diff.padStart(N)) - if (r.note) console.log(`${''.padEnd(W + 1)}* ${r.note}`) - } + printSection('Changesets', report.changeset) + if (report.patchset) printSection('Patchsets', report.patchset) console.log('') } // ─── main ───────────────────────────────────────────────────────────────────── +const CHANGESET_TYPES = ['changeset-created', 'changeset-merged', 'changeset-abandoned'] as const +const PATCHSET_TYPES = ['patchset-created', 'patchset_approval-created', 'patchset_comment-created'] as const + async function main(): Promise { - const rawUrl = process.argv[2] + const args = process.argv.slice(2) + const rawUrl = args.find((a) => !a.startsWith('--')) if (!rawUrl) { - console.error('Usage: pnpm tsx src/bin/validate-gerrit-counts.ts ') + console.error('Usage: pnpm tsx src/bin/validate-gerrit-counts.ts [--since=] [--include-patchsets]') process.exit(1) } + const sinceArg = args.find((a) => a.startsWith('--since=')) + if (sinceArg) CONFIG.since = sinceArg.slice('--since='.length) + if (args.includes('--include-patchsets')) CONFIG.includePatchsets = true + validateConfig() const startMs = Date.now() - console.log(`\nValidating Gerrit counts for: ${rawUrl}`) - + console.log(`\nValidating Gerrit changesets for: ${rawUrl}`) const { plainUrl, channelUrl } = parseRepoUrl(rawUrl) - if (plainUrl !== rawUrl.trim()) console.log(`Normalized to: ${plainUrl}`) - console.log(`Channel URL: ${channelUrl}`) + if (plainUrl !== rawUrl.trim()) console.log(`Normalized to: ${plainUrl}`) + console.log(`Channel URL: ${channelUrl}`) + if (CONFIG.since) console.log(`Since: ${CONFIG.since} (incremental)`) + if (CONFIG.includePatchsets) console.log(`Mode: changesets + patchsets`) - // ── DB lookup ── + // DB lookup console.log(`\nLooking up in CDP database...`) const pgp = pgPromise() const db = pgp({ @@ -415,39 +500,91 @@ async function main(): Promise { }) let integrationId: string + let segmentId: string let orgUrl: string let repoName: string - try { const info = await validateInDb(db, plainUrl) integrationId = info.integrationId + segmentId = info.segmentId orgUrl = info.orgUrl repoName = info.repoName console.log(` Integration: ${integrationId}`) - console.log(` Org URL: ${orgUrl}`) + console.log(` Segment: ${segmentId}`) console.log(` Repo: ${repoName}`) } finally { pgp.end() } - // ── Remote counts ── - console.log(`\nFetching remote counts from Gerrit (page size: ${CONFIG.gerritPageSize})...`) - const remote = await walkGerritChanges(orgUrl, repoName) - console.log(` ${remote.changesProcessed} changes in ${remote.pagesWalked} pages`) - if (remote.skipErrors > 0) console.warn(` ⚠ ${remote.skipErrors} skip errors`) + // ── Changesets ────────────────────────────────────────────────────────────── + + console.log(`\nFetching changeset data from Gerrit...`) + const gerrit = await fetchGerritChanges(orgUrl, repoName) + console.log(` ${gerrit.all.size} total ${gerrit.merged.size} merged ${gerrit.abandoned.size} abandoned`) + + console.log(`\nFetching changeset counts from Tinybird...`) + const tbChangesetCounts = await fetchTinybirdCounts(channelUrl, segmentId, CHANGESET_TYPES) + console.log(` ${tbChangesetCounts['changeset-created']} created ${tbChangesetCounts['changeset-merged']} merged ${tbChangesetCounts['changeset-abandoned']} abandoned`) + + const changeset = await buildTypeDiffsWithSets(channelUrl, segmentId, [ + { type: 'changeset-created', gerritSet: gerrit.all }, + { type: 'changeset-merged', gerritSet: gerrit.merged }, + { type: 'changeset-abandoned', gerritSet: gerrit.abandoned }, + ], tbChangesetCounts) + + // ── Patchsets (optional) ──────────────────────────────────────────────────── + + let patchset: TypeDiff[] | undefined + let gerritPatchsetStats: Report['gerritPatchsetStats'] + + if (CONFIG.includePatchsets) { + console.log(`\nFetching patchset data from Gerrit (ALL_REVISIONS + DETAILED_LABELS)...`) + const gerritPs = await fetchGerritPatchsets(orgUrl, repoName) + console.log(` ${gerritPs.created.size} patchsets ${gerritPs.approvals.size} approvals`) + gerritPatchsetStats = { pagesWalked: gerritPs.pagesWalked, skipErrors: gerritPs.skipErrors } + + console.log(`\nFetching patchset counts from Tinybird...`) + const tbPatchsetCounts = await fetchTinybirdCounts(channelUrl, segmentId, PATCHSET_TYPES) + console.log(` ${tbPatchsetCounts['patchset-created']} created ${tbPatchsetCounts['patchset_approval-created']} approvals ${tbPatchsetCounts['patchset_comment-created']} comments`) + + patchset = await buildTypeDiffsWithSets(channelUrl, segmentId, [ + { type: 'patchset-created', gerritSet: gerritPs.created }, + { type: 'patchset_approval-created', gerritSet: gerritPs.approvals }, + // Gerrit count requires one extra API call per change — omitted + { + type: 'patchset_comment-created', + gerritCount: -1, + countOnly: true, + note: 'Gerrit count requires per-change API calls — showing Tinybird count only', + }, + ], tbPatchsetCounts) + + } - // ── Local counts ── - console.log(`\nFetching local counts from Tinybird...`) - const local = await getLocalCounts(channelUrl) - console.log(` ${Object.keys(local).length} activity types found locally`) + // ── Report ────────────────────────────────────────────────────────────────── - // ── Report ── const durationMs = Date.now() - startMs - const report = buildReport(plainUrl, channelUrl, integrationId, repoName, remote, local, durationMs) + const report: Report = { + url: plainUrl, + channelUrl, + integrationId, + repoName, + runAt: new Date().toISOString(), + durationMs, + changeset, + patchset, + gerritStats: { + pagesWalked: gerrit.pagesWalked, + changesProcessed: gerrit.all.size, + skipErrors: gerrit.skipErrors, + }, + gerritPatchsetStats, + } printReport(report) const slug = `${new URL(plainUrl).hostname}-${repoName.replace(/\//g, '-')}` const ts = new Date().toISOString().replace(/[:.]/g, '-') + await mkdir(CONFIG.outputDir, { recursive: true }) const outputPath = `${CONFIG.outputDir}/validation-${slug}-${ts}.json` await writeFile(outputPath, JSON.stringify(report, null, 2), 'utf8') console.log(`Report saved: ${outputPath}`) @@ -455,5 +592,6 @@ async function main(): Promise { main().catch((err) => { console.error(`\nError: ${err instanceof Error ? err.message : String(err)}`) + if (err instanceof Error && err.stack) console.error(err.stack) process.exit(1) })