Skip to content

feat: infer memberOrganization stint dates from work-email activities (CM-1105)#4054

Open
skwowet wants to merge 6 commits intomainfrom
improve/CM-1105
Open

feat: infer memberOrganization stint dates from work-email activities (CM-1105)#4054
skwowet wants to merge 6 commits intomainfrom
improve/CM-1105

Conversation

@skwowet
Copy link
Copy Markdown
Collaborator

@skwowet skwowet commented Apr 24, 2026

Context

When an activity comes in with a verified work email like jbeulich@suse.com, we already create a memberOrganizations row linking the member to SUSE — but with NULL dateStart/dateEnd. That causes two problems:

  1. A @suse.com activity can lose to an unrelated dated enrichment row, because findAffiliation treats undated rows as last-resort fallback.
  2. The affiliation timeline ignores the row entirely since it has no dates.

We want to use the activity timestamp as evidence that the person was at that company at that moment, and write that into dateStart/dateEnd. The catch: we can't write on every activity (active maintainers generate hundreds per day), we can't collapse a real multi-stint history like Google → Apple → Google into one wrong range, and we can't override user edits or enrichment data.

How it works

Hot path (data_sink_worker): When an activity arrives, we just buffer (org, date) in a Redis hash keyed by member. This involves two Redis ops, no Postgres, and no rule evaluation. Hundreds of same-day activities collapse to a single entry.

Cron (cron_service): Every 5 min, the service pops up to 500 pending members, atomically drains each member's hash, loads their existing email-domain rows, and walks the buffered dates chronologically applying 4 rules:

  1. Date already inside an existing stint → no-op
  2. No dated stint yet for this org → stamp the undated placeholder row
  3. Later than dateEnd → extend forward, with a 30-day debounce and a multi-stint guard (if another org holds a 30+ day stint in the gap, insert a fresh stint instead of bridging)
  4. Earlier than dateStart → extend backward, same multi-stint guard, no debounce (rare, re-ingestion only)

Walking all orgs together in chronological order is what makes multi-stint detection work: by the time the 2008 Google event checks its gap, the 2005-2007 Apple events are already in the working copy and the guard fires correctly.

findAffiliation gets two changes so email-domain rows start contributing meaningfully:

  • Source priority tier (ui > email-domain > enrichment-* > other) inserted into decidePrimaryOrganizationId. Once email-domain rows have dates, they beat enrichment on overlaps.
  • Work-email bias: If the activity payload has a verified email, findMemberWorkExperience pulls in the matching email-domain row as a candidate even when undated. This is the user-visible win that lands immediately — work-email activities resolve to the right org inline, without waiting for the cron to stamp dates.

Partial index on memberOrganizations("memberId") WHERE source='email-domain' AND deletedAt IS NULL backs the cron's per-member fetch so it's a single index seek.

Cleanup

  • Dropped two outdated bin scripts (map-member-to-org.ts, map-tenant-members-to-org.ts).
  • Renamed memberOrganization.tsmember-organization.ts to match the folder's kebab-case convention.

Note

Medium Risk
Medium risk because it changes organization affiliation resolution and introduces new Redis-buffered, cron-driven inference that could affect member→organization timelines and query performance if the new logic or indexing is off.

Overview
Adds a Redis-buffered pipeline to infer and update memberOrganizations stint dateStart/dateEnd for email-domain links based on activity timestamps, including new shared inference logic (inferMemberOrganizationStintChanges) and a cron_service job that batches queued members.

Updates data_sink_worker to buffer per-(member, org) activity dates when assigning orgs by verified work email, and adjusts affiliation resolution to pass an emailDomain hint and prefer higher-confidence organization sources (ui > email-domain > enrichment-* > others).

Improves performance/supporting infrastructure with a new partial Postgres index for per-member email-domain lookups, adds DAL helpers for fetching member orgs by source and email-domain-aware work-experience queries, and removes two obsolete mapping scripts while renaming memberOrganization to member-organization exports.

