-
Notifications
You must be signed in to change notification settings - Fork 731
feat: add recalculate all affiliation (IN-1083) #4038
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ulemons
wants to merge
8
commits into
main
Choose a base branch
from
feat/optimize-fetch-for-affiliation-recalculation
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+341
−0
Open
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
e12aad1
feat: add recalculate all affiliation
ulemons 93e6f36
fix: apply limit also in dry run
ulemons 7094e5e
fix: use uuid
ulemons c93f174
feat: add max pages
ulemons d20ab23
feat: add empty page delay
ulemons 8db2257
fix: optimize logging
ulemons b9782ca
fix: lint
ulemons c0a7980
refactor: remove useless code
ulemons File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
340 changes: 340 additions & 0 deletions
340
services/apps/script_executor_worker/src/bin/recalculate-all-affiliations.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,340 @@ | ||
| import { WorkflowIdReusePolicy } from '@temporalio/client' | ||
|
|
||
| import { DEFAULT_TENANT_ID } from '@crowd/common' | ||
| import { WRITE_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database' | ||
| import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor' | ||
| import { getServiceChildLogger } from '@crowd/logging' | ||
| import { TEMPORAL_CONFIG, getTemporalClient } from '@crowd/temporal' | ||
|
|
||
// Child logger scoped to this script; every message is tagged with the script name.
const log = getServiceChildLogger('recalculate-all-affiliations')
|
|
||
// A member whose activity attributions disagree with their current org links:
// some orgs referenced by "activityRelations" are no longer active rows in
// "memberOrganizations" (missing or soft-deleted).
interface BrokenMember {
  // Member uuid.
  memberId: string
  // Org ids currently linked to the member ("memberOrganizations" rows with deletedAt IS NULL).
  activeOrgIds: string[]
  // Org ids still attributed in "activityRelations" but missing/soft-deleted in "memberOrganizations".
  staleOrgIds: string[]
}
|
|
||
// Parsed CLI options for this script (see parseArgs for flag names and defaults).
interface ScriptOptions {
  // Members fetched per keyset page (--page-size, default 100, > 0).
  pageSize: number
  // Parallel workflow triggers per page (--concurrency, default 20, > 0).
  concurrency: number
  // Delay between pages in ms (--page-delay, default 2000, >= 0).
  pageDelayMs: number
  // Delay after each workflow start in ms (--workflow-delay, default 0, >= 0).
  workflowDelayMs: number
  // Keyset cursor to resume from, exclusive (--start-after); null = start from the beginning.
  startAfter: string | null
  // When true, only log broken members; never trigger workflows (--dry-run).
  dryRun: boolean
  // Cap on members logged (dry run) / workflows triggered (--limit); null = unlimited.
  limit: number | null
  // Stop after this many pages (--max-pages); null = unlimited.
  maxPages: number | null
  // Delay used instead of pageDelayMs when a page yields no broken members
  // (--empty-page-delay); null = always use pageDelayMs.
  emptyPageDelayMs: number | null
}
|
|
||
| function parseArgs(): ScriptOptions { | ||
| const args = process.argv.slice(2) | ||
|
|
||
| const getArg = (flag: string): string | undefined => { | ||
| const idx = args.indexOf(flag) | ||
| if (idx !== -1 && idx + 1 < args.length) return args[idx + 1] | ||
| return undefined | ||
| } | ||
|
|
||
| const pageSize = parseInt(getArg('--page-size') ?? '100', 10) | ||
| const concurrency = parseInt(getArg('--concurrency') ?? '20', 10) | ||
| const pageDelayMs = parseInt(getArg('--page-delay') ?? '2000', 10) | ||
| const workflowDelayMs = parseInt(getArg('--workflow-delay') ?? '0', 10) | ||
| const startAfter = getArg('--start-after') ?? null | ||
| const dryRun = args.includes('--dry-run') | ||
| const emptyPageDelayRaw = getArg('--empty-page-delay') | ||
| const emptyPageDelayMs = emptyPageDelayRaw !== undefined ? parseInt(emptyPageDelayRaw, 10) : null | ||
| const limitRaw = getArg('--limit') | ||
| const limit = limitRaw !== undefined ? parseInt(limitRaw, 10) : null | ||
| const maxPagesRaw = getArg('--max-pages') | ||
| const maxPages = maxPagesRaw !== undefined ? parseInt(maxPagesRaw, 10) : null | ||
|
|
||
| if (isNaN(pageSize) || pageSize <= 0) { | ||
| log.error('--page-size must be a positive integer') | ||
| process.exit(1) | ||
| } | ||
| if (isNaN(concurrency) || concurrency <= 0) { | ||
| log.error('--concurrency must be a positive integer') | ||
| process.exit(1) | ||
| } | ||
| if (isNaN(pageDelayMs) || pageDelayMs < 0) { | ||
| log.error('--page-delay must be a non-negative integer') | ||
| process.exit(1) | ||
| } | ||
| if (isNaN(workflowDelayMs) || workflowDelayMs < 0) { | ||
| log.error('--workflow-delay must be a non-negative integer') | ||
| process.exit(1) | ||
| } | ||
| if (emptyPageDelayMs !== null && (isNaN(emptyPageDelayMs) || emptyPageDelayMs < 0)) { | ||
| log.error('--empty-page-delay must be a non-negative integer') | ||
| process.exit(1) | ||
| } | ||
| if (limit !== null && (isNaN(limit) || limit <= 0)) { | ||
| log.error('--limit must be a positive integer') | ||
| process.exit(1) | ||
| } | ||
| if (maxPages !== null && (isNaN(maxPages) || maxPages <= 0)) { | ||
| log.error('--max-pages must be a positive integer') | ||
| process.exit(1) | ||
| } | ||
|
|
||
| return { | ||
| pageSize, | ||
| concurrency, | ||
| pageDelayMs, | ||
| workflowDelayMs, | ||
| startAfter, | ||
| dryRun, | ||
| limit, | ||
| maxPages, | ||
| emptyPageDelayMs, | ||
| } | ||
| } | ||
|
|
||
| async function fetchMemberIdPage( | ||
| qx: ReturnType<typeof pgpQx>, | ||
| afterMemberId: string | null, | ||
| pageSize: number, | ||
| ): Promise<string[]> { | ||
| const cursorClause = afterMemberId ? `AND "memberId" > $(afterMemberId)` : '' | ||
|
|
||
| const rows = await qx.select( | ||
| ` | ||
| SELECT "memberId" | ||
| FROM "memberOrganizations" | ||
| WHERE TRUE ${cursorClause} | ||
| GROUP BY "memberId" | ||
| ORDER BY "memberId" | ||
| LIMIT $(pageSize) | ||
| `, | ||
| { afterMemberId, pageSize }, | ||
| ) | ||
|
|
||
| return rows.map((r: Record<string, unknown>) => r.memberId as string) | ||
| } | ||
|
|
||
| async function findBrokenMembers( | ||
| qx: ReturnType<typeof pgpQx>, | ||
| memberIds: string[], | ||
| ): Promise<BrokenMember[]> { | ||
| // Deduplicate to (memberId, organizationId) pairs first — a member may have thousands | ||
| // of activities but only a handful of distinct org attributions. | ||
| const staleRows = await qx.select( | ||
| ` | ||
| WITH pairs AS ( | ||
| SELECT DISTINCT "memberId", "organizationId" | ||
| FROM "activityRelations" | ||
| WHERE "memberId" = ANY($(memberIds)::uuid[]) | ||
| AND "organizationId" IS NOT NULL | ||
| ) | ||
| SELECT p."memberId", p."organizationId" AS "staleOrgId" | ||
| FROM pairs p | ||
| WHERE NOT EXISTS ( | ||
| SELECT 1 FROM "memberOrganizations" mo | ||
| WHERE mo."memberId" = p."memberId" | ||
| AND mo."organizationId" = p."organizationId" | ||
| AND mo."deletedAt" IS NULL | ||
| ) | ||
| `, | ||
| { memberIds }, | ||
| ) | ||
|
|
||
| if (staleRows.length === 0) { | ||
| return [] | ||
| } | ||
|
|
||
| const staleMap = new Map<string, string[]>() | ||
| for (const r of staleRows as Record<string, string>[]) { | ||
| const existing = staleMap.get(r.memberId) ?? [] | ||
| existing.push(r.staleOrgId) | ||
| staleMap.set(r.memberId, existing) | ||
| } | ||
|
|
||
| const brokenMemberIds = [...staleMap.keys()] | ||
|
|
||
| const orgRows = await qx.select( | ||
| ` | ||
| SELECT "memberId", array_agg(DISTINCT "organizationId") AS "activeOrgIds" | ||
| FROM "memberOrganizations" | ||
| WHERE "memberId" = ANY($(brokenMemberIds)::uuid[]) | ||
| AND "deletedAt" IS NULL | ||
| GROUP BY "memberId" | ||
| `, | ||
| { brokenMemberIds }, | ||
| ) | ||
|
|
||
| const activeOrgMap = new Map<string, string[]>( | ||
| orgRows.map((r: Record<string, unknown>) => [ | ||
| r.memberId as string, | ||
| (r.activeOrgIds as string[] | null) ?? [], | ||
| ]), | ||
| ) | ||
|
|
||
| return brokenMemberIds.map((memberId: string) => ({ | ||
| memberId, | ||
| activeOrgIds: activeOrgMap.get(memberId) ?? [], | ||
| staleOrgIds: staleMap.get(memberId) ?? [], | ||
| })) | ||
| } | ||
|
|
||
| async function main() { | ||
| const opts = parseArgs() | ||
|
|
||
| log.info( | ||
| { | ||
| pageSize: opts.pageSize, | ||
| concurrency: opts.concurrency, | ||
| pageDelayMs: opts.pageDelayMs, | ||
| workflowDelayMs: opts.workflowDelayMs, | ||
| startAfter: opts.startAfter, | ||
| dryRun: opts.dryRun, | ||
| limit: opts.limit, | ||
| maxPages: opts.maxPages, | ||
| emptyPageDelayMs: opts.emptyPageDelayMs, | ||
| }, | ||
| 'Starting recalculate-all-affiliations', | ||
| ) | ||
|
|
||
| const dbConnection = await getDbConnection(WRITE_DB_CONFIG()) | ||
| const qx = pgpQx(dbConnection) | ||
| const temporal = await getTemporalClient(TEMPORAL_CONFIG()) | ||
|
|
||
| let cursor: string | null = opts.startAfter | ||
| let pageNum = 0 | ||
| let totalScanned = 0 | ||
| let totalBroken = 0 | ||
| let totalSucceeded = 0 | ||
| let totalFailed = 0 | ||
|
|
||
| let hasMore = true | ||
| while (hasMore) { | ||
| pageNum++ | ||
|
|
||
| const memberIds = await fetchMemberIdPage(qx, cursor, opts.pageSize) | ||
|
|
||
| if (memberIds.length === 0) { | ||
| log.info('No more members to process.') | ||
| hasMore = false | ||
| continue | ||
| } | ||
|
|
||
| const lastMemberId = memberIds[memberIds.length - 1] | ||
| totalScanned += memberIds.length | ||
|
|
||
| const brokenMembers = await findBrokenMembers(qx, memberIds) | ||
| totalBroken += brokenMembers.length | ||
|
|
||
| log.info( | ||
| `Page ${pageNum}: scanned ${memberIds.length} | broken: ${brokenMembers.length} | cursor: ${lastMemberId}`, | ||
| ) | ||
|
|
||
| if (brokenMembers.length > 0) { | ||
| if (opts.dryRun) { | ||
| const loggedSoFar = totalBroken - brokenMembers.length | ||
| const remaining = opts.limit !== null ? opts.limit - loggedSoFar : brokenMembers.length | ||
| const toLog = brokenMembers.slice(0, remaining) | ||
| for (const { memberId, activeOrgIds, staleOrgIds } of toLog) { | ||
| log.info( | ||
| `[DRY RUN] broken member: ${memberId} | stale orgs: [${staleOrgIds.join(', ')}] | active orgs: ${activeOrgIds.length}`, | ||
| ) | ||
| } | ||
| if (opts.limit !== null && loggedSoFar + toLog.length >= opts.limit) { | ||
| log.info(`Limit of ${opts.limit} members reached.`) | ||
| hasMore = false | ||
| cursor = lastMemberId | ||
| continue | ||
| } | ||
| } else { | ||
| const triggeredSoFar = totalSucceeded + totalFailed | ||
| const remaining = opts.limit !== null ? opts.limit - triggeredSoFar : brokenMembers.length | ||
| if (remaining <= 0) { | ||
| log.info(`Limit of ${opts.limit} workflows reached.`) | ||
| hasMore = false | ||
| continue | ||
| } | ||
|
|
||
| const toProcess = brokenMembers.slice(0, remaining) | ||
|
|
||
| let index = 0 | ||
| const worker = async () => { | ||
| while (index < toProcess.length) { | ||
| const { memberId, activeOrgIds, staleOrgIds } = toProcess[index++] | ||
| try { | ||
| log.info( | ||
| `Triggering memberUpdate: ${memberId} | stale orgs: [${staleOrgIds.join(', ')}] | active orgs: ${activeOrgIds.length}`, | ||
| ) | ||
| await temporal.workflow.start('memberUpdate', { | ||
| taskQueue: 'profiles', | ||
| workflowId: `member-update/${DEFAULT_TENANT_ID}/${memberId}`, | ||
| workflowIdReusePolicy: | ||
| WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, | ||
| retry: { maximumAttempts: 10 }, | ||
| args: [ | ||
| { | ||
| member: { id: memberId }, | ||
| memberOrganizationIds: activeOrgIds, | ||
| syncToOpensearch: true, | ||
| }, | ||
| ], | ||
| searchAttributes: { TenantId: [DEFAULT_TENANT_ID] }, | ||
| }) | ||
| if (opts.workflowDelayMs > 0) { | ||
| await new Promise((resolve) => setTimeout(resolve, opts.workflowDelayMs)) | ||
| } | ||
| totalSucceeded++ | ||
| } catch (err) { | ||
| totalFailed++ | ||
| log.error({ err }, 'Failed to process member') | ||
| } | ||
| } | ||
| } | ||
|
|
||
| await Promise.all(Array.from({ length: Math.min(opts.concurrency, toProcess.length) }, worker)) | ||
|
Check failure on line 291 in services/apps/script_executor_worker/src/bin/recalculate-all-affiliations.ts
|
||
| log.info(`Page ${pageNum} done: ${totalSucceeded} ok, ${totalFailed} failed`) | ||
|
|
||
| if (opts.limit !== null && totalSucceeded + totalFailed >= opts.limit) { | ||
| log.info(`Limit of ${opts.limit} workflows reached.`) | ||
| hasMore = false | ||
| continue | ||
| } | ||
| } | ||
| } | ||
|
|
||
| log.info(`Resume with: --start-after ${lastMemberId}`) | ||
| cursor = lastMemberId | ||
|
|
||
| if (memberIds.length < opts.pageSize) { | ||
| hasMore = false | ||
| } | ||
|
|
||
| if (opts.maxPages !== null && pageNum >= opts.maxPages) { | ||
| log.info(`Max pages of ${opts.maxPages} reached.`) | ||
| hasMore = false | ||
| } | ||
|
|
||
| const delayMs = | ||
| brokenMembers.length === 0 && opts.emptyPageDelayMs !== null | ||
| ? opts.emptyPageDelayMs | ||
| : opts.pageDelayMs | ||
| if (delayMs > 0) { | ||
| await new Promise((resolve) => setTimeout(resolve, delayMs)) | ||
| } | ||
| } | ||
|
|
||
| const brokenPct = totalScanned > 0 ? ((totalBroken / totalScanned) * 100).toFixed(2) : '0.00' | ||
| log.info( | ||
| { | ||
| pagesProcessed: pageNum, | ||
| membersScanned: totalScanned, | ||
| membersBroken: `${totalBroken} (${brokenPct}%)`, | ||
| ...(opts.dryRun ? {} : { workflowsSucceeded: totalSucceeded, workflowsFailed: totalFailed }), | ||
| }, | ||
| 'Done', | ||
| ) | ||
|
|
||
| process.exit(totalFailed > 0 ? 1 : 0) | ||
| } | ||
|
|
||
| main().catch((err) => { | ||
| log.error({ err }, 'Unexpected error') | ||
| process.exit(1) | ||
| }) | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stale orgs excluded from OpenSearch sync after recalculation
Medium Severity
The `memberOrganizationIds` passed to the `memberUpdate` workflow only contains `activeOrgIds`, but `staleOrgIds` are also needed. After `refreshMemberOrganizationAffiliations` recalculates affiliations in the DB, activities previously attributed to stale orgs get re-attributed. The stale orgs' data in OpenSearch remains outdated because `syncOrganization` is never called for them. Both `activeOrgIds` and `staleOrgIds` need to be included so all affected organizations are synced.

Additional Locations (1)

services/apps/script_executor_worker/src/bin/recalculate-all-affiliations.ts#L164-L176

Reviewed by Cursor Bugbot for commit c0a7980. Configure here.