Deploy workflow workers as an ephemeral ECS Fargate fleet — Closes #33 #34
Draft
conradbzura wants to merge 41 commits into 30-add-preprocessing-indexing-workflow
Conversation
Introduce the foundational types for the preprocessing/indexing
subsystem that serves Gosling Designer's client-side fetchers:
- JobRecord, JobStatus, and ArtifactKind model the persistent
workflow state that lives in the Mongo jobs collection.
- cache_key and workflow_key are pure functions that derive
content-addressed keys from upstream md5 plus pipeline/processor
versions. A byte change upstream with a refreshed md5 naturally
invalidates cached artifacts, and a version bump forces fresh
processing without any manual purge.
- Processor is the ABC every preprocessing pipeline subclasses. A
ProcessorRegistry resolves a file document to the first matching
processor by format name. PassthroughProcessor covers formats
Gosling can consume directly (CSV, TSV, bigWig).
No cache backend, executor, or HTTP surface is wired up yet — those
land in follow-on commits that depend on these types.
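A minimal sketch of the cache_key / workflow_key derivation described above. The hash scheme, argument names, and the PIPELINE_VERSION constant are illustrative assumptions, not the shipped code; only the idea (upstream md5 plus versions, hashed into a content-addressed key) is taken from the commit message.

```python
import hashlib

PIPELINE_VERSION = "1"  # hypothetical global version; a bump invalidates every derived key


def cache_key(upstream_md5: str, processor_version: str, artifact_kind: str) -> str:
    """Content-addressed key: new upstream bytes (new md5) or a version bump
    produce a new key, so stale artifacts are never served and never need a purge."""
    digest = hashlib.sha256(
        f"{upstream_md5}:{PIPELINE_VERSION}:{processor_version}:{artifact_kind}".encode()
    ).hexdigest()
    return f"{digest}.{artifact_kind}"


def workflow_key(upstream_md5: str, processor_version: str) -> str:
    """One key per (source file, pipeline version): the unit the workflow mutex locks on."""
    return hashlib.sha256(
        f"{upstream_md5}:{PIPELINE_VERSION}:{processor_version}".encode()
    ).hexdigest()
```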
CacheBackend is a byte-range-aware pluggable store for workflow artifacts. LocalFsCache writes keys as relative paths under a configured root, puts via os.replace for atomicity, and streams reads in 64 KiB chunks with inclusive-range support — Gosling's BAM, tabix, and bbi fetchers all issue Range requests that must pass through to the cache transparently. Key validation rejects path traversal so malformed input cannot escape the cache root.

The workflow mutex uses a Mongo partial unique index on workflow_key filtered to active statuses, mirroring the atomic-upsert idiom already used by services.locks for the sync and cutover locks. claim_workflow either inserts a fresh PENDING record or — on DuplicateKeyError — attaches to the existing active job so concurrent /data and /index requests converge on one workflow per source file. A stale-threshold reclamation path covers the case where a worker crashes without releasing the mutex.
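A sketch of the claim-or-attach idiom against the partial unique index, in Motor/PyMongo terms. The collection handle, field names, and status strings are assumptions, and stale reclamation is omitted; only the insert-or-DuplicateKeyError shape comes from the description above.

```python
from datetime import datetime, timezone

from pymongo.errors import DuplicateKeyError

ACTIVE_STATUSES = ("PENDING", "RUNNING")  # the statuses the partial unique index filters on


async def claim_workflow(jobs, workflow_key: str, job_id: str) -> tuple[dict, bool]:
    """Insert a fresh PENDING record, or attach to the active job already holding the key."""
    now = datetime.now(timezone.utc)
    record = {
        "job_id": job_id,
        "workflow_key": workflow_key,
        "status": "PENDING",
        "created_at": now,
        "updated_at": now,
    }
    try:
        await jobs.insert_one(record)
        return record, True  # fresh claim: this caller owns the workflow
    except DuplicateKeyError:
        existing = await jobs.find_one(
            {"workflow_key": workflow_key, "status": {"$in": list(ACTIVE_STATUSES)}}
        )
        return existing, False  # attach: another request already started it
```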
JobExecutor is the abstract interface bridging /data and /index to the processing pipeline. WoolExecutor is the concrete backend: ensure_workflow claims or attaches to the per-source mutex, and on a fresh claim fires a background task that dispatches the processor through a Wool WorkerPool. Long-running jobs are bounded with asyncio.wait_for at the call site (Wool's own @routine timeout is dispatch-only). A short retry loop tolerates the window between WorkerPool startup and discovery catching up.

Two concrete processors land alongside:
- BamIndexProcessor: SAM->BAM conversion (when needed) plus samtools sort and samtools index. Emits the sorted BAM and its BAI.
- TabixIntervalProcessor: covers VCF, GFF, BED, BroadPeak, NarrowPeak with decompress + sort + bgzip + tabix; GTF adds a gffread conversion to GFF3 first; bigBed is converted via bigBedToBed before the BED pipeline.

Both processors consult the cache per artifact kind before running the corresponding stage, giving partial-commit recovery for free: a stage-2 failure leaves the stage-1 artifact in cache, and the next retry skips the expensive stage. fetcher.download_source centralizes DRS/HTTPS source retrieval so processors can focus on the tool-invocation pipeline. An integration marker is registered for tests that shell out to real samtools or htslib, gated off by default.
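Roughly the dispatch shape described above; the exception type, argument names, and retry counts are hypothetical placeholders, while the wait_for bound and the discovery-catch-up retry loop are the behaviour the commit describes.

```python
import asyncio


class NoWorkersAvailableError(RuntimeError):
    """Hypothetical stand-in for the 'pool has no discovered workers yet' condition."""


async def dispatch_with_retry(routine, duration_cap: float, attempts: int = 5, delay: float = 1.0):
    """Retry while discovery catches up after WorkerPool startup, and bound the whole
    run with wait_for since Wool's @routine timeout covers dispatch only."""
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(routine(), timeout=duration_cap)
        except NoWorkersAvailableError:
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(delay)
```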
Both /data and /index now converge on ensure_workflow when a file's
processor requires preprocessing and the requested artifact is not
yet in cache. The router returns 202 with Location: /jobs/{job_id}
and Retry-After so clients can poll. On cache hit the artifact
streams directly with byte-range support. Passthrough formats (CSV,
TSV, bigWig) fall through to the existing direct-streaming path
unchanged.
/index preserves the pre-existing 4DN sidecar fast path — files
whose extra.extra_files or extra.fourdn.extra_files carries a tbi or
beddb entry continue to serve from upstream without entering the
workflow. Files with no sidecar and no processor emitting an index
artifact return 404.
/jobs/{job_id} is a new endpoint exposing the persisted JobRecord:
status, stages_done, artifact cache keys, optional progress, and
error on failure.
The FastAPI lifespan brings up a Wool WorkerPool, a LocalFsCache
under SYNC_DATA_DIR, and the processor registry (passthrough plus
BAM and tabix). When SYNC_DATA_DIR is unset the subsystem stays
disabled and routers fall back to direct streaming — keeps minimal
test deployments and older configurations working unchanged.
The api container now installs samtools, tabix, bcftools, gffread, and UCSC bigBedToBed so the Wool workers spawned by the API process can shell out to the preprocessing tools without any separate image. apt-shipped packages cover everything but bigBedToBed, which is pulled as a UCSC release binary.

create-indexes.js adds three indexes to the new jobs collection: a partial unique index on workflow_key (filtered to active statuses) that backs the per-source mutex, a unique index on job_id for /jobs lookups, and a compound status+updated_at index that the stale-lock reclamation path scans.
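The shipped script is create-indexes.js; expressed in PyMongo terms for reference, the three indexes look roughly like this (field and status names mirror the description above and may differ in detail):

```python
def create_jobs_indexes(db):
    jobs = db["jobs"]

    # Per-source mutex: at most one *active* job may hold a given workflow_key.
    jobs.create_index(
        [("workflow_key", 1)],
        unique=True,
        partialFilterExpression={"status": {"$in": ["PENDING", "RUNNING"]}},
    )

    # Fast /jobs/{job_id} lookups.
    jobs.create_index([("job_id", 1)], unique=True)

    # Stale-lock reclamation scans active jobs ordered by last update.
    jobs.create_index([("status", 1), ("updated_at", 1)])
```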
HEAD probes on /data and /index no longer trigger preprocessing. The previous behavior bound HEAD and GET to the same handler with no method check, so monitoring probes or prefetch tools claiming to be side-effect-free were actually dispatching samtools and tabix jobs via ensure_workflow. Cache-miss now returns 404 on HEAD; clients must issue an explicit GET to trigger processing.

Alongside the HEAD fix, a ?raw query parameter lets clients opt out of the preprocessing pipeline entirely. With raw=false (the default) the router returns the visualization-ready artifact (cache hit or a 202 with a Location header for GET). With raw=true the router serves the upstream bytes directly for /data, and restricts /index to upstream sidecars only (404 when none exists). Passthrough formats (CSV, TSV, bigWig) behave the same regardless of raw since there is no processed output distinct from the upstream file. The default is raw=false so Gosling and CVH clients get preprocessed artifacts without needing to know about the flag.
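A sketch of the resulting router control flow. The helper names (stream_upstream_bytes, artifact_key_for, cache, executor) and the stream_cache_entry signature are stand-ins rather than the real module API; the raw branch, the HEAD-on-miss 404, and the GET-only 202 are the behaviour described above.

```python
from fastapi import Query, Request
from fastapi.responses import Response


async def stream_file(request: Request, file_id: str, raw: bool = Query(False)):
    """Control flow only; cache, executor, and streaming helpers are stand-ins."""
    if raw:
        return await stream_upstream_bytes(file_id)  # hypothetical: serve upstream bytes directly

    entry = await cache.get_entry(artifact_key_for(file_id))  # hypothetical cache lookup
    if entry is not None:
        return stream_cache_entry(request, entry)  # 200/206 with Range support, HEAD or GET

    if request.method == "HEAD":
        # HEAD stays side-effect-free: report the miss, never dispatch samtools/tabix work.
        return Response(status_code=404)

    job = await executor.ensure_workflow(file_id)  # only an explicit GET triggers processing
    return Response(
        status_code=202,
        headers={"Location": f"/jobs/{job.job_id}", "Retry-After": "5"},
    )
```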
The VCF sort path ran GNU sort over the whole file, which scrambled the ## header block into the data block and produced artifacts tabix could not index. The fix is a two-pass pipeline for VCF: grep '^#' emits the header block, then grep -v '^#' pipes the data block through sort, and the brace group joins both streams into bgzip. Headers end up at the top of the output as tabix requires.

Neither GNU sort nor samtools sort had a memory cap, so on Lambda instances sized 512 MB - 2 GB either could OOM the worker on large inputs. Add CFDB_SORT_MEMORY_CAP and CFDB_SAMTOOLS_MEMORY_CAP env vars (both default 256M) and plumb them into sort -S / samtools sort -m with -T pointed at the per-job workdir so spillage lands on disk instead of in memory. LC_ALL=C on sort makes collation byte-wise and independent of the worker's locale.

The format-specific pipelines are now single streaming shell pipes rather than separate decompress / sort / bgzip invocations writing intermediate files. Two intermediates remain where the tool can't stream input: VCF (two passes for header/body split) and GTF (gffread reads files). Other formats flow zcat -f or bigBedToBed straight into sort and bgzip without materializing intermediate text.

GFF3 was silently bypassed because _TABIX_PRESET had only GFF, not GFF3, while the ontology mapper emits GFF3 for .gff3 files. Add the GFF3 key pointing at the gff preset so .gff3 inputs flow through the same pipeline.

Tests exercise each per-format pipeline shape (grep-split VCF, gffread GTF, bigBedToBed bigBed, generic zcat-f text), assert the -S/-T/-m flags and LC_ALL=C prefix are present, and confirm the tabix preset matches the format. Integration tests add an end-to-end VCF run that verifies the output header block survives and data lines are position-sorted.

Fixes R2-B2, R2-B3, and R2 non-blocking F (locale-sensitive sort).
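A sketch of how the two-pass VCF command described above might be assembled over a decompressed intermediate. The exact sort keys and flag spelling in the shipped pipeline may differ; the header/body split, the memory cap from CFDB_SORT_MEMORY_CAP, the -T workdir spill, and the LC_ALL=C prefix are as described.

```python
import os
import shlex


def build_vcf_pipeline(decompressed_vcf: str, out_bgz: str, workdir: str) -> str:
    """Two-pass VCF pipeline: keep the ## header block intact, sort only the data block,
    and join both streams into bgzip via a brace group."""
    mem = os.environ.get("CFDB_SORT_MEMORY_CAP", "256M")
    src, out, tmp = shlex.quote(decompressed_vcf), shlex.quote(out_bgz), shlex.quote(workdir)
    return (
        # Pass 1 emits headers verbatim; pass 2 sorts records by chromosome then position.
        f"{{ grep '^#' {src}; "
        f"grep -v '^#' {src} | LC_ALL=C sort -k1,1 -k2,2n -S {mem} -T {tmp}; }} "
        f"| bgzip > {out}"
    )
```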
release_workflow and record_stage_complete updated the jobs document
by job_id alone, with no filter on current status. A worker that
stalled past the one-hour stale threshold would get its record
flipped to FAILED by a subsequent claim_workflow call, but when the
stalled worker eventually woke and called its own release, the
update would overwrite the successor-owned status and leak stage
completions onto a record the new claimant now holds.
Both writes now filter on status in {pending, running}. A late call
from a reclaimed worker matches zero docs and becomes a no-op; a
warning is logged so stomping attempts remain visible in operations.
mark_running was already correctly fenced on status=pending and is
unchanged.
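The fencing reduces to a status filter on the update. A Motor-style sketch with assumed field names and an assumed error field; the warning-on-zero-matches behaviour is the one described above.

```python
import logging

logger = logging.getLogger(__name__)

ACTIVE = ("PENDING", "RUNNING")  # only records still owned by a live worker may be written


async def release_workflow(jobs, job_id: str, terminal_status: str, error: str | None = None):
    """Flip an *active* job to its terminal status; a reclaimed or finished job is left alone."""
    result = await jobs.update_one(
        {"job_id": job_id, "status": {"$in": list(ACTIVE)}},  # fence on current status
        {"$set": {"status": terminal_status, "error": error}},
    )
    if result.modified_count == 0:
        # A successor already reclaimed the record; log so stomping attempts stay visible.
        logger.warning("release_workflow(%s) matched no active record; skipped", job_id)
        return False
    return True
```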
The /data and /index routers reached across package boundaries to import workflows.executor._extract_identity via in-function imports, and the test suite referenced the private name directly — both symptoms of a helper that was always public in practice despite its leading underscore. Drop the underscore, move the import to module top in both routers, and update the test file accordingly. Resolves the test-guide violation (tests MUST NOT reference private symbols) and the leaky-abstraction smell in the router imports in one go. The function's behavior, signature, and docstring are unchanged.
The stale-reclaim test used lock._utcnow() to build a backdated timestamp. Per the test guide, tests MUST NOT reference private functions. The equivalent datetime.now(timezone.utc) expression is already available from stdlib; use it directly.
The three original tests in test_index.py dropped the <method_name> prefix required by the test guide's naming pattern test_<method_name>_should_<outcome>[_when_<condition>]. Prefix them all with test_stream_index_file_should_... so the target method is clear from the test name alone and pytest output reads consistently with the rest of the suite.
download_source is a public function but was only ever exercised indirectly via mocks in the BAM and tabix processor tests — the missing-access_url, direct-HTTPS, and DRS-resolution code paths were all effectively untested. New test module covers: ValueError on missing access_url, HTTPS passthrough with assertion that fetch_drs_object is not called, drs:// URIs resolving through fetch_drs_object + get_https_download_url before streaming, and parent-directory creation for nested dest paths.
artifact_kind_str was a one-line wrapper exposing ArtifactKind.value with zero callers anywhere in the codebase. Drop the function and its now-unused ArtifactKind import. Callers that need the string form can call .value directly on the enum member.
Both routers had a near-byte-identical _stream_cache_entry function handling Range parsing, HEAD-vs-GET branching, and the Content-Range header — so any future change to range behavior needed to land in lockstep across both files. Move the helper into a new module, routers._cache_stream, and import it from both callers. The helper now accepts a media_type kwarg (defaulting to application/octet-stream) so callers can override if they want to signal a specific type for cached artifacts. No behavior change.
identity_dcc, shell_quote, run_argv, run_shell, and copy_from_cache were byte-identical duplicates between the BAM and tabix processor modules. Move them into workflows.processors._tools and re-export in each processor under their existing local alias so test mocks that patch the module-level names keep working without modification. No behavior change. The processor modules now focus on pipeline shape and let the shared subprocess / cache-staging machinery live in one place.
Four related shutdown and mid-run hygiene issues are resolved
together in the executor's _run_workflow loop and the FastAPI
lifespan:
1. record_stage_complete failures used to leak the mutex — the
release_workflow call was unreachable when any stage commit
raised. The loop is now wrapped in try/finally so release always
fires with the appropriate terminal status.
2. asyncio.CancelledError bypassed except Exception, leaving
cancelled jobs stuck in RUNNING until the 1h stale threshold. A
dedicated except branch now marks the job FAILED with a cancel
reason before re-raising so cancellation still propagates.
3. The per-job workdir under SYNC_DATA_DIR/jobs/{id}/ was never
cleaned up, slowly filling the Lambda /tmp partition. shutil.rmtree
in the finally block (via asyncio.to_thread) removes it on every
exit path.
4. The FastAPI lifespan used to close the WorkerPool while fire-and-
forget tasks were still in flight, so those tasks would dispatch
into a closed pool and try to release against a closed Mongo
client. The lifespan now drains pending workflow tasks with a
10-second cap before exiting the pool context.
Tests add coverage for workdir cleanup on success and for the
release-still-fires path when record_stage_complete raises.
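The loop's resulting shape, heavily simplified: stage iteration, status values, and helper signatures are placeholders, while the try/finally release, the dedicated CancelledError branch, and the workdir rmtree are the four fixes listed above.

```python
import asyncio
import shutil


async def _run_workflow(self, job, workdir):
    """Shape of the loop only; dispatch and stage bookkeeping are simplified."""
    status, error = "FAILED", None
    try:
        for stage in self._stages_for(job):  # hypothetical stage iteration
            await self._dispatch(job, stage)
            await record_stage_complete(self._jobs, job.job_id, stage)
        status = "COMPLETED"
    except asyncio.CancelledError:
        error = "cancelled"  # job ends up FAILED with a cancel reason, then cancellation propagates
        raise
    except Exception as exc:
        error = str(exc)
    finally:
        # Release always fires, even when a stage commit raised, and the workdir never leaks.
        await release_workflow(self._jobs, job.job_id, status, error=error)
        await asyncio.to_thread(shutil.rmtree, workdir, ignore_errors=True)
```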
The processors reached for identity_dcc from _tools and separately pulled local_id and md5 from file_meta dict access. extract_identity (made public in an earlier commit) already returns the full (dcc, local_id, md5) triple over the same preference chain. Drop identity_dcc from _tools, call extract_identity in BamIndexProcessor and TabixIntervalProcessor, and destructure the triple. One helper, one call site, one contract.
The "# --- Workflow subsystem ---" divider is a section banner; the style guide forbids them because they duplicate what member naming and ordering should carry. The substantive comment immediately below (explaining the SYNC_DATA_DIR=unset fallback) stays.
The style guide reserves SCREAMING_CASE for values whose binding AND referent never change. The previous list comprehension was SCREAMING-cased but its referent was a mutable list, so callers could in principle mutate it. Switching to a tuple makes the value immutable and the name legitimate. The $in Mongo operand accepts either type, so the downstream call sites are unchanged.
The style guide mandates public method ordering static, class, instance within a visibility tier. Processor's public block had instance (needs_processing) then classmethod (artifact_kinds_produced) then abstractmethod instance (run). Move artifact_kinds_produced above needs_processing so the public block reads class then instance then instance as required.
The bare "from cfdb.workflows.processors import _tools" plus the "_ = _tools" discard existed only to keep a module handle reachable for an imagined patch target. Nothing in either processor actually touches the module object — the re-exported aliases from the "from _tools import ..." block cover every use — so the suppression line was a workaround for a warning caused by dead code. Remove both.
The cancelled-bool scaffolding in _run_workflow had no observable effect: the only place it was read was a trailing "if cancelled: pass" in the finally block whose comment noted the raise was handled by the already-executed "raise" inside each except asyncio.CancelledError branch. Remove the flag, both assignments, and the terminal if-block. Cancellation propagation is unchanged — the "raise" inside each except branch still escapes through the finally after the terminal release write.
bigBedToBed ships as a UCSC release binary in the api image; its sibling bedToBigBed was missing. The integration test fixture builder uses bedToBigBed to materialize a deterministic sample.bb from a tiny sorted BED — without the tool the bigBed integration test skips. Add it alongside bigBedToBed so CI exercises the full bigBed pipeline end-to-end.
Closes the coverage gap left by the two existing integration tiers:
TestWoolExecutorPickleBoundary exercises Wool dispatch with a stub
processor that does no I/O, and the per-processor integration tests
run real samtools/htslib but call processor.run directly in the
test's event loop. Neither exercises the full claim to dispatch to
cache path from inside a Wool worker subprocess.
The new tests/integration package covers that gap:
- fixtures/make_samples.py builds deterministic ~1 MB of sample
data per session (SAM, BAM, VCF.gz, BED.gz, GFF3.gz, GTF.gz,
narrowPeak.gz, broadPeak.gz, optional bigBed). The builders
shell out to samtools / gzip / bedToBigBed; seeds are fixed so
two runs produce byte-identical files and content-addressed
cache keys stay stable.
- conftest.py runs an http.server background thread serving the
sample directory, so the Wool worker process can download over
real HTTPS the same way production does. Mocking
fetcher.download_source in the test process is insufficient —
the worker re-imports its own copy and sees the real function.
- test_processor_e2e.py parametrizes per format and asserts cache
population, samtools quickcheck on the cached BAM, tabix query
on staged bgz/tbi pairs, GFF3-attribute-syntax on converted
GTF, and VCF header-block preservation end-to-end.
- test_router_e2e.py drives the stream_file / stream_index_file /
get_job_status handlers against a live executor: 202-then-200
workflow, Range requests on cached artifacts, HEAD-no-dispatch,
/jobs/{id} reporting, and the cached-index serve path.
- test_concurrency.py asserts the Mongo partial-unique-index
mutex dedupes two asyncio.gather'd /data requests onto a single
workflow and converges /data + /index onto one workflow too.
Runtime is ~60 s under the integration marker with function-scoped
pools for cleanest isolation. Tests that require gffread or
bedToBigBed skip gracefully when the tool is absent.
The tabix processor assembles multi-stage shell pipelines like "zcat -f | sort | bgzip > out.bgz" and invokes them through run_shell. asyncio.create_subprocess_shell runs via /bin/sh, which on Debian (the Dockerfile.api base) is dash — dash does not support pipefail. A non-terminal stage failure (truncated gzip, gffread syntax error, bigBedToBed read error, grep out-of-memory) exits non-zero but the pipeline's overall returncode is the last stage's. bgzip happily consumes whatever partial bytes arrived and exits 0, run_shell returns success, and cache.put commits a truncated artifact under its content-addressed key. Every subsequent request then serves the corrupt pair until the upstream md5 changes.

Invoke the shell via bash -o pipefail -c so any stage's non-zero exit surfaces at the pipeline boundary. bash is already in the Dockerfile.api image. The BAM processor's samtools view | samtools sort pipeline gains the same guarantee. New tests assert that run_shell traps both final-stage and non-terminal-stage failures, so a regression would fail loudly rather than leak through the cache.
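A sketch of the run_shell change. Argument names and the error type are illustrative; the bash -o pipefail -c invocation is the mechanism described above.

```python
import asyncio


async def run_shell(command: str, cwd: str | None = None) -> None:
    """Run a multi-stage pipeline with pipefail so a mid-pipe failure cannot slip through."""
    proc = await asyncio.create_subprocess_exec(
        "bash", "-o", "pipefail", "-c", command,
        cwd=cwd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        # Any stage's non-zero exit surfaces here, so a truncated intermediate never
        # gets committed to the cache under its content-addressed key.
        raise RuntimeError(
            f"pipeline failed ({proc.returncode}): {stderr.decode(errors='replace')}"
        )
```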
bam.py, tabix.py, and jobs.py each defined a module-level logger = logging.getLogger(__name__) with zero callers in the respective module. Drop each binding and the logging import. When structured logging lands on the preprocessing path, re-add targeted log statements rather than a speculative handle.
The FastAPI lifespan was reaching into WoolExecutor._pending_tasks, a leading-underscore attribute explicitly marked private, to implement the shutdown drain. That tightly couples the API entry point to an executor implementation detail and leaks the private marker's meaning. Expose drain(timeout) as a public method on JobExecutor (abstract) and WoolExecutor (concrete). The lifespan now calls executor.drain(timeout=SHUTDOWN_DRAIN_TIMEOUT_SECONDS), which returns the number of tasks that were pending at drain entry for logging purposes. Tasks still running after the timeout are left for stale-reclamation, same semantics as before. The asyncio import drops out of main.py.
Processor._format_name was a leading-underscore staticmethod with three external callers — ProcessorRegistry.lookup_for, and the per-module _source_suffix helpers in bam.py and tabix.py — which violates the underscore's meaning as a class-private marker. Promote the helper to a module-level free function format_name in processors/_tools.py (alongside the other shared subprocess / cache-staging helpers) and update all four call sites. The function is a pure dict reader with no self or cls state, so it belongs at module scope rather than hanging off the ABC.
Two methods in TestExtractIdentity were typed without the underscore between test and extract — testextract_identity_should_... Pytest still collected them because "test" is a matching prefix, but the test-naming convention test_<method>_should_<outcome>[_when_<cond>] requires <method> to mirror the callable under test. Fix the name so the target function (extract_identity) is readable from the test identifier alone.
The project test guide mandates that integration tests live in a
dedicated tests/integration/ package. Three @pytest.mark.integration
classes remained under tests/test_workflows/ — a layout violation
now corrected:
- TestWoolExecutorPickleBoundary from test_executor.py →
tests/integration/test_executor_boundary.py, carrying its stub
processor and the cloudpickle-by-value registration.
- TestBamIndexProcessorIntegration from test_processors_bam.py →
tests/integration/test_direct_processors.py as
TestBamIndexProcessorDirectCall.
- TestTabixIntervalProcessorIntegration (BED + VCF) from
test_processors_tabix.py → the same new file as
TestTabixIntervalProcessorDirectCall.
The direct-call classes are kept rather than deleted even though
tests/integration/test_processor_e2e.py covers the same formats
end-to-end through Wool: the direct-call variant skips the Wool
boundary and serves as a diagnostic layer — when an e2e test fails
you can still run the direct variant to isolate whether the
regression is in the processor pipeline itself or in the Wool /
cache / executor scaffolding around it.
test_executor.py drops its cloudpickle import, the by-value
registration, and the stub processor dependency now that they only
serve the moved class.
Both concurrency tests were named after the scenario (concurrent data requests, concurrent data and index) rather than the callable under test, missing the test_<method>_should_<outcome>[_when_<cond>] segment. Rename to test_stream_file_should_... and test_stream_file_and_stream_index_file_should_... so pytest output identifies which handler each test drives.
stream_cache_entry is a new public helper in routers/_cache_stream.py shared by /data and /index. It had no direct unit coverage — every test reached it only through a router. Add tests asserting the five response shapes it can produce: 200 GET full body, 200 HEAD no body, 206 with a Range slice, 416 for an out-of-bounds Range, and 400 for a malformed Range header. Each assertion hits the helper directly so future regressions surface here first without depending on router integration.
The style guide forbids function-local / method-local / conditional imports unless breaking a circular import with an explanatory comment. New and touched test files accumulated ~25 such imports inside test bodies — a holdover from the implementation round when processors and registries were being wired up incrementally. Hoist every test-body import to the module's import block across test_data.py, test_index.py, test_workflows/test_executor.py, test_workflows/test_processors_bam.py, test_workflows/test_processors_tabix.py, integration/test_processor_e2e.py, and integration/test_router_e2e.py. Also replace "from pathlib import Path as _P" aliases with the top-level Path import that now exists in each file.
Bundled low-risk improvements from the review's non-blocking list:
- lock.py: swap Tuple from typing for the lowercase builtin tuple to match the rest of the new workflow code (from __future__ import annotations is already present).
- tests/integration/conftest.py: the samples fixture was calling generate_all a second time after sample_data_root had already built every file, paying samtools/gzip/bedToBigBed costs twice. Restructure so a new session-scoped _session_samples fixture runs generate_all exactly once, and sample_data_root and samples both read from it.
- tests/integration/conftest.py: drop the fixed 3-second sleep in wool_pool. The executor's _dispatch_with_retry already tolerates the LocalDiscovery startup window, so the sleep was pure overhead on every test. Integration suite runtime drops from ~60s to ~10s.
- fetcher.py: offload file writes with asyncio.to_thread so the download loop doesn't block the event loop on large artifacts. Matters when the executor runs in-process (tests, no WorkerPool); harmless when dispatched to a Wool worker with its own loop.
The ECS Fargate worker fleet needs boto3 at runtime for the S3 cache backend, the ECS provisioner, and the worker discovery loop. moto is added as a dev-only dependency so the unit tests can exercise the boto3 code paths without standing up LocalStack or hitting real AWS.
S3Cache is a CacheBackend implementation over boto3 with the same range-aware semantics as LocalFsCache (head_object, get_object with a Range header, upload_file, delete_object). Production points it at real S3; LocalStack-backed dev points it at the LocalStack endpoint via AWS_ENDPOINT_URL — only the endpoint differs. The backend supports an optional key prefix for sharing a single bucket across environments, rejects path-traversal segments, and yields an empty iterator on missing objects so router code can treat cache misses uniformly across backends.
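The shipped S3Cache is async and also implements put/delete and the key prefix; this synchronous sketch shows only the range-read and miss semantics on boto3, with class and parameter names as assumptions.

```python
import boto3
from botocore.exceptions import ClientError


class S3RangeReader:
    """Minimal range-aware read path; a sketch, not the shipped S3Cache."""

    def __init__(self, bucket: str, prefix: str = "", endpoint_url: str | None = None):
        # endpoint_url is the only knob that differs between real S3 and LocalStack.
        self._s3 = boto3.client("s3", endpoint_url=endpoint_url)
        self._bucket = bucket
        self._prefix = prefix

    def get(self, key: str, start: int | None = None, end: int | None = None):
        params = {"Bucket": self._bucket, "Key": f"{self._prefix}{key}"}
        if start is not None:
            # Inclusive byte range, matching what Gosling's BAM/tabix/bbi fetchers send.
            params["Range"] = f"bytes={start}-{'' if end is None else end}"
        try:
            body = self._s3.get_object(**params)["Body"]
        except ClientError as exc:
            if exc.response["Error"]["Code"] in ("NoSuchKey", "404"):
                return iter(())  # a miss looks the same as LocalFsCache: an empty iterator
            raise
        return body.iter_chunks(chunk_size=64 * 1024)
```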
EcsProvisioner is a thin boto3 RunTask wrapper that launches an ephemeral worker container per workflow, with awsvpc network configuration, concurrent-call dedup keyed on the workflow mutex key (so a burst of ensure_workflow calls on the same source doesn't fan out into multiple tasks), and a semaphore guarding the ~20 req/s RunTask rate limit. CapacityException covers both ClientError- and failures[].reason-shaped capacity / ENI errors so the executor can retry rather than hang.

EcsDiscovery is a Wool DiscoveryLike poll-and-diff over list_tasks + describe_tasks: it filters on healthStatus HEALTHY, extracts the awsvpc IP, and emits worker-added / worker-dropped events to non-blocking subscribers via per-subscriber asyncio.Queue. State replay on subscribe means subscribers attached after startup observe the existing healthy fleet.
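A sketch of the per-key dedup idea. The real EcsProvisioner also builds the awsvpc configuration and classifies capacity failures, and its rate limiting is more precise than the plain semaphore shown here; the class and parameter names are assumptions.

```python
import asyncio


class RunTaskDeduper:
    """Per-key dedup: a burst of ensure_workflow calls on one source shares one RunTask launch."""

    def __init__(self, launch, max_inflight: int = 10):
        self._launch = launch  # coroutine function: workflow_key -> task ARN
        self._inflight: dict[str, asyncio.Task] = {}
        self._gate = asyncio.Semaphore(max_inflight)  # crude stand-in for the RunTask rate guard

    async def request(self, workflow_key: str) -> str:
        task = self._inflight.get(workflow_key)
        if task is None:
            task = asyncio.create_task(self._run(workflow_key))
            self._inflight[workflow_key] = task
            task.add_done_callback(lambda _t: self._inflight.pop(workflow_key, None))
        # shield: one caller being cancelled must not cancel the shared launch
        return await asyncio.shield(task)

    async def _run(self, workflow_key: str) -> str:
        async with self._gate:
            return await self._launch(workflow_key)
```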
worker_main.py is the entrypoint baked into the worker container image. It starts a wool.LocalWorker so the API can dispatch routines to the task, exposes a tiny aiohttp /health endpoint that returns 503 during drain — so ECS marks the task unhealthy before stop_task kills the gRPC port — and installs SIGTERM/SIGINT handlers so a stop_task issued by the API or the Fargate scheduler shuts down cleanly. The idle-shutdown timeout is configurable so tasks self-terminate when their workflow completes. The entrypoint deliberately does not register itself with discovery; EcsDiscovery polls ECS directly and surfaces healthy tasks to the pool, so the worker only needs to be listening and HEALTHY.
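A sketch of the health/drain mechanics using aiohttp. The handler names, port, and event-based drain flag are assumptions; the 503-during-drain response and the SIGTERM/SIGINT handling are the behaviour described above.

```python
import asyncio
import signal

from aiohttp import web


async def serve_health(draining: asyncio.Event, port: int = 8080) -> web.AppRunner:
    """Health endpoint: 200 while serving, 503 once drain begins so ECS marks the task unhealthy."""

    async def health(_request):
        return web.Response(status=503 if draining.is_set() else 200)

    app = web.Application()
    app.router.add_get("/health", health)
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, "0.0.0.0", port).start()
    return runner


def install_shutdown_handlers(draining: asyncio.Event) -> None:
    # stop_task (or the Fargate scheduler) sends SIGTERM; flip to draining instead of dying abruptly.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, draining.set)
```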
heartbeat_workflow bumps updated_at on an active job record so a multi-hour preprocessing run (samtools sort on a multi-GB BAM, tabix on a large text-interval file) doesn't trip the stale-reclaim predicate in claim_workflow. The update is fenced on an active status so a heartbeat against a terminal or stale-reclaimed record is a silent no-op — the executor uses the False return as a stop signal. STALE_WORKFLOW_THRESHOLD drops from 1 h to 15 min: with the heartbeat refreshing updated_at every ~5 min while a job is in flight, a 15-min threshold catches actually-dead workers usefully fast without the risk of false-positive reclaiming a long sort that's still making progress. DEFAULT_HEARTBEAT_INTERVAL_SECONDS exposes the cadence so the executor and tests share one knob.
ensure_workflow now calls EcsProvisioner.request on a fresh claim so the worker container is launched in parallel with the dispatch loop; CapacityException is surfaced as a terminal FAILED job with a retryable error string rather than hanging. _NO_WORKERS_RETRY_ATTEMPTS widens from 5 to 60 (~60s window) to cover the Fargate cold-start budget — booting a worker, pulling the SOCI-indexed image, binding gRPC, and surfacing as HEALTHY in EcsDiscovery's next poll.

A heartbeat sidecar task bumps updated_at every heartbeat_interval seconds while the workflow body runs and is cancelled in finally before the terminal release, so the next tick can't observe an active record while the executor is flipping status. DEFAULT_WORKFLOW_DURATION_CAP_SECONDS rises from 20 min to 4 h to match the multi-hour runtime profile; operators may bump it further via env. The JobExecutor docstring drops the LambdaExecutor mention since the production path is the Wool worker pool fed by EcsDiscovery.
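The sidecar in outline; heartbeat_workflow's exact signature and the surrounding executor code are simplified, while the stop-on-False behaviour and the cancel-in-finally ordering follow the description above.

```python
import asyncio


async def _heartbeat_loop(jobs, job_id: str, interval_seconds: float) -> None:
    """Bump updated_at while the workflow body runs; stop once the record is no longer active."""
    while True:
        await asyncio.sleep(interval_seconds)
        if not await heartbeat_workflow(jobs, job_id):
            return  # terminal or stale-reclaimed record: nothing left to keep alive


# Inside ensure_workflow's background task, roughly:
#   heartbeat = asyncio.create_task(_heartbeat_loop(jobs, job.job_id, heartbeat_interval))
#   try:
#       ...run the workflow body...
#   finally:
#       heartbeat.cancel()  # cancelled before the terminal release flips status
```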
The API gains the env vars needed to drive the new runtime: AWS_ENDPOINT_URL and AWS_REGION (consumed by every boto3 client we build, set to a LocalStack address in dev), WORKFLOW_S3_BUCKET / PREFIX (when set, the lifespan picks S3Cache over LocalFsCache), ECS_CLUSTER, ECS_WORKER_TASK_DEFINITION, ECS_WORKER_SUBNETS, ECS_WORKER_SECURITY_GROUPS, ECS_WORKER_ASSIGN_PUBLIC_IP (when all required vars are present, _maybe_build_provisioner returns an EcsProvisioner so dispatch can launch ephemeral worker tasks), and WORKFLOW_DURATION_CAP_SECONDS for operators bumping the multi-hour runtime cap. The bare PoC profile sets none of these and continues to use the local filesystem cache plus the in-process WorkerPool. Production sets all of them and the lifespan transparently switches to the ECS-backed substrate.
Summary
Add the runtime application substrate the workflow subsystem needs to run on the ECS Fargate fleet outlined in #32: an EcsProvisioner that launches ephemeral worker containers per workflow via RunTask, an EcsDiscovery poll-and-diff loop over ListTasks + DescribeTasks that surfaces healthy tasks to Wool's worker pool, an S3Cache backend that mirrors LocalFsCache semantics over boto3, a worker-container entrypoint, and a workflow heartbeat that lets multi-hour preprocessing runs survive the stale-reclaim predicate. The same boto3-backed code targets real AWS in production and LocalStack in dev — only AWS_ENDPOINT_URL differs.

Two WoolExecutor defaults move with the multi-hour profile: DEFAULT_WORKFLOW_DURATION_CAP_SECONDS rises from 20 min to 4 h (env-driven), and _NO_WORKERS_RETRY_ATTEMPTS widens from 5 to 60 to cover the Fargate cold-start budget. STALE_WORKFLOW_THRESHOLD drops from 1 h to 15 min — paired with a 5-min heartbeat sidecar in the executor, that catches actually-dead workers fast without false-positive reclaiming legitimate long sorts.

Scope intentionally narrows to the runtime application code so it lands self-contained with full unit coverage. Defer for follow-up PRs once the runtime classes have integration coverage and a deploy pipeline exists: LocalStack docker-compose.yml, CloudFormation worker task-def updates, multi-stage worker Dockerfile + SOCI index, README documentation, and LocalStack-backed integration tests.

Closes #33
Proposed changes
S3 cache backend
Add S3Cache (src/cfdb/workflows/cache.py) as a CacheBackend over boto3 with head_object / get_object (with Range) / upload_file / delete_object. Support an optional key prefix for sharing a single bucket across environments, reject path-traversal segments, and yield an empty iterator on missing objects to match LocalFsCache semantics. Centralize client construction in build_s3_client so production, LocalStack-backed dev, and unit tests all produce the same shape of client — only the endpoint differs.

ECS provisioner and worker discovery
Add EcsProvisioner (src/cfdb/workflows/provisioner.py) wrapping RunTask with awsvpc network configuration. Dedupe concurrent calls keyed on the workflow mutex key so a burst of ensure_workflow calls on the same source does not fan out into multiple tasks, and guard the ~20 req/s RunTask rate limit with a semaphore. CapacityException covers both ClientError- and failures[].reason-shaped capacity / ENI errors so the executor can surface them as retryable rather than hanging.

Add EcsDiscovery (src/cfdb/workflows/discovery.py) implementing Wool's DiscoveryLike protocol with a poll-and-diff loop over list_tasks + describe_tasks. Filter on healthStatus: HEALTHY, extract the awsvpc IP, and emit worker-added / worker-dropped events to non-blocking subscribers via per-subscriber asyncio.Queue. Replay state on subscribe so subscribers attached after startup observe the existing healthy fleet.

Worker container entrypoint
Add worker_main.py (src/cfdb/workflows/worker_main.py) starting wool.LocalWorker, exposing a tiny aiohttp /health endpoint that returns 503 during drain so ECS marks the task unhealthy before stop_task kills gRPC, installing SIGTERM/SIGINT handlers, and self-terminating after a configurable idle timeout. Deliberately omit discovery registration — EcsDiscovery polls ECS directly, so the worker only needs to bind its gRPC port and stay HEALTHY.

Workflow heartbeat
Add heartbeat_workflow (src/cfdb/workflows/lock.py) bumping updated_at on an active job record so a multi-hour run does not trip claim_workflow's stale-reclaim predicate. Fence the update on an active status so a heartbeat against a terminal or stale-reclaimed record is a silent no-op — the executor uses the False return as a stop signal. Shorten STALE_WORKFLOW_THRESHOLD from 1 h to 15 min and expose DEFAULT_HEARTBEAT_INTERVAL_SECONDS as the cadence shared between executor and tests.

Executor integration
Extend WoolExecutor with provisioner and heartbeat_interval_seconds ctor args. Invoke the provisioner from ensure_workflow on a fresh claim and surface CapacityException as a terminal FAILED job with a retryable error string. Spawn a heartbeat sidecar task that bumps updated_at while the workflow body runs and cancel it in finally before the terminal release so the next tick cannot observe an active record while status is flipping. Raise DEFAULT_WORKFLOW_DURATION_CAP_SECONDS from 1200 to 4 h and widen _NO_WORKERS_RETRY_ATTEMPTS from 5 to 60 (~60 s window) for Fargate cold-start. Drop the LambdaExecutor mention from the JobExecutor ABC docstring.

API lifespan wiring
Add env vars to src/cfdb/api/__init__.py: AWS_ENDPOINT_URL, AWS_REGION, WORKFLOW_S3_BUCKET / PREFIX, ECS_CLUSTER, ECS_WORKER_TASK_DEFINITION, ECS_WORKER_SUBNETS, ECS_WORKER_SECURITY_GROUPS, ECS_WORKER_ASSIGN_PUBLIC_IP, WORKFLOW_DURATION_CAP_SECONDS. Have the lifespan pick S3Cache over LocalFsCache when WORKFLOW_S3_BUCKET is set and build an EcsProvisioner when ECS env config is present (helper _maybe_build_provisioner). Leave the bare PoC profile unchanged: with none of the new env set, the API continues to use the local filesystem cache plus the in-process WorkerPool.

Build dependencies
Add boto3 to runtime deps and moto[ecs,s3] to dev deps so the unit tests can exercise boto3 paths without LocalStack or real AWS.

Test cases
TestS3Cache
- S3Cache constructed with empty bucket → ValueError is raised
- head is awaited → None is returned
- put, then head is awaited for the same key
- get is iterated without a byte range
- get is iterated with an inclusive byte range
- get is iterated
- delete is awaited → True is returned and the object is gone
- delete is awaited → False is returned
- key with a .. segment, put is awaited → ValueError is raised
- S3Cache constructed with a key prefix

TestEcsProvisioner
- ValueError is raised
- ValueError is raised
- ValueError is raised
- request is awaited once → RunTask is called once with the expected payload
- request calls await in parallel → RunTask is invoked once and all callers receive the same ARN
- request calls await in parallel → RunTask is invoked once per key
- ClientError for capacity, request is awaited → CapacityException is raised
- RunTask response with capacity in failures[], request is awaited → CapacityException is raised

TestEcsDiscovery
- ValueError is raised
- ValueError is raised
- _poll_once is awaited → worker-added events are emitted with the expected addresses
- _poll_once is awaited
- _poll_once is awaited again → worker-dropped is emitted for the gone worker
- _poll_once is awaited twice
- add event

TestParseArgs
- _parse_args runs without args
- _parse_args runs without args
- _parse_args runs with the flags

TestHeartbeatWorkflow
- updated_at has been backdated, heartbeat_workflow is awaited → True is returned and updated_at advances
- COMPLETED, heartbeat_workflow is awaited → False is returned

TestStaleWorkflowThreshold
- STALE_WORKFLOW_THRESHOLD constant

TestWoolExecutorWithProvisioner
- ensure_workflow is awaited for a fresh key
- CapacityException, ensure_workflow is awaited → FAILED with a "capacity"-prefixed error

TestWoolExecutorHeartbeat
- ensure_workflow is awaited and the test polls during the run → updated_at advances past the original claim timestamp