Reviewed by Cursor Bugbot for commit 801fbd9. Bugbot is set up for automated code reviews on this repo. Configure here.

Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
@skwowet skwowet self-assigned this Apr 24, 2026
@skwowet skwowet requested review from Copilot and themarolt April 24, 2026 10:41
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds infrastructure to infer and persist memberOrganizations stint dates from verified work-email activities, so email-domain affiliations become timeline-aware and can compete with enrichment on overlaps.

Changes:

  • Extend affiliation resolution to bias toward email-domain rows when a verified email domain is present, and add a source-priority tier in decidePrimaryOrganizationId.
  • Buffer (memberId, orgId, YYYY-MM-DD) activity evidence in Redis on the ingestion hot path and introduce a cron job to infer stint insert/update operations from buffered dates.
  • Add a partial Postgres index to speed up per-member fetches of email-domain memberOrganizations; remove legacy mapping scripts and rename the shared member-organization service file.

Reviewed changes

Copilot reviewed 14 out of 16 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
services/libs/types/src/organizations.ts Adds shared types for buffered org-dates and inferred stint changes.
services/libs/data-access-layer/src/old/apps/data_sink_worker/repo/memberAffiliation.data.ts Extends work-experience data shape to include source.
services/libs/data-access-layer/src/members/segments.ts Adds optional email-domain candidate inclusion in findMemberWorkExperience.
services/libs/data-access-layer/src/members/organizations.ts Adds fetchMemberOrganizationsBySource for cron’s targeted reads.
services/libs/common_services/src/services/memberOrganization.ts Deleted (renamed).
services/libs/common_services/src/services/member/unmerge.ts Updates import to new member-organization module path.
services/libs/common_services/src/services/member-organization.ts New module: keeps unmerge helpers and adds stint inference logic + Redis key constants.
services/libs/common_services/src/services/index.ts Re-exports renamed member-organization module.
services/libs/common_services/src/services/common.member.service.ts Threads emailDomain through findAffiliation and adds source-priority selection logic.
services/apps/data_sink_worker/src/service/member.service.ts Buffers per-member per-org activity dates in Redis and enqueues member IDs for cron.
services/apps/data_sink_worker/src/service/activity.service.ts Extracts verified email domain from activity payload and passes it into affiliation lookup.
services/apps/data_sink_worker/src/bin/map-tenant-members-to-org.ts Removed outdated script.
services/apps/data_sink_worker/src/bin/map-member-to-org.ts Removed outdated script.
services/apps/data_sink_worker/package.json Removes script entries for deleted bin scripts.
services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts New cron job to drain Redis buffers and compute stint changes (currently dry-run).
backend/src/database/migrations/V1776931245__member-organizations-email-domain-partial-index.sql Adds partial index to support efficient per-member email-domain org reads.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts Outdated
Comment thread services/apps/cron_service/src/jobs/inferMemberOrganizationStintChanges.job.ts Outdated
Comment thread services/libs/common_services/src/services/member-organization.ts Outdated
Comment thread services/apps/data_sink_worker/src/service/member.service.ts Outdated
Comment thread services/libs/data-access-layer/src/members/segments.ts
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes and found 1 potential issue.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 7b7e744. Configure here.

Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 24, 2026 17:50
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 16 changed files in this pull request and generated 5 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread services/apps/data_sink_worker/src/service/activity.service.ts
Comment thread services/apps/data_sink_worker/src/service/member.service.ts Outdated
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Copilot AI review requested due to automatic review settings April 24, 2026 19:49
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 14 out of 16 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +33 to +73
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)

// If no data, just remove from queue and move on
if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}

// 2. Parse Redis data into domain objects
const { activityDates, orgIds } = parseMemberActivityHash(hash)

if (activityDates.length > 0) {
// 3. Compare with DB and calculate delta
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)

const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)

if (changes.length > 0) {
ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.')
stats.inserts += changes.filter((c) => c.type === 'insert').length
stats.updates += changes.filter((c) => c.type === 'update').length
}
}

// 4. Cleanup: Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()

stats.processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redis draining/cleanup here is not atomic: the job does hGetAll and later hDel/sRem without a lock/watch/Lua script. Any activity buffered for the same member+org between the read and hDel will be deleted and the member removed from the queue, causing silent data loss. Consider an atomic “read+clear” (e.g., Lua to fetch+del fields, or rename-and-process pattern) and only remove the member from the queue after successfully persisting DB changes.

