feat: infer memberOrganization stint dates from work-email activities (CM-1105)#4054
feat: infer memberOrganization stint dates from work-email activities (CM-1105)#4054
Conversation
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Adds infrastructure to infer and persist memberOrganizations stint dates from verified work-email activities, so email-domain affiliations become timeline-aware and can compete with enrichment on overlaps.
Changes:
- Extend affiliation resolution to bias toward email-domain rows when a verified email domain is present, and add a source-priority tier in
decidePrimaryOrganizationId. - Buffer
(memberId, orgId, YYYY-MM-DD)activity evidence in Redis on the ingestion hot path and introduce a cron job to infer stint insert/update operations from buffered dates. - Add a partial Postgres index to speed up per-member fetches of
email-domainmemberOrganizations; remove legacy mapping scripts and rename the shared member-organization service file.
Reviewed changes
Copilot reviewed 14 out of 16 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| services/libs/types/src/organizations.ts | Adds shared types for buffered org-dates and inferred stint changes. |
| services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts | Extends work-experience data shape to include source. |
| services/libs/data-access-layer/src/members/segments.ts | Adds optional email-domain candidate inclusion in findMemberWorkExperience. |
| services/libs/data-access-layer/src/members/organizations.ts | Adds fetchMemberOrganizationsBySource for cron’s targeted reads. |
| services/libs/common_services/src/services/memberOrganization.ts | Deleted (renamed). |
| services/libs/common_services/src/services/member/unmerge.ts | Updates import to new member-organization module path. |
| services/libs/common_services/src/services/member-organization.ts | New module: keeps unmerge helpers and adds stint inference logic + Redis key constants. |
| services/libs/common_services/src/services/index.ts | Re-exports renamed member-organization module. |
| services/libs/common_services/src/services/common.member.service.ts | Threads emailDomain through findAffiliation and adds source-priority selection logic. |
| services/apps/data_sink_worker/src/service/member.service.ts | Buffers per-member per-org activity dates in Redis and enqueues member IDs for cron. |
| services/apps/data_sink_worker/src/service/activity.service.ts | Extracts verified email domain from activity payload and passes it into affiliation lookup. |
| services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts | Removed outdated script. |
| services/apps/data_sink_worker/src/bin/map-member-to-org.ts | Removed outdated script. |
| services/apps/data_sink_worker/package.json | Removes script entries for deleted bin scripts. |
| services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts | New cron job to drain Redis buffers and compute stint changes (currently dry-run). |
| backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql | Adds partial index to support efficient per-member email-domain org reads. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 7b7e744. Configure here.
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 16 changed files in this pull request and generated 5 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 14 out of 16 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| try { | ||
| const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` | ||
| const hash = await redis.hGetAll(datesKey) | ||
|
|
||
| // If no data, just remove from queue and move on | ||
| if (!hash || Object.keys(hash).length === 0) { | ||
| await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | ||
| continue | ||
| } | ||
|
|
||
| // 2. Parse Redis data into domain objects | ||
| const { activityDates, orgIds } = parseMemberActivityHash(hash) | ||
|
|
||
| if (activityDates.length > 0) { | ||
| // 3. Compare with DB and calculate delta | ||
| const existingOrgs = await fetchMemberOrganizationsBySource( | ||
| qx, | ||
| memberId, | ||
| OrganizationSource.EMAIL_DOMAIN, | ||
| ) | ||
|
|
||
| const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) | ||
|
|
||
| if (changes.length > 0) { | ||
| ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') | ||
| stats.inserts += changes.filter((c) => c.type === 'insert').length | ||
| stats.updates += changes.filter((c) => c.type === 'update').length | ||
| } | ||
| } | ||
|
|
||
| // 4. Cleanup: Remove only the fields we actually read | ||
| await redis | ||
| .multi() | ||
| .hDel(datesKey, orgIds) | ||
| .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | ||
| .exec() | ||
|
|
||
| stats.processed++ | ||
| } catch (err) { | ||
| ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') | ||
| } |
There was a problem hiding this comment.
Redis draining/cleanup here is not atomic: the job does hGetAll and later hDel/sRem without a lock/watch/Lua script. Any activity buffered for the same member+org between the read and hDel will be deleted and the member removed from the queue, causing silent data loss. Consider an atomic “read+clear” (e.g., Lua to fetch+del fields, or rename-and-process pattern) and only remove the member from the queue after successfully persisting DB changes.
| try { | |
| const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` | |
| const hash = await redis.hGetAll(datesKey) | |
| // If no data, just remove from queue and move on | |
| if (!hash || Object.keys(hash).length === 0) { | |
| await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | |
| continue | |
| } | |
| // 2. Parse Redis data into domain objects | |
| const { activityDates, orgIds } = parseMemberActivityHash(hash) | |
| if (activityDates.length > 0) { | |
| // 3. Compare with DB and calculate delta | |
| const existingOrgs = await fetchMemberOrganizationsBySource( | |
| qx, | |
| memberId, | |
| OrganizationSource.EMAIL_DOMAIN, | |
| ) | |
| const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) | |
| if (changes.length > 0) { | |
| ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') | |
| stats.inserts += changes.filter((c) => c.type === 'insert').length | |
| stats.updates += changes.filter((c) => c.type === 'update').length | |
| } | |
| } | |
| // 4. Cleanup: Remove only the fields we actually read | |
| await redis | |
| .multi() | |
| .hDel(datesKey, orgIds) | |
| .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | |
| .exec() | |
| stats.processed++ | |
| } catch (err) { | |
| ctx.log.error(err, { memberId }, 'Failed to process member stint inference.') | |
| } | |
| const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}` | |
| let processed = false | |
| for (let attempt = 1; attempt <= 3 && !processed; attempt++) { | |
| try { | |
| await redis.watch(datesKey, MEMBER_ORG_STINT_CHANGES_QUEUE) | |
| const hash = await redis.hGetAll(datesKey) | |
| // If no data, remove from queue only if nothing changed since the read | |
| if (!hash || Object.keys(hash).length === 0) { | |
| const emptyResult = await redis.multi().sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec() | |
| if (emptyResult === null) { | |
| ctx.log.warn( | |
| { memberId, attempt }, | |
| 'Member activity changed during empty cleanup, retrying.', | |
| ) | |
| continue | |
| } | |
| processed = true | |
| continue | |
| } | |
| // 2. Parse Redis data into domain objects | |
| const { activityDates, orgIds } = parseMemberActivityHash(hash) | |
| if (activityDates.length > 0) { | |
| // 3. Compare with DB and calculate delta | |
| const existingOrgs = await fetchMemberOrganizationsBySource( | |
| qx, | |
| memberId, | |
| OrganizationSource.EMAIL_DOMAIN, | |
| ) | |
| const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates) | |
| if (changes.length > 0) { | |
| ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.') | |
| stats.inserts += changes.filter((c) => c.type === 'insert').length | |
| stats.updates += changes.filter((c) => c.type === 'update').length | |
| } | |
| } | |
| // 4. Cleanup: Remove only the fields we actually read, but only if the hash/queue | |
| // were unchanged since the watched read. If anything changed, retry so we never | |
| // delete activity that arrived after the snapshot we processed. | |
| const result = await redis | |
| .multi() | |
| .hDel(datesKey, orgIds) | |
| .sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId) | |
| .exec() | |
| if (result === null) { | |
| ctx.log.warn( | |
| { memberId, attempt }, | |
| 'Member activity changed during cleanup, retrying.', | |
| ) | |
| continue | |
| } | |
| stats.processed++ | |
| processed = true | |
| } catch (err) { | |
| await redis.unwatch() | |
| ctx.log.error(err, { memberId, attempt }, 'Failed to process member stint inference.') | |
| break | |
| } | |
| } | |
| if (!processed) { | |
| await redis.unwatch() | |
| ctx.log.warn( | |
| { memberId }, | |
| 'Skipped member stint inference after repeated Redis concurrency conflicts.', | |
| ) | |
| } |
| isNew: false, | ||
| })) | ||
|
|
||
| for (const { organizationId, date: targetDate } of orgDates) { |
There was a problem hiding this comment.
inferMemberOrganizationStintChanges relies on orgDates being in chronological order for the multi-stint guard to work, but it doesn’t sort or validate the input. Since this is exported shared logic, it’s easy for a future caller to pass unsorted dates and get incorrect stint splitting/bridging; consider sorting defensively inside the function (or asserting/documenting the precondition clearly).
| for (const { organizationId, date: targetDate } of orgDates) { | |
| const sortedOrgDates = orgDates | |
| .slice() | |
| .sort((a, b) => Date.parse(a.date) - Date.parse(b.date)) | |
| for (const { organizationId, date: targetDate } of sortedOrgDates) { |
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>

Context
When an activity comes in with a verified work email like
jbeulich@suse.com, we already create amemberOrganizationsrow linking the member to SUSE — but with NULLdateStart/dateEnd. That causes two problems:@suse.comactivity can lose to an unrelated dated enrichment row, becausefindAffiliationtreats undated rows as last-resort fallback.We want to use the activity timestamp as evidence that the person was at that company at that moment, and write that into
dateStart/dateEnd. The catch: we can't write on every activity (active maintainers generate hundreds per day), we can't collapse a real multi-stint history like Google → Apple → Google into one wrong range, and we can't override user edits or enrichment data.How it works
Hot path (
data_sink_worker): When an activity arrives, we just buffer(org, date)in a Redis hash keyed by member. This involves two Redis ops, no Postgres, and no rule evaluation. Hundreds of same-day activities collapse to a single entry.Cron (
cron_service): Every 5 min, the service pops up to 500 pending members, atomically drains each member's hash, loads their existing email-domain rows, and walks the buffered dates chronologically applying 4 rules:dateEnd→ extend forward, with a 30-day debounce and a multi-stint guard (if another org holds a 30+ day stint in the gap, insert a fresh stint instead of bridging)dateStart→ extend backward, same multi-stint guard, no debounce (rare, re-ingestion only)Walking all orgs together in chronological order is what makes multi-stint detection work: by the time the 2008 Google event checks its gap, the 2005-2007 Apple events are already in the working copy and the guard fires correctly.
findAffiliationgets two changes so email-domain rows start contributing meaningfully:ui > email-domain > enrichment-* > other) inserted intodecidePrimaryOrganizationId. Once email-domain rows have dates, they beat enrichment on overlaps.findMemberWorkExperiencepulls in the matching email-domain row as a candidate even when undated. This is the user-visible win that lands immediately — work-email activities resolve to the right org inline, without waiting for the cron to stamp dates.Partial index on
memberOrganizations("memberId") WHERE source='email-domain' AND deletedAt IS NULLbacks the cron's per-member fetch so it's a single index seek.Cleanup
map-member-to-org.ts,map-tenant-members-to-org.ts).memberOrganization.ts→member-organization.tsto match the folder's kebab-case convention.Note
Medium Risk
Medium risk because it changes organization affiliation resolution and introduces new Redis-buffered, cron-driven inference that could affect member→organization timelines and query performance if the new logic or indexing is off.
Overview
Adds a Redis-buffered pipeline to infer and update
memberOrganizationsstintdateStart/dateEndforemail-domainlinks based on activity timestamps, including new shared inference logic (inferMemberOrganizationStintChanges) and acron_servicejob that batches queued members.Updates
data_sink_workerto buffer per-(member, org) activity dates when assigning orgs by verified work email, and adjusts affiliation resolution to pass anemailDomainhint and prefer higher-confidence organization sources (ui>email-domain>enrichment-*> others).Improves performance/supporting infrastructure with a new partial Postgres index for per-member
email-domainlookups, adds DAL helpers for fetching member orgs by source and email-domain-aware work-experience queries, and removes two obsolete mapping scripts while renamingmemberOrganizationtomember-organizationexports.Reviewed by Cursor Bugbot for commit 801fbd9. Bugbot is set up for automated code reviews on this repo. Configure here.