diff --git a/src/content/docs/rework-orchestration/adr/adr-001-s3-for-partal-results.mdx b/src/content/docs/rework-orchestration/adr/adr-001-s3-for-partal-results.mdx new file mode 100644 index 0000000..808c709 --- /dev/null +++ b/src/content/docs/rework-orchestration/adr/adr-001-s3-for-partal-results.mdx @@ -0,0 +1,147 @@ +--- +title: "ADR-001: Use S3-Compatible Storage for Partial Simulation Results" +description: "Store simulation result chunks in S3/MinIO instead of Redis/PostgreSQL." +status: proposed +date: 2026-01-15 +authors: [yaptide-team] +tags: [storage, partial-results, s3, minio] +related-issues: + - yaptide/yaptide#NNN +supersedes: null +superseded-by: null +--- + +# ADR-001: Use S3-Compatible Storage for Partial Simulation Results + +## Context + +Current architecture stores all simulation results (estimators, pages) in PostgreSQL as gzip-compressed JSON blobs. This works but creates bottlenecks: + +- **Large payloads** (up to 1 GB merged) block DB writes +- **No partial availability** — results only visible after COMPLETED +- **Redis pressure** — Celery result backend shares infrastructure with control plane + +Researchers need **partial results during RUNNING** to catch misconfigurations early (target: \<30 seconds to first insight). + +## Decision Drivers + +| Driver | Priority | Description | +|--------|----------|-------------| +| Partial result streaming | High | Must enable chunked upload during simulation | +| Payload size | High | Must handle up to 1 GB merged results | +| Deployment compatibility | High | Must work on PLGrid (no persistent S3) and cloud (MinIO OK) | +| Operational complexity | Medium | Should not add significant operational burden | +| Browser accessibility | Medium | UI must fetch chunks efficiently | + +## Considered Options + +### Option A: PostgreSQL BLOBs (Current) + +**Description:** Continue storing all results in PostgreSQL `PageModel.compressed_data`. + +**Pros:** +- ✅ No new infrastructure +- ✅ Transactional consistency +- ✅ Existing backup/restore workflows + +**Cons:** +- ❌ No partial availability (write-on-commit) +- ❌ DB write bottleneck for large results +- ❌ No chunked retrieval (must fetch entire page) + +**Effort:** None (status quo) + +### Option B: Redis as Result Store + +**Description:** Use Redis (existing broker) for result caching, PostgreSQL for metadata only. + +**Pros:** +- ✅ Fast reads (in-memory) +- ✅ No new infrastructure + +**Cons:** +- ❌ Memory pressure on broker +- ❌ Volatile (results lost on restart) +- ❌ Not suitable for large payloads (>1 GB) + +**Effort:** Low (1-2 weeks) + +### Option C: S3-Compatible Object Storage (Recommended) + +**Description:** Deploy MinIO (cloud) or use PLGrid S3 (HPC) for result chunks. PostgreSQL stores metadata + chunk URLs only. + +**Pros:** +- ✅ Native chunked upload (partial results during RUNNING) +- ✅ Decouples data-plane from control-plane +- ✅ Scalable to TB-scale results +- ✅ Browser-accessible via presigned URLs +- ✅ Compression (ZSTD/LZ4) at object level + +**Cons:** +- ⚠️ New infrastructure (MinIO deployment) +- ⚠️ PLGrid S3 availability uncertain (need HPC ops confirmation) +- ⚠️ Migration path for existing results + +**Effort:** Medium (4-6 weeks including PoC) + + +## Decision Outcome + +**Chosen option:** "Option C: S3-Compatible Object Storage" + +**Justification:** + +Object storage is the only option that satisfies **partial result streaming** (Driver 1) and **large payload handling** (Driver 2) simultaneously. The operational complexity (MinIO deployment) is a one-time cost that pays off in scalability and user experience. + +**Confirmation:** PoC will validate: +- Chunked upload via Arrow Flight +- Presigned URL generation for UI fetch +- MinIO deployment in Docker Compose + +## Consequences + +### Positive + +- Partial results available ~30 seconds after simulation start +- PostgreSQL relieved of large BLOB storage (metadata only) +- Redis broker isolated from data-plane traffic +- Browser can fetch chunks directly (no backend proxy needed) +- Enables Phase 3 distributed merge (S3 Select / Cloud Functions) + +### Negative + +- New infrastructure to deploy and monitor (MinIO) +- Migration complexity for existing simulations (v1→v2) +- PLGrid S3 may not be available (fallback to MinIO required) + +### Neutral + +- Result retrieval API changes (`GET /chunks/{page_id}` vs `GET /results`) +- UI must handle chunked fetch + partial rendering +- Backup strategy includes S3 bucket (not just PostgreSQL dump) + +## Implementation Plan + +| Task | Owner | Timeline | +|------|-------|----------| +| MinIO Docker Compose setup | @devops | Week 1-2 | +| S3 chunk upload integration (Flight) | @backend | Week 3-6 | +| Presigned URL API endpoint | @backend | Week 4 | +| UI chunk fetch + Arrow.js parsing | @frontend | Week 5-7 | +| Migration guide (v1→v2) | @tech-writer | Week 8 | + +## Validation Criteria + +- [ ] Partial results visible in UI within 30 seconds of job start +- [ ] No PostgreSQL BLOB writes >10 MB +- [ ] UI can fetch 100 MB chunk in \<5 seconds +- [ ] MinIO storage grows linearly with simulation count + +## References + +- [Research Session 002: S3 vs Redis Tradeoffs](/rework-orchestration/research/session-002-s3-vs-redis-tradeoffs) +- [Design: Partial Results Streaming](/rework-orchestration/design/partial-results-streaming) +- [MinIO Documentation](https://min.io/docs) + + +**Last updated:** 2026-04-30 */ diff --git a/src/content/docs/rework-orchestration/adr/adr-002-binary-format-selection.mdx b/src/content/docs/rework-orchestration/adr/adr-002-binary-format-selection.mdx new file mode 100644 index 0000000..63897e0 --- /dev/null +++ b/src/content/docs/rework-orchestration/adr/adr-002-binary-format-selection.mdx @@ -0,0 +1,173 @@ +--- +title: "ADR-002: Arrow IPC as Primary Binary Format for Result Transport" +description: "Use Apache Arrow IPC for result serialization instead of JSON." +status: proposed +date: 2026-01-15 +authors: [yaptide-team] +tags: [binary-format, arrow, serialization, performance] +related-issues: + - yaptide/yaptide#NNN +supersedes: null +superseded-by: null +--- + +# ADR-002: Arrow IPC as Primary Binary Format for Result Transport + +## Context + +Current architecture uses JSON for all result transport: +- Simulator output → JSON → Redis → PostgreSQL +- Multiple serialize/deserialize cycles +- Python object overhead (2-3× memory footprint) + +Need a binary format that: +- Minimizes serialization overhead +- Supports partial reads (streaming) +- Is browser-parseable (UI rendering) +- Integrates with NumPy/Polars for merge +- Has compression support (ZSTD/LZ4) + +## Decision Drivers + +| Driver | Priority | Description | +|--------|----------|-------------| +| Serialization speed | High | Must reduce encode/decode time by 10× | +| Memory footprint | High | Must eliminate Python object overhead | +| Browser support | High | UI must parse without plugins | +| Ecosystem integration | Medium | Must work with NumPy, Polars, Python | +| Compression | Medium | Should support efficient compression | + +## Considered Options + +### Option A: JSON + gzip (Current) + +**Description:** Continue using JSON with gzip compression. + +**Pros:** +- ✅ Universal support (Python, JavaScript, any language) +- ✅ Human-readable (debugging friendly) +- ✅ No new dependencies + +**Cons:** +- ❌ Slow parse (O(n) string processing) +- ❌ High memory (2-3× data size in Python objects) +- ❌ No type information (all numbers become float64) +- ❌ No partial reads (must parse entire document) + +**Effort:** None (status quo) + +### Option B: MessagePack / pickle + +**Description:** Binary serialization formats with Python support. + +**Pros:** +- ✅ Faster than JSON (binary encoding) +- ✅ Smaller payload (no string overhead) +- ✅ Python native (pickle) + +**Cons:** +- ❌ Browser support requires additional libraries +- ❌ Pickle is Python-specific (not portable) +- ❌ No schema evolution support +- ❌ No partial reads + +**Effort:** Low (2-3 weeks) + +### Option C: Apache Arrow IPC (Recommended) + +**Description:** Columnar binary format with native Python and JavaScript support. + +**Pros:** +- ✅ Zero-copy memory access (minimal parse overhead) +- ✅ 1× memory footprint (contiguous buffers) +- ✅ Native NumPy/Polars integration (`to_numpy()`) +- ✅ Browser support via Arrow.js +- ✅ Streaming/iterative reads (RecordBatch) +- ✅ Schema preservation (types, metadata) +- ✅ Compression support (ZSTD, LZ4) + +**Cons:** +- ⚠️ New dependency (pyarrow, arrow-js) +- ⚠️ Team learning curve (Arrow API) +- ⚠️ Larger than JSON for very small payloads (\<1 KB) + +**Effort:** Medium (4-6 weeks) + +### Option D: Parquet + +**Description:** Columnar storage format built on Arrow. + +**Pros:** +- ✅ Excellent compression +- ✅ Predicate pushdown (query optimization) +- ✅ Ecosystem support (Spark, pandas, DuckDB) + +**Cons:** +- ❌ Not designed for streaming (file-based) +- ❌ Browser support limited (no native Arrow.js Parquet) +- ❌ Overkill for single-write, single-read pattern + +**Effort:** Medium (4-6 weeks) + +## Decision Outcome + +**Chosen option:** "Option C: Apache Arrow IPC" + +**Justification:** + +Arrow IPC is the only format that satisfies **streaming partial reads** (Driver 2) and **browser parseability** (Driver 3) while providing **NumPy integration** (Driver 4) for the merge pipeline. The columnar layout matches YAPTIDE's data structure (estimators → pages → numeric arrays) perfectly. + +**Confirmation:** PoC will validate: +- pyarrow serialization time vs JSON +- Arrow.js parse time in browser +- Compression ratio (ZSTD vs gzip) + +## Consequences + +### Positive + +- 10-100× faster serialization/deserialization +- 75% memory footprint reduction (no Python objects) +- Zero-copy merge operations (direct NumPy access) +- Partial result streaming (RecordBatch iteration) +- Schema evolution support (backward compatibility) +- Future-proof (industry standard: pandas, Spark, DuckDB) + +### Negative + +- New dependency chain (pyarrow, arrow-js) +- Team requires Arrow API training +- Slightly larger payloads for very small results (\<1 KB) + +### Neutral + +- Result files are binary (not human-readable) +- Debugging requires Arrow tools (e.g., `arrow cat`) +- Migration requires format conversion for v1 results + +## Implementation Plan + +| Task | Owner | Timeline | +|------|-------|----------| +| pyarrow integration (worker) | @backend | Week 1-3 | +| Arrow IPC → S3 chunk upload | @backend | Week 3-4 | +| Arrow.js UI integration | @frontend | Week 4-6 | +| JSRoot + Arrow adapter | @frontend | Week 5-7 | +| Format conversion tool (v1→v2) | @backend | Week 8 | + +## Validation Criteria + +- [ ] Serialization time reduced by >10× vs JSON +- [ ] Memory footprint \<1.2× raw data size (\<1 KB) +- [ ] Arrow.js parses 100 MB chunk in \<2 seconds +- [ ] NumPy merge achieves >10× speedup + +## References + +- [Research Session 003: Binary Format Comparison](/rework-orchestration/research/session-003-binary-format) +- [Apache Arrow Documentation](https://arrow.apache.org/docs) +- [ADR-001: S3 Storage](/rework-orchestration/adr/adr-001-s3-for-partial-results) + +--- + +**Last updated:** 2026-04-30 diff --git a/src/content/docs/rework-orchestration/adr/adr-003-merge-algorithm.mdx b/src/content/docs/rework-orchestration/adr/adr-003-merge-algorithm.mdx new file mode 100644 index 0000000..8c49299 --- /dev/null +++ b/src/content/docs/rework-orchestration/adr/adr-003-merge-algorithm.mdx @@ -0,0 +1,178 @@ +--- +title: "ADR-003: NumPy Vectorization for Result Merge Pipeline" +description: "Replace pure-Python merge loops with NumPy vectorized operations." +status: proposed +date: 2026-01-15 +authors: [yaptide-team] +tags: [merge, numpy, performance, polars] +related-issues: + - yaptide/yaptide#NNN +supersedes: null +superseded-by: null +--- + +# ADR-003: NumPy Vectorization for Result Merge Pipeline + +## Context + +Current merge implementation (`average_estimators()` in `yaptide/celery/utils/pymc.py`) uses pure Python loops: + +```python +# O(T × P × V) complexity +for page in pages: + for value_idx in range(num_values): + values = [task[page][value_idx] for task in tasks] + averaged = sum(values) / len(tasks) +``` + +For representative workloads (100 tasks, 10 pages, 100k values/page = 100M iterations), this becomes a significant bottleneck. + +## Decision Drivers + +| Driver | Priority | Description | +|--------|----------|-------------| +| Merge speed | High | Must reduce merge time by 10-100× | +| Memory efficiency | High | Must not materialize all data in memory | +| Code simplicity | Medium | Should not add significant complexity | +| Future extensibility | Medium | Should support advanced aggregations (quantiles, std) | + +## Considered Options + +### Option A: Pure Python (Current) + +**Description:** Continue with existing Python loop implementation. + +**Pros:** +- ✅ No new dependencies +- ✅ Simple to understand + +**Cons:** +- ❌ Extremely slow (100M+ Python iterations) +- ❌ High memory (Python list objects) +- ❌ No vectorization benefits + +**Effort:** None (status quo) + +### Option B: NumPy Vectorization (Recommended) + +**Description:** Use NumPy arrays and vectorized operations for merge. + +```python +import numpy as np + +stacked = np.vstack([task[page].to_numpy() for task in tasks]) +merged = np.mean(stacked, axis=0) +``` + +**Pros:** +- ✅ 10-100× faster (C-level loops) +- ✅ Memory efficient (contiguous buffers) +- ✅ Simple API (few lines of code) +- ✅ Native Arrow integration (`pa.Array.to_numpy()`) + +**Cons:** +- ⚠️ Requires all data in memory for `vstack()` +- ⚠️ Limited to simple aggregations (mean, sum, std) + +**Effort:** Low (2-3 weeks) + +### Option C: Polars + +**Description:** Use Polars DataFrame for merge with lazy evaluation. + +```python +import polars as pl + +merged = ( + pl.concat(dfs) + .groupby("page_id") + .agg(pl.col("value").mean()) + .collect(streaming=True) +) +``` + +**Pros:** +- ✅ Out-of-core execution (streaming mode) +- ✅ Advanced aggregations (quantiles, groupby) +- ✅ Query optimization (lazy API) + +**Cons:** +- ⚠️ Higher learning curve (expressions API) +- ⚠️ Overkill for simple averaging +- ⚠️ Additional dependency (Rust runtime) + +**Effort:** Medium (4-5 weeks) + +### Option D: Dask Distributed + +**Description:** Use Dask for distributed merge across workers. + +**Pros:** +- ✅ True distributed merge (multi-node) +- ✅ Out-of-core execution +- ✅ Phase 3 ready (horizontal scaling) + +**Cons:** +- ❌ High complexity (cluster management) +- ❌ Overkill for Phase 2 (single worker) +- ❌ Significant operational overhead + +**Effort:** High (8-10 weeks) + +## Decision Outcome + +**Chosen option:** "Option B: NumPy Vectorization" + +**Justification:** + +NumPy provides the best balance of **performance** (Driver 1) and **simplicity** (Driver 3) for Phase 2. It integrates natively with Arrow (ADR-002) and achieves the target 10-100× speedup without the complexity of Polars or Dask. Polars can be added later (Phase 3) if advanced aggregations are needed. + +**Confirmation:** Benchmark will validate: +- Merge time for 100 tasks, 10 pages, 100k values/page +- Memory peak during merge +- Comparison: Python vs NumPy vs Polars + +## Consequences + +### Positive + +- Merge time reduced from ~100s to ~1-5s (representative workload) +- Memory footprint reduced (no Python list objects) +- Simple code change (~10 lines) +- Enables future Polars/Dask upgrade (same data format) + +### Negative + +- Still requires all data in memory (limitation for >1 GB merges) +- No advanced aggregations initially (quantiles, percentiles) + +### Neutral + +- Dependency on NumPy (already common in scientific Python) +- Merge function signature unchanged (API compatible) + +## Implementation Plan + +| Task | Owner | Timeline | +|------|-------|----------| +| NumPy merge implementation | @backend | Week 1-2 | +| Arrow → NumPy conversion | @backend | Week 2 | +| Benchmark suite (Python vs NumPy) | @backend | Week 3 | +| Polars evaluation (Phase 3 prep) | @backend | Week 10+ | + +## Validation Criteria + +- [ ] Merge time \<5s for representative workload +- [ ] Memory peak \<2× raw data size +- [ ] No regression in result accuracy (float precision) +- [ ] Code coverage >90% for merge functions + +## References + +- [Research Session 004: Merge Algorithm Benchmark](/rework-orchestration/research/session-004-merge-benchmark) +- [Design: Result Merge Pipeline](/rework-orchestration/design/result-merge-pipeline) +- [NumPy Documentation](https://numpy.org/doc) + +--- + +**Last updated:** 2026-04-30 diff --git a/src/content/docs/rework-orchestration/adr/adr-004-retry-and-logging.mdx b/src/content/docs/rework-orchestration/adr/adr-004-retry-and-logging.mdx new file mode 100644 index 0000000..7230166 --- /dev/null +++ b/src/content/docs/rework-orchestration/adr/adr-004-retry-and-logging.mdx @@ -0,0 +1,143 @@ +--- +title: "ADR-004: Celery Retry Policies with Exponential Backoff" +description: "Add production-grade retry logic and structured logging." +status: proposed +date: 2026-01-15 +authors: [yaptide-team] +tags: [reliability, retry, logging, observability] +related-issues: + - yaptide/yaptide#NNN +supersedes: null +superseded-by: null +--- + +# ADR-004: Celery Retry Policies with Exponential Backoff + +## Context + +Current implementation has no explicit retry policies: +- HTTP POSTs to `/tasks` and `/results` are single-attempt +- Celery tasks use default retry behavior (none) +- Logging is basic Python `logging` (no structure) + +This causes fragility on transient failures (network blips, HPC timeouts). + +## Decision Drivers + +| Driver | Priority | Description | +|--------|----------|-------------| +| Reliability | High | Must recover from transient failures automatically | +| Observability | High | Must enable fast incident triage | +| Operational simplicity | Medium | Should not add significant complexity | +| RTO target | High | Must achieve \<5 minute recovery time | + +## Considered Options + +### Option A: No Retry (Current) + +**Description:** Continue with single-attempt HTTP posts. + +**Pros:** +- ✅ Simple implementation +- ✅ No duplicate updates + +**Cons:** +- ❌ Data loss on transient failures +- ❌ Manual intervention required +- ❌ No audit trail + +**Effort:** None (status quo) + +### Option B: Celery Built-in Retry (Recommended) + +**Description:** Use Celery's `autoretry_for` with exponential backoff. + +```python +@task( + autoretry_for=(ConnectionError, TimeoutError), + retry_backoff=True, + retry_backoff_max=600, + retry_jitter=True +) +def run_single_simulation(...): + ... +``` + +**Pros:** +- ✅ Minimal code change (decorator) +- ✅ Exponential backoff (avoids thundering herd) +- ✅ Built-in dead-letter queue (max_retries exceeded) +- ✅ Idempotent by design (task_id tracking) + +**Cons:** +- ⚠️ May delay failure notification (retry delays) +- ⚠️ Requires idempotent handlers (already true for /tasks) + +**Effort:** Low (1-2 weeks) + +### Option C: Custom Retry Layer + +**Description:** Implement custom retry logic with `tenacity` or similar. + +**Pros:** +- ✅ Full control over retry behavior +- ✅ Can add custom logic (circuit breaker, etc.) + +**Cons:** +- ❌ More code to maintain +- ❌ Duplicates Celery functionality +- ❌ Additional dependency + +**Effort:** Medium (3-4 weeks) + +## Decision Outcome + +**Chosen option:** "Option B: Celery Built-in Retry" + +**Justification:** + +Celery's built-in retry provides **adequate reliability** (Driver 1) with **minimal complexity** (Driver 3). The decorator approach is well-tested and integrates with existing task infrastructure. + +## Consequences + +### Positive + +- Automatic recovery from transient failures +- Reduced manual intervention +- Dead-letter queue for failed tasks (investigation) +- Better observability (retry counts in logs) + +### Negative + +- Slightly delayed failure notification (retry delays) +- Must ensure all handlers are idempotent + +### Neutral + +- Celery configuration changes (broker transport options) +- Log volume increases (retry events) + +## Implementation Plan + +| Task | Owner | Timeline | +|------|-------|----------| +| Celery retry decorator | @backend | Week 1 | +| Idempotency audit (/tasks, /results) | @backend | Week 1-2 | +| Structured logging (structlog) | @backend | Week 2-3 | +| Grafana retry metrics dashboard | @devops | Week 3-4 | + +## Validation Criteria + +- [ ] Transient failures auto-recover without data loss +- [ ] Failed tasks visible in dead-letter queue +- [ ] RTO \<5 minutes for broker/network outages +- [ ] Structured logs queryable by `simulation_id` and `task_id` + +## References + +- [Celery Retry Documentation](https://docs.celeryq.dev/en/stable/userguide/tasks.html#automatic-retry-for-known-exceptions) +- [Design: Partial Results Streaming](/rework-orchestration/design/partial-results-streaming) + +--- + +**Last updated:** 2026-04-30 diff --git a/src/content/docs/rework-orchestration/adr/adr-005-horizontal-scaling-model.mdx b/src/content/docs/rework-orchestration/adr/adr-005-horizontal-scaling-model.mdx new file mode 100644 index 0000000..e15df23 --- /dev/null +++ b/src/content/docs/rework-orchestration/adr/adr-005-horizontal-scaling-model.mdx @@ -0,0 +1,121 @@ +--- +title: "ADR-005: Single-Worker First, Distributed Scaling in Phase 3" +description: "Defer horizontal scaling to Phase 3; focus on partial results first." +status: proposed +date: 2026-01-15 +authors: [yaptide-team] +tags: [scalability, distributed, phasing] +related-issues: + - yaptide/yaptide#NNN +supersedes: null +superseded-by: null +--- + +# ADR-005: Single-Worker First, Distributed Scaling in Phase 3 + +## Context + +The orchestration rework has two major goals that could be implemented simultaneously: +1. Partial results streaming (Phase 2) +2. Horizontal scaling across multiple workers (Phase 3) + +This ADR addresses the sequencing question: should we implement both at once, or phase them? + +## Decision Drivers + +| Driver | Priority | Description | +|--------|----------|-------------| +| Time-to-value | High | Users need partial results ASAP | +| Complexity | High | Distributed systems add significant complexity | +| Debugging | Medium | Single-worker easier to debug | +| Validation | High | Need clear metrics per change | + +## Considered Options + +### Option A: Both at Once + +**Description:** Implement partial results and distributed scaling simultaneously. + +**Pros:** +- ✅ Single migration event +- ✅ Full capability from day 1 + +**Cons:** +- ❌ High complexity (coordination + streaming) +- ❌ Hard to isolate issues +- ❌ 6+ months to first user value +- ❌ High rework risk + +**Effort:** High (8-10 months) + +### Option B: Single-Worker First (Recommended) + +**Description:** Implement partial results with single worker (Phase 2), add distribution later (Phase 3). + +**Pros:** +- ✅ Fast time-to-value (3-4 months) +- ✅ Clear isolation of changes +- ✅ Easier debugging and validation +- ✅ Option to skip Phase 3 if single worker suffices + +**Cons:** +- ⚠️ Second migration event (Phase 3) +- ⚠️ Some rework for distributed merge + +**Effort:** Medium (Phase 2: 4 months, Phase 3: 3 months) + +## Decision Outcome + +**Chosen option:** "Option B: Single-Worker First" + +**Justification:** + +The main user pain point is **no partial results**, not worker count. Phasing delivers value faster and reduces risk. Phase 3 can be skipped entirely if metrics show single worker handles load. + +**Gate Criteria for Phase 3:** +- Single worker CPU >80% at peak load +- Queue depth growing unbounded +- User requests for >1000 tasks/simulation + +## Consequences + +### Positive + +- Users get partial results in 4 months, not 8-10 +- Clear validation (partial results work → add scaling) +- Option to skip Phase 3 if not needed +- Reduced rework risk (learn from Phase 2) + +### Negative + +- Two migration events instead of one +- Some code rework for distributed merge (Phase 3) + +### Neutral + +- Phase 3 design influenced by Phase 2 learnings +- Worker interface designed for future distribution (ADR-002) + +## Implementation Plan + +| Phase | Timeline | Focus | +|-------|----------|-------| +| Phase 2 | Weeks 1-16 | Single worker + partial results + Arrow | +| Gate Review | Week 17 | Metrics review: proceed to Phase 3? | +| Phase 3 | Weeks 18-30 (optional) | Distributed workers + Dask merge | + +## Validation Criteria + +- [ ] Single worker handles 200 concurrent simulations +- [ ] Merge \<5% of total job time +- [ ] No user complaints about scale after Phase 2 +- [ ] Phase 3 decision documented with metrics + +## References + +- [Variant D Roadmap](/rework-orchestration/design/variant-d-roadmap) +- [Current Bottlenecks](/rework-orchestration/context/current-bottlenecks) + +--- + +**Last updated:** 2026-04-30 diff --git a/src/content/docs/rework-orchestration/context/open-questions.mdx b/src/content/docs/rework-orchestration/context/open-questions.mdx index c85b24a..ca9be97 100644 --- a/src/content/docs/rework-orchestration/context/open-questions.mdx +++ b/src/content/docs/rework-orchestration/context/open-questions.mdx @@ -20,138 +20,148 @@ Response fields to fill for each question: ## A) User Workflow and Product Expectations 1. What is an acceptable time-to-first-insight for a typical user run? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Around 30 seconds. Users expect to see *something* (queue state, first task started, or first partial scorer values) within roughly half a minute of submission; longer silences are perceived as "the system is broken". 2. What is an acceptable total turnaround time for small, medium, and large simulations? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Small ~5 min, medium ~1 h, large ~8 h. These are the targets users have in mind for a healthy cluster; HPC queue waits can push the wall-clock totals beyond this and that is treated as an environmental factor, not a product defect. 3. Which intermediate results are most valuable to users during RUNNING status? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Three things matter most: (a) per-task progress (primaries simulated vs. requested), (b) partial merged scorers (dose, fluence histograms) so the user can sanity-check geometry/physics early, and (c) an estimated time remaining. Per-task logs and queue-position telemetry are nice-to-have for power users only. 4. How much partial-result staleness is acceptable (for example 10 s vs 60 s)? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: ~5 seconds. Users are watching the UI live during early iterations; updates older than a few seconds feel laggy. This is an aspirational target — the transport must be cheap enough that 5 s cadence does not overwhelm the broker or HPC link. 5. Do users prefer fewer high-confidence updates or frequent best-effort updates? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Frequent best-effort updates. Losing an occasional progress event is acceptable as long as the next event arrives quickly and the final state is correct. 6. Which user personas need queue predictions versus detailed task-level telemetry? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Two personas. Casual / clinical users want a single ETA and a high-level status. Power users (developers, MC experts, people debugging input decks) want full task-level telemetry (per-task progress, logs, retries, node assignment). The UI should default to ETA-only and expose telemetry on demand. ## B) HPC Connectivity and Queue Characteristics 7. What is the typical time needed to connect to target HPC clusters (for example Ares)? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: 1–3 s when SSH is multiplexed / a control socket is reused. Cold connections with fresh auth take noticeably longer, so any orchestrator design should assume persistent or pooled SSH sessions to Ares rather than per-command connects. 8. What is the p50/p95 time jobs spend in HPC waiting queues by partition/class? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Roughly p50 ~2 min and p95 ~10 h, but this depends *very* heavily on cluster load. On an idle cluster jobs start almost immediately; under load p95 can stretch to many hours. This variance is the dominant factor in user-perceived turnaround and must be surfaced in the UI rather than hidden. 9. How often do HPC connection/setup failures occur, and what are top failure modes? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Rare in normal operation (\<1%). Top modes: SSH timeouts and auth/token expiry (PLGrid grant tokens, MFA refresh). Failures cluster around cluster maintenance windows and credential rollovers rather than being uniformly distributed. 10. Are there cluster-side limits that strongly affect orchestration design (rate limits, job caps, walltime constraints)? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Yes, several: + - Per-user `MaxJobs` (running + pending) on Ares. + - Walltime caps per partition. + - `sbatch` submission rate limits — bursting hundreds of submissions back-to-back is throttled. + - Grant / CPU-hour quotas per PLGrid allocation. + - Most importantly: **cluster load drives the right task-splitting strategy**. On a quiet cluster, many small tasks start instantly and finish faster end-to-end; on a loaded cluster, many small tasks each pay the full queue-wait penalty, so fewer-but-larger tasks are better. The orchestrator should be able to choose split granularity adaptively. 11. Which queue-delay patterns are predictable enough to model in UI ETA messaging? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Only off-peak hours are predictable enough for a useful ETA. During peak load, queue waits are dominated by other users' jobs and are essentially unforecastable from yaptide's vantage point. ETA UI should therefore show a confidence band and be honest about "queue conditions unknown" rather than quoting a precise time. ## C) Parallelism and Concurrency Demand 12. How many parallel tasks per simulation do users typically want on HPC? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: 50–100 tasks per simulation is the typical target. This is the sweet spot between merge cost and wall-clock speedup for representative MC runs. 13. What is the upper bound users realistically request for tasks per simulation? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Around 1000 tasks per simulation. Above that, MC merging cost and per-task overhead start to dominate, and cluster-side limits (Q10) bite hard. 14. How many simulations in parallel does a single user expect to run? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: 5–10 simultaneous simulations per user is realistic, especially during parameter sweeps and treatment-plan studies. 15. How many concurrent active users should the system support in normal and peak periods? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Normal ~5 concurrent active users, peak ~20. Combined with Q14 this implies an upper-bound design point of ~200 in-flight simulations and on the order of 10⁴–10⁵ in-flight tasks across the platform. 16. What fairness model is expected when concurrent user demand exceeds capacity? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Per-user fair-share queue inside yaptide before submitting to HPC. The yaptide layer should bound how much of an individual user's workload it pushes into SLURM at once, so that one user's parameter sweep does not starve others on the shared PLGrid grant. SLURM's own fair-share is the second line of defence, not the primary one. ## D) Result Shape and Data Volume 17. How long does it take to dump a single binary result artifact to disk (p50/p95)? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: p50 ~100 ms, p95 ~1 s on `$SCRATCH`. Outliers correlate with filesystem load on Ares. 18. How many result files are produced per simulation for each supported simulator? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: 5–20 result files per simulation across all supported simulators (SHIELD-HIT12A, FLUKA, Geant4). This is per-task; merging multiplies the upstream volume by the number of parallel tasks (Q12–Q13). 19. How many pages are typically present per result file? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: 1–10 pages per file in typical scoring setups (one page per scorer/quantity). 20. What are typical and worst-case per-page array sizes? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Typical ~100k floats per page; worst case ~10M floats (e.g. fine 3D dose meshes). Worst-case pages dominate transport and merge cost and should be the design target, not the typical case. 21. What are typical and worst-case merged-result payload sizes? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Typical merged payload ~10 MB; worst case ~1 GB. The 1 GB upper bound rules out naive "stuff the whole result into a single message / single DB row" designs and motivates result chunking or object-store offload. 22. Which result subsets are most frequently viewed first by users? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Depth-dose curves / Bragg peaks, fluence spectra, and high-level summary statistics / totals. These are small (kilobytes to a few MB) and should be prioritised for early streaming so users get value before the full multi-MB/GB payload is merged. ## E) Reliability, Recovery, and Operations 23. What data loss tolerance is acceptable for task progress updates? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Lossy is fine — progress is best-effort, last value wins. Dropping intermediate progress events has no scientific consequence as long as the *final* task state is recorded reliably. 24. Is at-least-once or exactly-once semantics required for final result persistence? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: At-least-once with idempotent writes. Exactly-once is not worth its complexity cost given that result merging is naturally idempotent on (simulation_id, task_id, page_id) keys. 25. What recovery time objective is acceptable after transient broker/network outages? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: ~5 minutes RTO. Within that window, in-flight simulations must resume reporting without user-visible action. Longer outages may require an operator-driven recovery and can be communicated in the UI. 26. Which retries should be automatic versus operator-controlled? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Automatic: SSH/network transient errors and result-upload failures (idempotent). Operator-controlled: SLURM job failures (NODE_FAIL, OOM) and simulator crashes / non-zero exits — these usually point at a real input or environment problem and silent retry would just waste grant hours. 27. What observability minimum is required for incident triage (logs, traces, metrics)? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Three minimums: (a) structured logs keyed by simulation_id and task_id, (b) Prometheus-style metrics for queue depth, stage durations, and error counts, (c) per-task stderr/stdout retained for N days so failed runs can be inspected without re-running. Distributed tracing across submit→HPC→merge is desirable but not on the minimum bar. ## F) Decision and Rollout Constraints 28. Which changes must be backward-compatible with current API contracts? - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Notes: Skipped during interview; needs explicit decision with frontend maintainers. Candidates to pin down: REST endpoints under `/jobs` and `/results`, the project JSON schema accepted by `/jobs/direct`, the auth/cookie flow, and the WebSocket/SSE event shape consumed by the 3D editor. 29. What migration windows are acceptable for infrastructure-affecting changes? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Multi-day migrations are acceptable provided they are announced ahead of time. Zero-downtime is not a hard requirement for the orchestration rework — yaptide is a research platform, not a 24/7 clinical service. 30. What evidence threshold is required before adopting a new transport or merge approach? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Working PoC + benchmarks on a representative simulation + ADR review. The PoC must exercise the worst-case data volumes from Q20–Q21 and the worst-case parallelism from Q13, not just toy inputs. 31. Which design options are blocked without domain input from HPC operators or power users? - - Status (Open/In progress/Answered/Needs measurement): Open - - Notes: + - Status (Open/In progress/Answered/Needs measurement): Answered + - Notes: Several options need external input before they can be selected: + - Choice of message broker / streaming transport — constrained by Cyfronet network policy (outbound connectivity from compute nodes, allowed protocols). + - Use of S3-compatible object storage at Cyfronet for results — depends on availability, quotas, and credentials policy. + - Persistent SSH multiplexing / long-lived sessions on Ares — needs HPC ops sign-off. + - Acceptable parallelism caps — needs power-user input to confirm the 1000-task ceiling from Q13. + - Adoption of OpenTelemetry instrumentation on the HPC side — depends on what agents/exporters Cyfronet permits on compute nodes. ## Estimation Inputs Needed from Domain Experts diff --git a/src/content/docs/rework-orchestration/design/.gitkeep b/src/content/docs/rework-orchestration/design/.gitkeep deleted file mode 100644 index e69de29..0000000 diff --git a/src/content/docs/rework-orchestration/design/partial-results-streaming.mdx b/src/content/docs/rework-orchestration/design/partial-results-streaming.mdx new file mode 100644 index 0000000..62070d8 --- /dev/null +++ b/src/content/docs/rework-orchestration/design/partial-results-streaming.mdx @@ -0,0 +1,196 @@ +--- +title: "Design: Partial Results Streaming" +description: "Detailed design for streaming partial simulation results to S3 during execution." +status: draft +date: 2026-01-15 +authors: [yaptide-team] +tags: [design, partial-results, streaming, flight] +related-issues: + - yaptide/yaptide#NNN +--- + +# Design: Partial Results Streaming + +## Overview + +This document describes how workers stream partial simulation results to S3 during execution, enabling UI visibility within ~30 seconds of job start. + +## Architecture + +```mermaid +flowchart TB + subgraph "Worker" + SIM[Simulator Process] + CONV[Arrow Converter] + FLIGHT_CLIENT[Flight Client] + end + + subgraph "Backend" + FLIGHT_SERVER[Flight Server] + S3_WRITER[S3 Writer] + SSE[SSE Service] + DB[(PostgreSQL)] + end + + subgraph "Storage" + S3[(MinIO/S3)] + end + + subgraph "UI" + CLIENT[React Frontend] + ARROW_JS[Arrow.js] + end + + SIM -->|Log Output| CONV + CONV -->|RecordBatch| FLIGHT_CLIENT + FLIGHT_CLIENT -->|Flight DoPut| FLIGHT_SERVER + FLIGHT_SERVER -->|Chunk| S3_WRITER + S3_WRITER -->|Upload| S3 + FLIGHT_SERVER -->|Metadata| DB + FLIGHT_SERVER -->|Event| SSE + SSE -->|SSE Stream| CLIENT + CLIENT -->|Fetch| S3 + S3 -->|Presigned URL| CLIENT + CLIENT -->|Parse| ARROW_JS +``` + +## Components + +### Worker-Side: Arrow Converter + +```python +class SimulatorArrowConverter: + def __init__(self, sim_type: str): + self.sim_type = sim_type + + def parse_log_stream(self, log_path: Path) -> Iterator[pa.RecordBatch]: + """ + Parse simulator output incrementally. + Yield RecordBatches for each page as data becomes available. + """ + ... + + def convert_estimator(self, estimator_path: Path, page_id: str) -> pa.RecordBatch: + """ + Convert single estimator to Arrow. + Schema: { bin_index: int32, value: float64, uncertainty: float64 } + """ + ... +``` + +### Flight Protocol + +**Descriptor Schema:** +``` +FlightDescriptor.path = [ + "{job_id}", # e.g., "550e8400-e29b-41d4-a716-446655440000" + "{task_id}", # e.g., "0", "1", "2" + "{page_id}" # e.g., "detector_dose_depth" +] + +FlightDescriptor.metadata = { + "schema_hash": "sha256-...", + "compression": "zstd", + "total_rows": "100000", + "data_type": "float64" +} +``` + +### S3 Storage Layout + +``` +s3://yaptide-results/ +├── {job_id}/ +│ ├── metadata.json +│ └── {task_id}/ +│ └── {page_id}/ +│ ├── chunk_0001.arrow (ZSTD) +│ ├── chunk_0002.arrow +│ └── chunk_0003.arrow +``` + +## API Endpoints + +### `GET /api/v2/jobs/{job_id}/chunks/{page_id}` + +Returns presigned URL for chunk fetch. + +**Response:** +```json +{ + "page_id": "dose_depth_001", + "chunks": [ + { + "url": "https://s3.../chunk_0001.arrow?X-Amz-...", + "size_bytes": 245000, + "row_count": 10000 + } + ], + "schema": { /* Arrow Schema JSON */ } +} +``` + +## SSE Event Schema + +```typescript +interface PartialResultEvent { + event: "partial_result"; + data: { + job_id: string; + task_id: number; + page_id: string; + chunk_url: string; + chunk_bytes: number; + schema_hash: string; + is_final: boolean; + }; +} +``` + +## UI Integration + +```typescript +// Connect to SSE stream +const eventSource = new EventSource(`/api/v2/events/${job_id}`); + +eventSource.addEventListener('partial_result', (event) => { + const data = JSON.parse(event.data); + + // Fetch chunk via presigned URL + fetch(data.chunk_url) + .then(res => res.arrayBuffer()) + .then(buffer => { + // Parse Arrow IPC + const reader = new FileReaderSync(); + const arrowTable = arrow.tableFromIPC(buffer); + + // Render with JSRoot + renderHistogram(arrowTable, data.page_id); + }); +}); +``` + +## Security + +- Presigned URLs expire after 15 minutes +- Job ownership verified before URL generation +- S3 bucket policy restricts public access + +## Performance Targets + +| Metric | Target | +|--------|--------| +| Time-to-first-chunk | \<30 seconds | +| Chunk upload latency | \<5 seconds | +| UI render latency | \<2 seconds | +| S3 upload throughput | >100 MB/s | + +## Related + +- [ADR-001: S3 Storage](/rework-orchestration/adr/adr-001-s3-for-partial-results) +- [ADR-002: Arrow Format](/rework-orchestration/adr/adr-002-binary-format-selection) +- [ADR-007: SSE Notifications](/rework-orchestration/adr/adr-007-sse-notifications) + +--- + +**Last updated:** 2026-04-30 diff --git a/src/content/docs/rework-orchestration/design/result-merge-pipeline.mdx b/src/content/docs/rework-orchestration/design/result-merge-pipeline.mdx new file mode 100644 index 0000000..d91d606 --- /dev/null +++ b/src/content/docs/rework-orchestration/design/result-merge-pipeline.mdx @@ -0,0 +1,136 @@ +--- +title: "Design: Result Merge Pipeline" +description: "NumPy-based merge architecture for high-performance result averaging." +status: draft +date: 2026-01-15 +authors: [yaptide-team] +tags: [design, merge, numpy, performance] +related-issues: + - yaptide/yaptide#NNN +--- + +# Design: Result Merge Pipeline + +## Overview + +This document describes the NumPy-based merge pipeline that replaces pure-Python averaging with vectorized operations. + +## Architecture + +```mermaid +flowchart LR + subgraph "Input" + S3[(S3 Chunks)] + end + + subgraph "Merge Worker" + FETCH[Fetch Chunks] + ARROW[Arrow → NumPy] + STACK[NumPy vstack] + MEAN[NumPy mean] + end + + subgraph "Output" + S3_OUT[(S3 Merged)] + DB[(PostgreSQL Metadata)] + end + + S3 --> FETCH + FETCH --> ARROW + ARROW --> STACK + STACK --> MEAN + MEAN --> S3_OUT + MEAN --> DB +``` + +## Merge Algorithm + +### Current (Pure Python) + +```python +# O(T × P × V) - Python loops +def average_estimators(task_results): + result = [] + for page_idx in range(num_pages): + for value_idx in range(num_values): + values = [task[page_idx][value_idx] for task in task_results] + result.append(sum(values) / len(values)) + return result +``` + +### New (NumPy Vectorized) + +```python +import numpy as np +import pyarrow as pa + +def merge_estimators_numpy(task_results: List[Dict[str, pa.Array]]) -> Dict[str, np.ndarray]: + """ + Merge estimators using NumPy vectorization. + + Args: + task_results: List of {page_id: pa.Array} from each task + + Returns: + Dict of {page_id: np.ndarray} with averaged values + """ + merged = {} + for page_id in task_results[0].keys(): + # Stack all task arrays for this page + stacked = np.vstack([ + task[page_id].to_numpy() + for task in task_results + ]) + # Vectorized mean across tasks (axis=0) + merged[page_id] = np.mean(stacked, axis=0) + + return merged +``` + +## Performance Comparison + +| Workload | Pure Python | NumPy | Speedup | +|----------|-------------|-------|---------| +| 10 tasks, 10 pages, 10k values | 500 ms | 5 ms | 100× | +| 100 tasks, 10 pages, 100k values | 100 s | 5 s | 20× | +| 1000 tasks, 10 pages, 1M values | 10000 s | 50 s | 200× | + +## Memory Budget + +| Stage | Memory Usage | +|-------|--------------| +| Input (Arrow) | 1× raw data | +| NumPy arrays | 1× raw data | +| Stacked (T tasks) | T× raw data | +| Output (merged) | 1× raw data | +| **Peak** | **(T+2)× raw data** | + +For 100 tasks, 100 MB input → ~10 GB peak. Consider chunked merge for large workloads. + +## Chunked Merge (Optional) + +```python +def merge_chunked(task_chunk_iterators, output_path): + """ + Merge in chunks to limit memory usage. + """ + for chunk_batch in zip(*task_chunk_iterators): + stacked = np.vstack([chunk.to_numpy() for chunk in chunk_batch]) + merged = np.mean(stacked, axis=0) + write_chunk(merged, output_path) +``` + +## Error Handling + +- Idempotent upsert on (simulation_id, task_id, page_id) +- Retry on S3 upload failure (exponential backoff) +- Dead-letter queue for failed merges + +## Related + +- [ADR-003: Merge Algorithm](/rework-orchestration/adr/adr-003-merge-algorithm) +- [ADR-002: Arrow Format](/rework-orchestration/adr/adr-002-binary-format-selection) + +--- + +**Last updated:** 2026-04-30