Suggested change
try {
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
const hash = await redis.hGetAll(datesKey)
// If no data, just remove from queue and move on
if (!hash || Object.keys(hash).length === 0) {
await redis.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
continue
}
// 2. Parse Redis data into domain objects
const { activityDates, orgIds } = parseMemberActivityHash(hash)
if (activityDates.length > 0) {
// 3. Compare with DB and calculate delta
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)
if (changes.length > 0) {
ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.')
stats.inserts += changes.filter((c) => c.type === 'insert').length
stats.updates += changes.filter((c) => c.type === 'update').length
}
}
// 4. Cleanup: Remove only the fields we actually read
await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()
stats.processed++
} catch (err) {
ctx.log.error(err, { memberId }, 'Failed to process member stint inference.')
}
const datesKey = `${MEMBER_ORG_STINT_CHANGES_DATES_PREFIX}:${memberId}`
let processed = false
for (let attempt = 1; attempt <= 3 && !processed; attempt++) {
try {
await redis.watch(datesKey, MEMBER_ORG_STINT_CHANGES_QUEUE)
const hash = await redis.hGetAll(datesKey)
// If no data, remove from queue only if nothing changed since the read
if (!hash || Object.keys(hash).length === 0) {
const emptyResult = await redis.multi().sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId).exec()
if (emptyResult === null) {
ctx.log.warn(
{ memberId, attempt },
'Member activity changed during empty cleanup, retrying.',
)
continue
}
processed = true
continue
}
// 2. Parse Redis data into domain objects
const { activityDates, orgIds } = parseMemberActivityHash(hash)
if (activityDates.length > 0) {
// 3. Compare with DB and calculate delta
const existingOrgs = await fetchMemberOrganizationsBySource(
qx,
memberId,
OrganizationSource.EMAIL_DOMAIN,
)
const changes = inferMemberOrganizationStintChanges(memberId, existingOrgs, activityDates)
if (changes.length > 0) {
ctx.log.info({ memberId, count: changes.length }, 'Stint changes identified.')
stats.inserts += changes.filter((c) => c.type === 'insert').length
stats.updates += changes.filter((c) => c.type === 'update').length
}
}
// 4. Cleanup: Remove only the fields we actually read, but only if the hash/queue
// were unchanged since the watched read. If anything changed, retry so we never
// delete activity that arrived after the snapshot we processed.
const result = await redis
.multi()
.hDel(datesKey, orgIds)
.sRem(MEMBER_ORG_STINT_CHANGES_QUEUE, memberId)
.exec()
if (result === null) {
ctx.log.warn(
{ memberId, attempt },
'Member activity changed during cleanup, retrying.',
)
continue
}
stats.processed++
processed = true
} catch (err) {
await redis.unwatch()
ctx.log.error(err, { memberId, attempt }, 'Failed to process member stint inference.')
break
}
}
if (!processed) {
await redis.unwatch()
ctx.log.warn(
{ memberId },
'Skipped member stint inference after repeated Redis concurrency conflicts.',
)
}

Copilot uses AI. Check for mistakes.
isNew: false,
}))

for (const { organizationId, date: targetDate } of orgDates) {
Copy link

Copilot AI Apr 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

inferMemberOrganizationStintChanges relies on orgDates being in chronological order for the multi-stint guard to work, but it doesn’t sort or validate the input. Since this is exported shared logic, it’s easy for a future caller to pass unsorted dates and get incorrect stint splitting/bridging; consider sorting defensively inside the function (or asserting/documenting the precondition clearly).

Suggested change
for (const { organizationId, date: targetDate } of orgDates) {
const sortedOrgDates = orgDates
.slice()
.sort((a, b) => Date.parse(a.date) - Date.parse(b.date))
for (const { organizationId, date: targetDate } of sortedOrgDates) {

Copilot uses AI. Check for mistakes.
Signed-off-by: Yeganathan S <63534555+skwowet@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants