
Deploy workflow workers as an ephemeral ECS Fargate fleet — Closes #33 (#34)

Draft
conradbzura wants to merge 41 commits into 30-add-preprocessing-indexing-workflow from 33-ecs-fargate-discovery-deployment

Conversation

@conradbzura
Collaborator

Summary

Add the runtime application substrate the workflow subsystem needs to run on the ECS Fargate fleet outlined in #32: an EcsProvisioner that launches ephemeral worker containers per workflow via RunTask, an EcsDiscovery poll-and-diff loop over ListTasks + DescribeTasks that surfaces healthy tasks to Wool's worker pool, an S3Cache backend that mirrors LocalFsCache semantics over boto3, a worker-container entrypoint, and a workflow heartbeat that lets multi-hour preprocessing runs survive the stale-reclaim predicate. The same boto3-backed code targets real AWS in production and LocalStack in dev — only AWS_ENDPOINT_URL differs.

Two WoolExecutor defaults move with the multi-hour profile: DEFAULT_WORKFLOW_DURATION_CAP_SECONDS rises from 20 min to 4 h (env-driven), and _NO_WORKERS_RETRY_ATTEMPTS widens from 5 to 60 to cover the Fargate cold-start budget. STALE_WORKFLOW_THRESHOLD drops from 1 h to 15 min — paired with a 5-min heartbeat sidecar in the executor, that catches actually-dead workers fast without false-positive reclamation of legitimate long sorts.

Scope intentionally narrows to the runtime application code so it lands self-contained with full unit coverage. Deferred to follow-up PRs, once the runtime classes have integration coverage and a deploy pipeline exists: the LocalStack docker-compose.yml, CloudFormation worker task-def updates, the multi-stage worker Dockerfile + SOCI index, README documentation, and LocalStack-backed integration tests.

Closes #33

Proposed changes

S3 cache backend

Add S3Cache (src/cfdb/workflows/cache.py) as a CacheBackend over boto3 with head_object / get_object (with Range) / upload_file / delete_object. Support an optional key prefix for sharing a single bucket across environments, reject path-traversal segments, and yield an empty iterator on missing objects to match LocalFsCache semantics. Centralize client construction in build_s3_client so production, LocalStack-backed dev, and unit tests all produce the same shape of client — only the endpoint differs.
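
For orientation, a minimal sketch of the range-aware read path under the semantics described above — the class shape, method signatures, and the `asyncio.to_thread` wrapping are assumptions for the sketch, not a copy of src/cfdb/workflows/cache.py:

```python
import asyncio
from typing import AsyncIterator, Optional

import botocore.exceptions


class S3CacheSketch:
    """Illustrative subset of the S3Cache read path."""

    def __init__(self, client, bucket: str, prefix: str = ""):
        if not bucket:
            raise ValueError("bucket must be non-empty")
        self._client = client          # synchronous boto3 S3 client
        self._bucket = bucket
        self._prefix = prefix

    def _full_key(self, key: str) -> str:
        if ".." in key.split("/"):
            raise ValueError(f"path traversal rejected: {key!r}")
        return f"{self._prefix}{key}" if self._prefix else key

    async def head(self, key: str) -> Optional[int]:
        try:
            resp = await asyncio.to_thread(
                self._client.head_object,
                Bucket=self._bucket, Key=self._full_key(key),
            )
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] in ("404", "NoSuchKey"):
                return None            # cache miss mirrors LocalFsCache
            raise
        return resp["ContentLength"]

    async def get(self, key, start=None, end=None) -> AsyncIterator[bytes]:
        kwargs = {"Bucket": self._bucket, "Key": self._full_key(key)}
        if start is not None:
            kwargs["Range"] = f"bytes={start}-{'' if end is None else end}"  # inclusive
        try:
            resp = await asyncio.to_thread(self._client.get_object, **kwargs)
        except botocore.exceptions.ClientError as exc:
            if exc.response["Error"]["Code"] in ("404", "NoSuchKey"):
                return                 # empty iterator on a missing object
            raise
        body = resp["Body"]
        while chunk := await asyncio.to_thread(body.read, 64 * 1024):
            yield chunk
```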

ECS provisioner and worker discovery

Add EcsProvisioner (src/cfdb/workflows/provisioner.py) wrapping RunTask with awsvpc network configuration. Dedupe concurrent calls keyed on the workflow mutex key so a burst of ensure_workflow calls on the same source does not fan out into multiple tasks, and guard the ~20 req/s RunTask rate limit with a semaphore. CapacityException covers both ClientError- and failures[].reason-shaped capacity / ENI errors so the executor can surface them as retryable rather than hanging.
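
A sketch of the dedup-plus-throttle shape around RunTask — attribute names, the semaphore size, and the exception type are assumptions (RuntimeError stands in for the PR's CapacityException):

```python
import asyncio


class EcsProvisionerSketch:
    """Illustrative in-flight dedup and rate limiting around boto3 run_task."""

    def __init__(self, ecs_client, cluster, task_definition, subnets,
                 security_groups, assign_public_ip=False, max_inflight=15):
        self._ecs = ecs_client                            # boto3 ECS client
        self._cluster = cluster
        self._task_definition = task_definition
        self._network = {"awsvpcConfiguration": {
            "subnets": list(subnets),
            "securityGroups": list(security_groups),
            "assignPublicIp": "ENABLED" if assign_public_ip else "DISABLED",
        }}
        self._inflight: dict[str, asyncio.Task] = {}      # dedup keyed on the mutex key
        self._throttle = asyncio.Semaphore(max_inflight)  # stay under the ~20 req/s limit

    async def request(self, dedup_key: str) -> str:
        task = self._inflight.get(dedup_key)
        if task is None:                                  # first caller launches RunTask
            task = asyncio.create_task(self._run_once())
            self._inflight[dedup_key] = task
            task.add_done_callback(lambda _: self._inflight.pop(dedup_key, None))
        return await task                                 # every caller gets the same ARN

    async def _run_once(self) -> str:
        async with self._throttle:
            resp = await asyncio.to_thread(
                self._ecs.run_task,
                cluster=self._cluster,
                taskDefinition=self._task_definition,
                launchType="FARGATE",
                count=1,
                networkConfiguration=self._network,
            )
        if resp.get("failures"):                          # capacity / ENI errors land here
            raise RuntimeError(resp["failures"][0].get("reason", "RunTask failed"))
        return resp["tasks"][0]["taskArn"]
```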

Add EcsDiscovery (src/cfdb/workflows/discovery.py) implementing Wool's DiscoveryLike protocol with a poll-and-diff loop over list_tasks + describe_tasks. Filter on healthStatus: HEALTHY, extract the awsvpc IP, and emit worker-added / worker-dropped events to non-blocking subscribers via per-subscriber asyncio.Queue. Replay state on subscribe so subscribers attached after startup observe the existing healthy fleet.
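
The poll-and-diff pass reduces to something like the following; the event tuple shape, the attachment parsing, and the helper name are illustrative, not the actual EcsDiscovery internals:

```python
import asyncio


async def poll_once(ecs, cluster: str, family: str, seen: dict[str, str],
                    queues: list[asyncio.Queue]) -> None:
    """One poll-and-diff pass: emit ('worker-added'|'worker-dropped', addr) events."""
    arns = (await asyncio.to_thread(
        ecs.list_tasks, cluster=cluster, family=family, desiredStatus="RUNNING",
    ))["taskArns"]
    current: dict[str, str] = {}
    if arns:
        tasks = (await asyncio.to_thread(
            ecs.describe_tasks, cluster=cluster, tasks=arns,
        ))["tasks"]
        for task in tasks:
            if task.get("healthStatus") != "HEALTHY":
                continue                                 # skip tasks not yet healthy
            # awsvpc tasks expose their private IP as an ENI attachment detail
            details = task["attachments"][0]["details"]
            ip = next(d["value"] for d in details if d["name"] == "privateIPv4Address")
            current[task["taskArn"]] = ip
    for arn, ip in current.items():                      # newly healthy workers
        if arn not in seen:
            for q in queues:
                q.put_nowait(("worker-added", ip))
    for arn, ip in seen.items():                         # workers gone since the last poll
        if arn not in current:
            for q in queues:
                q.put_nowait(("worker-dropped", ip))
    seen.clear()
    seen.update(current)
```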

Worker container entrypoint

Add worker_main.py (src/cfdb/workflows/worker_main.py) starting wool.LocalWorker, exposing a tiny aiohttp /health endpoint that returns 503 during drain so ECS marks the task unhealthy before stop_task kills gRPC, installing SIGTERM/SIGINT handlers, and self-terminating after a configurable idle timeout. Deliberately omit discovery registration — EcsDiscovery polls ECS directly, so the worker only needs to bind its gRPC port and stay HEALTHY.
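
A sketch of the health endpoint and drain signalling, assuming aiohttp for the HTTP surface as described; the port, handler names, and module-level Event are illustrative:

```python
import asyncio
import signal

from aiohttp import web

draining = asyncio.Event()


async def health(_request: web.Request) -> web.Response:
    # 503 during drain so the ECS health check flips the task to UNHEALTHY
    # before stop_task tears down the gRPC port.
    if draining.is_set():
        return web.Response(status=503, text="draining")
    return web.Response(status=200, text="ok")


async def serve_health(port: int = 8080) -> web.AppRunner:
    app = web.Application()
    app.router.add_get("/health", health)
    runner = web.AppRunner(app)
    await runner.setup()
    await web.TCPSite(runner, "0.0.0.0", port).start()
    return runner


def install_signal_handlers() -> None:
    # SIGTERM from stop_task / the Fargate scheduler starts the drain.
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, draining.set)
```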

Workflow heartbeat

Add heartbeat_workflow (src/cfdb/workflows/lock.py) bumping updated_at on an active job record so a multi-hour run does not trip claim_workflow's stale-reclaim predicate. Fence the update on an active status so a heartbeat against a terminal or stale-reclaimed record is a silent no-op — the executor uses the False return as a stop signal. Shorten STALE_WORKFLOW_THRESHOLD from 1 h to 15 min and expose DEFAULT_HEARTBEAT_INTERVAL_SECONDS as the cadence shared between executor and tests.
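
The fenced update amounts to a single conditional write; the field names and status values below are assumed from this description, with a motor-style async collection:

```python
from datetime import datetime, timezone


async def heartbeat_workflow(jobs, job_id: str) -> bool:
    """Bump updated_at only while the job is still active.

    Returns False when the record is terminal or has been stale-reclaimed,
    which the executor treats as a stop signal.
    """
    result = await jobs.update_one(
        {"job_id": job_id, "status": {"$in": ["pending", "running"]}},  # the fence
        {"$set": {"updated_at": datetime.now(timezone.utc)}},
    )
    return result.modified_count == 1
```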

Executor integration

Extend WoolExecutor with provisioner and heartbeat_interval_seconds ctor args. Invoke the provisioner from ensure_workflow on a fresh claim and surface CapacityException as a terminal FAILED job with a retryable error string. Spawn a heartbeat sidecar task that bumps updated_at while the workflow body runs and cancel it in finally before the terminal release so the next tick cannot observe an active record while status is flipping. Raise DEFAULT_WORKFLOW_DURATION_CAP_SECONDS from 1200 s (20 min) to 4 h and widen _NO_WORKERS_RETRY_ATTEMPTS from 5 to 60 (~60 s window) for Fargate cold-start. Drop the LambdaExecutor mention from the JobExecutor ABC docstring.
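
The sidecar pattern in the executor looks roughly like this; the function and argument names are illustrative, and heartbeat_workflow is the helper described above:

```python
import asyncio


async def _run_with_heartbeat(jobs, job_id: str, interval: float, body) -> None:
    """Run the workflow body with a heartbeat sidecar; cancel it before release."""

    async def _beat() -> None:
        while True:
            await asyncio.sleep(interval)
            if not await heartbeat_workflow(jobs, job_id):
                return                    # terminal / reclaimed record: stop quietly

    sidecar = asyncio.create_task(_beat())
    try:
        await body()                      # the actual preprocessing workflow
    finally:
        sidecar.cancel()                  # stop bumping updated_at ...
        try:
            await sidecar                 # ... and wait, so no heartbeat can race
        except asyncio.CancelledError:
            pass                          # the terminal release that follows
```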

API lifespan wiring

Add env vars to src/cfdb/api/__init__.py: AWS_ENDPOINT_URL, AWS_REGION, WORKFLOW_S3_BUCKET / PREFIX, ECS_CLUSTER, ECS_WORKER_TASK_DEFINITION, ECS_WORKER_SUBNETS, ECS_WORKER_SECURITY_GROUPS, ECS_WORKER_ASSIGN_PUBLIC_IP, WORKFLOW_DURATION_CAP_SECONDS. Have the lifespan pick S3Cache over LocalFsCache when WORKFLOW_S3_BUCKET is set and build an EcsProvisioner when ECS env config is present (helper _maybe_build_provisioner). Leave the bare PoC profile unchanged: with none of the new env set, the API continues to use the local filesystem cache plus the in-process WorkerPool.
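
Sketch of the lifespan selection logic. The env var names follow the list above, but the constructor signatures (EcsProvisioner, S3Cache, build_s3_client, LocalFsCache), the import locations, and the WORKFLOW_S3_PREFIX spelling are assumptions:

```python
import os

from cfdb.workflows.cache import LocalFsCache, S3Cache, build_s3_client  # paths assumed
from cfdb.workflows.provisioner import EcsProvisioner


def _maybe_build_provisioner(ecs_client):
    """Return a provisioner only when the ECS env config is complete."""
    required = ("ECS_CLUSTER", "ECS_WORKER_TASK_DEFINITION", "ECS_WORKER_SUBNETS")
    if not all(os.environ.get(name) for name in required):
        return None                                     # bare PoC profile: no ECS
    return EcsProvisioner(
        ecs_client,
        cluster=os.environ["ECS_CLUSTER"],
        task_definition=os.environ["ECS_WORKER_TASK_DEFINITION"],
        subnets=os.environ["ECS_WORKER_SUBNETS"].split(","),
        security_groups=os.environ.get("ECS_WORKER_SECURITY_GROUPS", "").split(","),
        assign_public_ip=os.environ.get("ECS_WORKER_ASSIGN_PUBLIC_IP") == "true",
    )


def _build_cache():
    """Pick S3Cache when WORKFLOW_S3_BUCKET is set, else the local filesystem cache."""
    bucket = os.environ.get("WORKFLOW_S3_BUCKET")
    if bucket:
        client = build_s3_client(endpoint_url=os.environ.get("AWS_ENDPOINT_URL"))
        return S3Cache(client, bucket, prefix=os.environ.get("WORKFLOW_S3_PREFIX", ""))
    return LocalFsCache(os.environ["SYNC_DATA_DIR"])    # unchanged PoC / dev default
```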

Build dependencies

Add boto3 to runtime deps and moto[ecs,s3] to dev deps so the unit tests can exercise boto3 paths without LocalStack or real AWS.
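
As an example of how moto keeps the boto3 paths unit-testable without LocalStack, something along these lines works; S3Cache's constructor shape here is assumed, and moto ≥ 5 exposes the unified mock_aws decorator (older releases use mock_s3 / mock_ecs):

```python
import asyncio

import boto3
from moto import mock_aws

from cfdb.workflows.cache import S3Cache  # path per this PR; ctor shape assumed


@mock_aws
def test_head_should_return_none_when_key_is_absent():
    client = boto3.client("s3", region_name="us-east-1")
    client.create_bucket(Bucket="workflow-cache")
    cache = S3Cache(client, bucket="workflow-cache")
    assert asyncio.run(cache.head("missing/key")) is None   # cache-miss head (table row 2)
```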

Test cases

| # | Test Suite | Given | When | Then | Coverage Target |
|---|---|---|---|---|---|
| 1 | TestS3Cache | An S3Cache constructed with empty bucket | The constructor runs | ValueError is raised | Constructor input validation |
| 2 | TestS3Cache | A bucket containing no object for the key | head is awaited | None is returned | Cache-miss head |
| 3 | TestS3Cache | A small payload uploaded via put | head is awaited for the same key | The reported size matches the payload | Round-trip put/head |
| 4 | TestS3Cache | A cached object | get is iterated without a byte range | The full payload is yielded | Full-object streaming |
| 5 | TestS3Cache | A cached object | get is iterated with an inclusive byte range | Only the slice is yielded | Range-aware streaming |
| 6 | TestS3Cache | A bucket with no object for the key | get is iterated | An empty stream is produced | Cache-miss get |
| 7 | TestS3Cache | A cached object | delete is awaited | True is returned and the object is gone | Existing-key deletion |
| 8 | TestS3Cache | A bucket with no object for the key | delete is awaited | False is returned | Absent-key deletion |
| 9 | TestS3Cache | A key containing a .. segment | put is awaited | ValueError is raised | Path-traversal rejection |
| 10 | TestS3Cache | An S3Cache constructed with a key prefix | Round-trip put/head is awaited | Object lands under the prefix | Prefix application |
| 11 | TestEcsProvisioner | A provisioner constructed without a cluster | The constructor runs | ValueError is raised | Constructor input validation |
| 12 | TestEcsProvisioner | A provisioner constructed without a task definition | The constructor runs | ValueError is raised | Constructor input validation |
| 13 | TestEcsProvisioner | A provisioner constructed without subnets | The constructor runs | ValueError is raised | Constructor input validation |
| 14 | TestEcsProvisioner | A configured provisioner | request is awaited once | RunTask is called once with the expected payload | Single-caller dispatch |
| 15 | TestEcsProvisioner | A provisioner with concurrent calls sharing a dedup key | Many request calls await in parallel | RunTask is invoked once and all callers receive the same ARN | In-flight dedup |
| 16 | TestEcsProvisioner | A provisioner with concurrent calls under distinct dedup keys | Many request calls await in parallel | RunTask is invoked once per key | Dedup boundary |
| 17 | TestEcsProvisioner | A boto3 client raising ClientError for capacity | request is awaited | CapacityException is raised | Capacity error path |
| 18 | TestEcsProvisioner | A RunTask response with capacity in failures[] | request is awaited | CapacityException is raised | Failure-payload capacity path |
| 19 | TestEcsDiscovery | A discovery constructed without a cluster | The constructor runs | ValueError is raised | Constructor input validation |
| 20 | TestEcsDiscovery | A discovery constructed without a task family | The constructor runs | ValueError is raised | Constructor input validation |
| 21 | TestEcsDiscovery | An ECS client returning healthy tasks | _poll_once is awaited | worker-added events are emitted with the expected addresses | Initial healthy fleet |
| 22 | TestEcsDiscovery | An ECS client returning a non-healthy task | _poll_once is awaited | The unhealthy task is filtered out | Health filter |
| 23 | TestEcsDiscovery | A previously seen worker absent on a later poll | _poll_once is awaited again | worker-dropped is emitted for the gone worker | Drop diff |
| 24 | TestEcsDiscovery | A steady-state ECS client | _poll_once is awaited twice | No duplicate events are emitted | Idempotent steady state |
| 25 | TestEcsDiscovery | A subscriber with a filter rejecting a worker | The discovery emits an add event | The filtered subscriber receives nothing | Subscriber filter |
| 26 | TestParseArgs | A clean environment | _parse_args runs without args | Documented defaults are returned | Default arg parsing |
| 27 | TestParseArgs | Worker env vars set | _parse_args runs without args | Env values override defaults | Env override |
| 28 | TestParseArgs | Worker env vars set and CLI flags passed | _parse_args runs with the flags | CLI values override env | CLI precedence |
| 29 | TestHeartbeatWorkflow | An active job whose updated_at has been backdated | heartbeat_workflow is awaited | True is returned and updated_at advances | Heartbeat happy path |
| 30 | TestHeartbeatWorkflow | A job already released to COMPLETED | heartbeat_workflow is awaited | False is returned | Heartbeat stop signal |
| 31 | TestStaleWorkflowThreshold | The current STALE_WORKFLOW_THRESHOLD constant | Its value is read | It equals 15 minutes | Threshold guard |
| 32 | TestWoolExecutorWithProvisioner | An executor with a recording provisioner | ensure_workflow is awaited for a fresh key | The provisioner observes one request keyed by the workflow mutex key | Provisioner dispatch |
| 33 | TestWoolExecutorWithProvisioner | A provisioner that always raises CapacityException | ensure_workflow is awaited | The job is FAILED with a "capacity"-prefixed error | Capacity-failure handling |
| 34 | TestWoolExecutorHeartbeat | An executor with a sub-second heartbeat cadence and a blocking processor | ensure_workflow is awaited and the test polls during the run | updated_at advances past the original claim timestamp | In-flight heartbeat |

Introduce the foundational types for the preprocessing/indexing
subsystem that serves Gosling Designer's client-side fetchers:

  - JobRecord, JobStatus, and ArtifactKind model the persistent
    workflow state that lives in the Mongo jobs collection.
  - cache_key and workflow_key are pure functions that derive
    content-addressed keys from upstream md5 plus pipeline/processor
    versions. A byte change upstream with a refreshed md5 naturally
    invalidates cached artifacts, and a version bump forces fresh
    processing without any manual purge.
  - Processor is the ABC every preprocessing pipeline subclasses. A
    ProcessorRegistry resolves a file document to the first matching
    processor by format name. PassthroughProcessor covers formats
    Gosling can consume directly (CSV, TSV, bigWig).

No cache backend, executor, or HTTP surface is wired up yet — those
land in follow-on commits that depend on these types.

CacheBackend is a byte-range-aware pluggable store for workflow
artifacts. LocalFsCache writes keys as relative paths under a
configured root, puts via os.replace for atomicity, and streams reads
in 64 KiB chunks with inclusive-range support — Gosling's BAM, tabix,
and bbi fetchers all issue Range requests that must pass through to
the cache transparently. Key validation rejects path traversal so
malformed input cannot escape the cache root.
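
The two behaviours called out above reduce to roughly the following; the helper names are illustrative (the real LocalFsCache exposes them as methods):

```python
import os
from pathlib import Path
from typing import Iterator, Optional

CHUNK = 64 * 1024


def put_atomic(root: Path, key: str, src: Path) -> None:
    """Stage then os.replace so readers never observe a half-written artifact."""
    dest = root / key
    dest.parent.mkdir(parents=True, exist_ok=True)
    tmp = dest.with_suffix(dest.suffix + ".tmp")
    tmp.write_bytes(src.read_bytes())
    os.replace(tmp, dest)            # atomic on POSIX within one filesystem


def stream(root: Path, key: str, start: int = 0, end: Optional[int] = None) -> Iterator[bytes]:
    """Yield 64 KiB chunks honouring an inclusive byte range."""
    path = root / key
    remaining = None if end is None else end - start + 1    # inclusive range
    with path.open("rb") as fh:
        fh.seek(start)
        while remaining is None or remaining > 0:
            size = CHUNK if remaining is None else min(CHUNK, remaining)
            chunk = fh.read(size)
            if not chunk:
                break
            if remaining is not None:
                remaining -= len(chunk)
            yield chunk
```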

The workflow mutex uses a Mongo partial unique index on workflow_key
filtered to active statuses, mirroring the atomic-upsert idiom already
used by services.locks for the sync and cutover locks. claim_workflow
either inserts a fresh PENDING record or — on DuplicateKeyError —
attaches to the existing active job so concurrent /data and /index
requests converge on one workflow per source file. A stale-threshold
reclamation path covers the case where a worker crashes without
releasing the mutex.
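
In outline, the claim path looks like this; the field names and return shape are assumptions, and the stale-reclamation branch is omitted for brevity:

```python
from datetime import datetime, timezone

from pymongo.errors import DuplicateKeyError

ACTIVE_STATUSES = ("pending", "running")


async def claim_workflow(jobs, workflow_key: str, job_id: str) -> tuple[bool, dict]:
    """Insert a fresh PENDING record or attach to the existing active one."""
    now = datetime.now(timezone.utc)
    record = {
        "workflow_key": workflow_key,
        "job_id": job_id,
        "status": "pending",
        "created_at": now,
        "updated_at": now,
    }
    try:
        await jobs.insert_one(record)        # partial unique index enforces the mutex
        return True, record                  # fresh claim: caller dispatches the work
    except DuplicateKeyError:
        existing = await jobs.find_one(
            {"workflow_key": workflow_key, "status": {"$in": list(ACTIVE_STATUSES)}}
        )
        return False, existing               # attach: converge on the in-flight job
```
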
JobExecutor is the abstract interface bridging /data and /index to
the processing pipeline. WoolExecutor is the concrete backend:
ensure_workflow claims or attaches to the per-source mutex, and on a
fresh claim fires a background task that dispatches the processor
through a Wool WorkerPool. Long-running jobs are bounded with
asyncio.wait_for at the call site (Wool's own @routine timeout is
dispatch-only). A short retry loop tolerates the window between
WorkerPool startup and discovery catching up.

Two concrete processors land alongside:

  - BamIndexProcessor: SAM->BAM conversion (when needed) plus
    samtools sort and samtools index. Emits the sorted BAM and its
    BAI.
  - TabixIntervalProcessor: covers VCF, GFF, BED, BroadPeak,
    NarrowPeak with decompress + sort + bgzip + tabix; GTF adds a
    gffread conversion to GFF3 first; bigBed is converted via
    bigBedToBed before the BED pipeline.

Both processors consult the cache per artifact kind before running
the corresponding stage, giving partial-commit recovery for free: a
stage-2 failure leaves the stage-1 artifact in cache, and the next
retry skips the expensive stage.

fetcher.download_source centralizes DRS/HTTPS source retrieval so
processors can focus on the tool-invocation pipeline. An integration
marker is registered for tests that shell out to real samtools or
htslib, gated off by default.

Both /data and /index now converge on ensure_workflow when a file's
processor requires preprocessing and the requested artifact is not
yet in cache. The router returns 202 with Location: /jobs/{job_id}
and Retry-After so clients can poll. On cache hit the artifact
streams directly with byte-range support. Passthrough formats (CSV,
TSV, bigWig) fall through to the existing direct-streaming path
unchanged.
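
The pending response the routers return is roughly the following; the Retry-After value is illustrative:

```python
from fastapi import Response


def pending_response(job_id: str) -> Response:
    """202 pointing clients at the job-status endpoint while preprocessing runs."""
    return Response(
        status_code=202,
        headers={
            "Location": f"/jobs/{job_id}",   # poll here for status
            "Retry-After": "5",              # illustrative cadence in seconds
        },
    )
```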

/index preserves the pre-existing 4DN sidecar fast path — files
whose extra.extra_files or extra.fourdn.extra_files carries a tbi or
beddb entry continue to serve from upstream without entering the
workflow. Files with no sidecar and no processor emitting an index
artifact return 404.

/jobs/{job_id} is a new endpoint exposing the persisted JobRecord:
status, stages_done, artifact cache keys, optional progress, and
error on failure.

The FastAPI lifespan brings up a Wool WorkerPool, a LocalFsCache
under SYNC_DATA_DIR, and the processor registry (passthrough plus
BAM and tabix). When SYNC_DATA_DIR is unset the subsystem stays
disabled and routers fall back to direct streaming — keeps minimal
test deployments and older configurations working unchanged.

The api container now installs samtools, tabix, bcftools, gffread,
and UCSC bigBedToBed so the Wool workers spawned by the API process
can shell out to the preprocessing tools without any separate image.
apt-shipped packages cover everything but bigBedToBed, which is
pulled as a UCSC release binary.

create-indexes.js adds three indexes to the new jobs collection: a
partial unique index on workflow_key (filtered to active statuses)
that backs the per-source mutex, a unique index on job_id for /jobs
lookups, and a compound status+updated_at index that the stale-lock
reclamation path scans.

HEAD probes on /data and /index no longer trigger preprocessing. The
previous behavior bound HEAD and GET to the same handler with no
method check, so monitoring probes or prefetch tools claiming to be
side-effect-free were actually dispatching samtools and tabix jobs
via ensure_workflow. Cache-miss now returns 404 on HEAD; clients must
issue an explicit GET to trigger processing.

Alongside the HEAD fix, a ?raw query parameter lets clients opt out of
the preprocessing pipeline entirely. With raw=false (the default) the
router returns the visualization-ready artifact (cache hit or a 202
with a Location header for GET). With raw=true the router serves the
upstream bytes directly for /data, and restricts /index to upstream
sidecars only (404 when none exists). Passthrough formats (CSV, TSV,
bigWig) behave the same regardless of raw since there is no processed
output distinct from the upstream file.

The default is raw=false so Gosling and CVH clients get preprocessed
artifacts without needing to know about the flag.
…eaders

The VCF sort path ran GNU sort over the whole file, which scrambled
the ## header block into the data block and produced artifacts tabix
could not index. Fix is a two-pass pipeline for VCF: grep '^#' emits
the header block, then grep -v '^#' pipes the data block through
sort, and the brace group joins both streams into bgzip. Headers end
up at the top of the output as tabix requires.
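
A plausible rendering of that two-pass shape as the shell command string handed to run_shell — the exact flags and sort keys in the tabix processor may differ:

```python
def vcf_sort_command(src: str, dest: str, mem_cap: str = "256M", workdir: str = ".") -> str:
    """Shell pipeline matching the described header/body split for VCF."""
    return (
        "{ "
        f"zcat -f {src} | grep '^#'; "                        # pass 1: header block
        f"zcat -f {src} | grep -v '^#' | "                    # pass 2: data block
        f"LC_ALL=C sort -S {mem_cap} -T {workdir} -k1,1 -k2,2n; "
        "} "
        f"| bgzip > {dest}"
    )
```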

Neither GNU sort nor samtools sort had a memory cap, so on Lambda
instances sized 512 MB - 2 GB either could OOM the worker on large
inputs. Add CFDB_SORT_MEMORY_CAP and CFDB_SAMTOOLS_MEMORY_CAP env
vars (both default 256M) and plumb them into sort -S / samtools
sort -m with -T pointed at the per-job workdir so spillage lands on
disk instead of in memory. LC_ALL=C on sort makes collation
byte-wise and independent of the worker's locale.

The format-specific pipelines are now single streaming shell pipes
rather than separate decompress / sort / bgzip invocations writing
intermediate files. Two intermediates remain where the tool can't
stream input: VCF (two passes for header/body split) and GTF (gffread
reads files). Other formats flow zcat -f or bigBedToBed straight into
sort and bgzip without materializing intermediate text.

GFF3 was silently bypassed because _TABIX_PRESET had only GFF, not
GFF3, while the ontology mapper emits GFF3 for .gff3 files. Add the
GFF3 key pointing at the gff preset so .gff3 inputs flow through the
same pipeline.

Tests exercise each per-format pipeline shape (grep-split VCF, gffread
GTF, bigBedToBed bigBed, generic zcat-f text), assert the -S/-T/-m
flags and LC_ALL=C prefix are present, and confirm the tabix preset
matches the format. Integration tests add an end-to-end VCF run that
verifies the output header block survives and data lines are
position-sorted.

Fixes R2-B2, R2-B3, and R2 non-blocking F (locale-sensitive sort).

release_workflow and record_stage_complete updated the jobs document
by job_id alone, with no filter on current status. A worker that
stalled past the one-hour stale threshold would get its record
flipped to FAILED by a subsequent claim_workflow call, but when the
stalled worker eventually woke and called its own release, the
update would overwrite the successor-owned status and leak stage
completions onto a record the new claimant now holds.

Both writes now filter on status in {pending, running}. A late call
from a reclaimed worker matches zero docs and becomes a no-op; a
warning is logged so stomping attempts remain visible in operations.
mark_running was already correctly fenced on status=pending and is
unchanged.

The /data and /index routers reached across package boundaries to
import workflows.executor._extract_identity via in-function imports,
and the test suite referenced the private name directly — both
symptoms of a helper that was always public in practice despite its
leading underscore. Drop the underscore, move the import to module
top in both routers, and update the test file accordingly.

Resolves the test-guide violation (tests MUST NOT reference private
symbols) and the leaky-abstraction smell in the router imports in
one go. The function's behavior, signature, and docstring are
unchanged.

The stale-reclaim test used lock._utcnow() to build a backdated
timestamp. Per the test guide, tests MUST NOT reference private
functions. The equivalent datetime.now(timezone.utc) expression is
already available from stdlib; use it directly.

The three original tests in test_index.py dropped the <method_name>
prefix required by the test guide's naming pattern
test_<method_name>_should_<outcome>[_when_<condition>]. Prefix them
all with test_stream_index_file_should_... so the target method is
clear from the test name alone and pytest output reads consistently
with the rest of the suite.

download_source is a public function but was only ever exercised
indirectly via mocks in the BAM and tabix processor tests — the
missing-access_url, direct-HTTPS, and DRS-resolution code paths were
all effectively untested.

New test module covers: ValueError on missing access_url, HTTPS
passthrough with assertion that fetch_drs_object is not called,
drs:// URIs resolving through fetch_drs_object + get_https_download_url
before streaming, and parent-directory creation for nested dest paths.

artifact_kind_str was a one-line wrapper exposing ArtifactKind.value
with zero callers anywhere in the codebase. Drop the function and
its now-unused ArtifactKind import. Callers that need the string
form can call .value directly on the enum member.

Both routers had a near-byte-identical _stream_cache_entry function
handling Range parsing, HEAD-vs-GET branching, and the Content-Range
header — so any future change to range behavior needed to land in
lockstep across both files. Move the helper into a new module,
routers._cache_stream, and import it from both callers.

The helper now accepts a media_type kwarg (defaulting to
application/octet-stream) so callers can override if they want to
signal a specific type for cached artifacts. No behavior change.

identity_dcc, shell_quote, run_argv, run_shell, and copy_from_cache
were byte-identical duplicates between the BAM and tabix processor
modules. Move them into workflows.processors._tools and re-export in
each processor under their existing local alias so test mocks that
patch the module-level names keep working without modification.

No behavior change. The processor modules now focus on pipeline shape
and let the shared subprocess / cache-staging machinery live in one
place.

Four related shutdown and mid-run hygiene issues are resolved
together in the executor's _run_workflow loop and the FastAPI
lifespan:

1. record_stage_complete failures used to leak the mutex — the
   release_workflow call was unreachable when any stage commit
   raised. The loop is now wrapped in try/finally so release always
   fires with the appropriate terminal status.

2. asyncio.CancelledError bypassed except Exception, leaving
   cancelled jobs stuck in RUNNING until the 1h stale threshold. A
   dedicated except branch now marks the job FAILED with a cancel
   reason before re-raising so cancellation still propagates.

3. The per-job workdir under SYNC_DATA_DIR/jobs/{id}/ was never
   cleaned up, slowly filling the Lambda /tmp partition. shutil.rmtree
   in the finally block (via asyncio.to_thread) removes it on every
   exit path.

4. The FastAPI lifespan used to close the WorkerPool while fire-and-
   forget tasks were still in flight, so those tasks would dispatch
   into a closed pool and try to release against a closed Mongo
   client. The lifespan now drains pending workflow tasks with a
   10-second cap before exiting the pool context.

Tests add coverage for workdir cleanup on success and for the
release-still-fires path when record_stage_complete raises.
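
Put together, the loop shape is approximately the following; the helper names (record_stage_complete, release_workflow), the stage objects, and the status strings are illustrative stand-ins for the prose above:

```python
import asyncio
import shutil


async def _run_workflow(jobs, job_id: str, workdir, stages) -> None:
    """Sketch of the release-always / cancel-aware / cleanup-always loop shape."""
    status, error = "completed", None
    try:
        for stage in stages:
            await stage.run(workdir)
            await record_stage_complete(jobs, job_id, stage.name)
    except asyncio.CancelledError:
        status, error = "failed", "cancelled"
        raise                                    # propagates after finally has run
    except Exception as exc:
        status, error = "failed", str(exc)
    finally:
        # cleanup and release fire on every exit path, including cancellation
        await asyncio.to_thread(shutil.rmtree, workdir, ignore_errors=True)
        await release_workflow(jobs, job_id, status=status, error=error)
```
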
The processors reached for identity_dcc from _tools and separately
pulled local_id and md5 from file_meta dict access. extract_identity
(made public in an earlier commit) already returns the full
(dcc, local_id, md5) triple over the same preference chain. Drop
identity_dcc from _tools, call extract_identity in BamIndexProcessor
and TabixIntervalProcessor, and destructure the triple. One helper,
one call site, one contract.
The "# --- Workflow subsystem ---" divider is a section banner; the
style guide forbids them because they duplicate what member naming
and ordering should carry. The substantive comment immediately below
(explaining the SYNC_DATA_DIR=unset fallback) stays.
…MING_CASE

The style guide reserves SCREAMING_CASE for values whose binding AND
referent never change. The previous list comprehension was SCREAMING-
cased but its referent was a mutable list, so callers could in
principle mutate it. Switching to a tuple makes the value immutable
and the name legitimate. The $in Mongo operand accepts either type,
so the downstream call sites are unchanged.

The style guide mandates public method ordering static, class, instance
within a visibility tier. Processor's public block had instance
(needs_processing) then classmethod (artifact_kinds_produced) then
abstractmethod instance (run). Move artifact_kinds_produced above
needs_processing so the public block reads class then instance then
instance as required.
The bare "from cfdb.workflows.processors import _tools" plus the
"_ = _tools" discard existed only to keep a module handle reachable
for an imagined patch target. Nothing in either processor actually
touches the module object — the re-exported aliases from the
"from _tools import ..." block cover every use — so the suppression
line was a workaround for a warning caused by dead code. Remove both.

The cancelled-bool scaffolding in _run_workflow had no observable
effect: the only place it was read was a trailing "if cancelled: pass"
in the finally block whose comment noted the raise was handled by the
already-executed "raise" inside each except asyncio.CancelledError
branch. Remove the flag, both assignments, and the terminal if-block.
Cancellation propagation is unchanged — the "raise" inside each
except branch still escapes through the finally after the terminal
release write.

bigBedToBed ships as a UCSC release binary in the api image; its
sibling bedToBigBed was missing. The integration test fixture builder
uses bedToBigBed to materialize a deterministic sample.bb from a tiny
sorted BED — without the tool the bigBed integration test skips. Add
it alongside bigBedToBed so CI exercises the full bigBed pipeline
end-to-end.

Closes the coverage gap left by the two existing integration tiers:
TestWoolExecutorPickleBoundary exercises Wool dispatch with a stub
processor that does no I/O, and the per-processor integration tests
run real samtools/htslib but call processor.run directly in the
test's event loop. Neither exercises the full claim to dispatch to
cache path from inside a Wool worker subprocess.

The new tests/integration package covers that gap:

  - fixtures/make_samples.py builds deterministic ~1 MB of sample
    data per session (SAM, BAM, VCF.gz, BED.gz, GFF3.gz, GTF.gz,
    narrowPeak.gz, broadPeak.gz, optional bigBed). The builders
    shell out to samtools / gzip / bedToBigBed; seeds are fixed so
    two runs produce byte-identical files and content-addressed
    cache keys stay stable.
  - conftest.py runs an http.server background thread serving the
    sample directory, so the Wool worker process can download over
    real HTTPS the same way production does. Mocking
    fetcher.download_source in the test process is insufficient —
    the worker re-imports its own copy and sees the real function.
  - test_processor_e2e.py parametrizes per format and asserts cache
    population, samtools quickcheck on the cached BAM, tabix query
    on staged bgz/tbi pairs, GFF3-attribute-syntax on converted
    GTF, and VCF header-block preservation end-to-end.
  - test_router_e2e.py drives the stream_file / stream_index_file /
    get_job_status handlers against a live executor: 202-then-200
    workflow, Range requests on cached artifacts, HEAD-no-dispatch,
    /jobs/{id} reporting, and the cached-index serve path.
  - test_concurrency.py asserts the Mongo partial-unique-index
    mutex dedupes two asyncio.gather'd /data requests onto a single
    workflow and converges /data + /index onto one workflow too.

Runtime is ~60 s under the integration marker with function-scoped
pools for cleanest isolation. Tests that require gffread or
bedToBigBed skip gracefully when the tool is absent.

The tabix processor assembles multi-stage shell pipelines like
"zcat -f | sort | bgzip > out.bgz" and invokes them through
run_shell. asyncio.create_subprocess_shell runs via /bin/sh, which
on Debian (the Dockerfile.api base) is dash — dash does not support
pipefail. A non-terminal stage failure (truncated gzip, gffread
syntax error, bigBedToBed read error, grep out-of-memory) exits
non-zero but the pipeline's overall returncode is the last stage's.
bgzip happily consumes whatever partial bytes arrived and exits 0,
run_shell returns success, and cache.put commits a truncated
artifact under its content-addressed key. Every subsequent request
then serves the corrupt pair until the upstream md5 changes.

Invoke the shell via bash -o pipefail -c so any stage's non-zero
exit surfaces at the pipeline boundary. bash is already in the
Dockerfile.api image. The BAM processor's samtools view | samtools
sort pipeline gains the same guarantee.

New tests assert that run_shell traps both final-stage and
non-terminal-stage failures, so a regression would fail loudly
rather than leak through the cache.
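
The fix amounts to invoking the pipeline through bash instead of /bin/sh, roughly as below; the real run_shell's signature and error type may differ:

```python
import asyncio


async def run_shell(command: str) -> None:
    """Run a pipeline under bash with pipefail so any stage's failure surfaces."""
    proc = await asyncio.create_subprocess_exec(
        "bash", "-o", "pipefail", "-c", command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    if proc.returncode != 0:
        raise RuntimeError(
            f"pipeline failed with exit {proc.returncode}: {stderr.decode(errors='replace')}"
        )
```
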
bam.py, tabix.py, and jobs.py each defined a module-level
logger = logging.getLogger(__name__) with zero callers in the
respective module. Drop each binding and the logging import. When
structured logging lands on the preprocessing path, re-add targeted
log statements rather than a speculative handle.

The FastAPI lifespan was reaching into WoolExecutor._pending_tasks,
a leading-underscore attribute explicitly marked private, to
implement the shutdown drain. That tightly couples the API entry
point to an executor implementation detail and leaks the private
marker's meaning.

Expose drain(timeout) as a public method on JobExecutor (abstract)
and WoolExecutor (concrete). The lifespan now calls
executor.drain(timeout=SHUTDOWN_DRAIN_TIMEOUT_SECONDS), which
returns the number of tasks that were pending at drain entry for
logging purposes. Tasks still running after the timeout are left
for stale-reclamation, same semantics as before. The asyncio import
drops out of main.py.

Processor._format_name was a leading-underscore staticmethod with
three external callers — ProcessorRegistry.lookup_for, and the
per-module _source_suffix helpers in bam.py and tabix.py — which
violates the underscore's meaning as a class-private marker.

Promote the helper to a module-level free function format_name in
processors/_tools.py (alongside the other shared subprocess /
cache-staging helpers) and update all four call sites. The function
is a pure dict reader with no self or cls state, so it belongs at
module scope rather than hanging off the ABC.

Two methods in TestExtractIdentity were typed without the underscore
between test and extract — testextract_identity_should_... Pytest
still collected them because "test" is a matching prefix, but the
test-naming convention test_<method>_should_<outcome>[_when_<cond>]
requires <method> to mirror the callable under test. Fix the name
so the target function (extract_identity) is readable from the
test identifier alone.

The project test guide mandates that integration tests live in a
dedicated tests/integration/ package. Three @pytest.mark.integration
classes remained under tests/test_workflows/ — a layout violation
now corrected:

  - TestWoolExecutorPickleBoundary from test_executor.py →
    tests/integration/test_executor_boundary.py, carrying its stub
    processor and the cloudpickle-by-value registration.
  - TestBamIndexProcessorIntegration from test_processors_bam.py →
    tests/integration/test_direct_processors.py as
    TestBamIndexProcessorDirectCall.
  - TestTabixIntervalProcessorIntegration (BED + VCF) from
    test_processors_tabix.py → the same new file as
    TestTabixIntervalProcessorDirectCall.

The direct-call classes are kept rather than deleted even though
tests/integration/test_processor_e2e.py covers the same formats
end-to-end through Wool: the direct-call variant skips the Wool
boundary and serves as a diagnostic layer — when an e2e test fails
you can still run the direct variant to isolate whether the
regression is in the processor pipeline itself or in the Wool /
cache / executor scaffolding around it.

test_executor.py drops its cloudpickle import, the by-value
registration, and the stub processor dependency now that they only
serve the moved class.

Both concurrency tests were named after the scenario (concurrent
data requests, concurrent data and index) rather than the callable
under test, missing the test_<method>_should_<outcome>[_when_<cond>]
segment. Rename to test_stream_file_should_... and
test_stream_file_and_stream_index_file_should_... so pytest output
identifies which handler each test drives.

stream_cache_entry is a new public helper in routers/_cache_stream.py
shared by /data and /index. It had no direct unit coverage — every
test reached it only through a router. Add tests asserting the five
response shapes it can produce: 200 GET full body, 200 HEAD no body,
206 with a Range slice, 416 for an out-of-bounds Range, and 400 for
a malformed Range header. Each assertion hits the helper directly
so future regressions surface here first without depending on router
integration.

The style guide forbids function-local / method-local / conditional
imports unless breaking a circular import with an explanatory
comment. New and touched test files accumulated ~25 such imports
inside test bodies — a holdover from the implementation round when
processors and registries were being wired up incrementally.

Hoist every test-body import to the module's import block across
test_data.py, test_index.py, test_workflows/test_executor.py,
test_workflows/test_processors_bam.py,
test_workflows/test_processors_tabix.py,
integration/test_processor_e2e.py, and integration/test_router_e2e.py.
Also replace "from pathlib import Path as _P" aliases with the
top-level Path import that now exists in each file.

Bundled low-risk improvements from the review's non-blocking list:

- lock.py: swap Tuple from typing for the lowercase builtin tuple
  to match the rest of the new workflow code (from __future__
  import annotations is already present).

- tests/integration/conftest.py: the samples fixture was calling
  generate_all a second time after sample_data_root had already
  built every file, paying samtools/gzip/bedToBigBed costs twice.
  Restructure so a new session-scoped _session_samples fixture
  runs generate_all exactly once, and sample_data_root and samples
  both read from it.

- tests/integration/conftest.py: drop the fixed 3-second sleep in
  wool_pool. The executor's _dispatch_with_retry already tolerates
  the LocalDiscovery startup window, so the sleep was pure
  overhead on every test. Integration suite runtime drops from
  ~60s to ~10s.

- fetcher.py: offload file writes with asyncio.to_thread so the
  download loop doesn't block the event loop on large artifacts.
  Matters when the executor runs in-process (tests, no WorkerPool);
  harmless when dispatched to a Wool worker with its own loop.

The ECS Fargate worker fleet needs boto3 at runtime for the S3 cache
backend, the ECS provisioner, and the worker discovery loop. moto is
added as a dev-only dependency so the unit tests can exercise the
boto3 code paths without standing up LocalStack or hitting real AWS.
@conradbzura conradbzura self-assigned this May 1, 2026
@conradbzura conradbzura linked an issue May 1, 2026 that may be closed by this pull request
@conradbzura conradbzura changed the title Deploy workflow workers as an ephemeral ECS Fargate fleet with Mongo-backed discovery — Closes #33 Deploy workflow workers as an ephemeral ECS Fargate fleet — Closes #33 May 1, 2026
@conradbzura conradbzura force-pushed the 33-ecs-fargate-discovery-deployment branch 2 times, most recently from 8ea9145 to 906c063 Compare May 2, 2026 22:53

S3Cache is a CacheBackend implementation over boto3 with the same
range-aware semantics as LocalFsCache (head_object, get_object with a
Range header, upload_file, delete_object). Production points it at
real S3; LocalStack-backed dev points it at the LocalStack endpoint
via AWS_ENDPOINT_URL — only the endpoint differs.

The backend supports an optional key prefix for sharing a single
bucket across environments, rejects path-traversal segments, and
yields an empty iterator on missing objects so router code can treat
cache misses uniformly across backends.

EcsProvisioner is a thin boto3 RunTask wrapper that launches an
ephemeral worker container per workflow, with awsvpc network
configuration, concurrent-call dedup keyed on the workflow mutex key
(so a burst of ensure_workflow calls on the same source doesn't fan
out into multiple tasks), and a semaphore guarding the ~20 req/s
RunTask rate limit. CapacityException covers both ClientError- and
failures[].reason-shaped capacity / ENI errors so the executor can
retry rather than hang.

EcsDiscovery is a Wool DiscoveryLike poll-and-diff over list_tasks +
describe_tasks: it filters on healthStatus HEALTHY, extracts the
awsvpc IP, and emits worker-added / worker-dropped events to non-
blocking subscribers via per-subscriber asyncio.Queue. State replay
on subscribe means subscribers attached after startup observe the
existing healthy fleet.

worker_main.py is the entrypoint baked into the worker container
image. It starts a wool.LocalWorker so the API can dispatch routines
to the task, exposes a tiny aiohttp /health endpoint that returns 503
during drain — so ECS marks the task unhealthy before stop_task kills
the gRPC port — and installs SIGTERM/SIGINT handlers so a stop_task
issued by the API or the Fargate scheduler shuts down cleanly. The
idle-shutdown timeout is configurable so tasks self-terminate when
their workflow completes.

The entrypoint deliberately does not register itself with discovery;
EcsDiscovery polls ECS directly and surfaces healthy tasks to the
pool, so the worker only needs to be listening and HEALTHY.

heartbeat_workflow bumps updated_at on an active job record so a
multi-hour preprocessing run (samtools sort on a multi-GB BAM, tabix
on a large text-interval file) doesn't trip the stale-reclaim
predicate in claim_workflow. The update is fenced on an active status
so a heartbeat against a terminal or stale-reclaimed record is a
silent no-op — the executor uses the False return as a stop signal.

STALE_WORKFLOW_THRESHOLD drops from 1 h to 15 min: with the heartbeat
refreshing updated_at every ~5 min while a job is in flight, a 15-min
threshold catches actually-dead workers usefully fast without the
risk of false-positive reclaiming a long sort that's still making
progress. DEFAULT_HEARTBEAT_INTERVAL_SECONDS exposes the cadence so
the executor and tests share one knob.

ensure_workflow now calls EcsProvisioner.request on a fresh claim so
the worker container is launched in parallel with the dispatch loop;
CapacityException is surfaced as a terminal FAILED job with a
retryable error string rather than hanging. _NO_WORKERS_RETRY_ATTEMPTS
widens from 5 to 60 (~60s window) to cover the Fargate cold-start
budget — booting a worker, pulling the SOCI-indexed image, binding
gRPC, and surfacing as HEALTHY in EcsDiscovery's next poll.

A heartbeat sidecar task bumps updated_at every heartbeat_interval
seconds while the workflow body runs and is cancelled in finally
before the terminal release, so the next tick can't observe an active
record while the executor is flipping status.
DEFAULT_WORKFLOW_DURATION_CAP_SECONDS rises from 20 min to 4 h to match the multi-hour
runtime profile; operators may bump it further via env.

The JobExecutor docstring drops the LambdaExecutor mention since the
production path is the Wool worker pool fed by EcsDiscovery.

The API gains the env vars needed to drive the new runtime:
AWS_ENDPOINT_URL and AWS_REGION (consumed by every boto3 client we
build, set to a LocalStack address in dev), WORKFLOW_S3_BUCKET /
PREFIX (when set, the lifespan picks S3Cache over LocalFsCache),
ECS_CLUSTER, ECS_WORKER_TASK_DEFINITION, ECS_WORKER_SUBNETS,
ECS_WORKER_SECURITY_GROUPS, ECS_WORKER_ASSIGN_PUBLIC_IP (when all
required vars are present, _maybe_build_provisioner returns an
EcsProvisioner so dispatch can launch ephemeral worker tasks), and
WORKFLOW_DURATION_CAP_SECONDS for operators bumping the multi-hour
runtime cap.

The bare PoC profile sets none of these and continues to use the
local filesystem cache plus the in-process WorkerPool. Production
sets all of them and the lifespan transparently switches to the
ECS-backed substrate.
@conradbzura conradbzura force-pushed the 33-ecs-fargate-discovery-deployment branch from 906c063 to 89f4685 Compare May 2, 2026 23:59
@conradbzura conradbzura force-pushed the 30-add-preprocessing-indexing-workflow branch 2 times, most recently from c8585f1 to b92c47a Compare May 4, 2026 01:55
