Analysis Pipeline
The end-to-end flow from audio upload until the track reaches
Status = Ready: HLS encoding, acoustic analysis
(librosa), ML enrichment (Essentia + TensorFlow), persistence, and
fail-open handling.
1 Full sequence
Participants: User, API, S3, DB, MQ (RabbitMQ), P (Worker · ProcessTrackConsumer), A (audio-analyzer, librosa), E (Worker · EnrichTrackAudioConsumer), En (audio-enricher, Essentia + TF).
- User → API: POST /tracks (upload)
- API → S3: temporary/tracks/{id}.mp3
- API → DB: Track(Status=Queued) + TrackJob
- API → MQ: ProcessTrackMessage
- API → User (response): 201 Created
- MQ → P: consume (prefetch=2)
- P → DB: job=Processing, track=Processing
- P → P: FFmpeg → HLS low/mid/high
- P → S3: upload segments + manifests
- P → API: POST /tracks/{id}/segments
- P → A: analyze.py {file}
- A → P (response): { BPM, key, loudness, ... }
- P → DB: upsert track_audio_features (phase 1)
- P → DB: track=Enriching (8)
- P → MQ: EnrichTrackAudioMessage
- MQ → E: consume (prefetch=1)
- E → S3: download temp audio
- E → En: enrich.py {file}
- En → E (response): { dance, energy, moods, embedding[1280], ... }
- E → DB: upsert features (phase 2) + EnrichmentStatus=Completed
- E → S3: DELETE temp audio
- E → DB: track=Ready (3)
- Note over E, DB: fails 3× → EnrichmentStatus=Failed, but track=Ready anyway
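The status lifecycle implied by the sequence can be sketched as a small state machine. This is a hypothetical Python model, not the API's actual enum: only Enriching (8) and Ready (3) have numeric codes on this page, and the transition table is one reading of the documented happy path and failure cases.

```python
from enum import Enum


class TrackStatus(Enum):
    # Hypothetical model: names from this page, values are plain labels.
    QUEUED = "Queued"
    PROCESSING = "Processing"
    ENRICHING = "Enriching"
    READY = "Ready"
    PUBLISHED = "Published"
    FAILED = "Failed"


# Only these two numeric codes are documented on this page.
KNOWN_CODES = {TrackStatus.ENRICHING: 8, TrackStatus.READY: 3}

# Assumed transition table derived from the sequence above.
ALLOWED = {
    TrackStatus.QUEUED: {TrackStatus.PROCESSING},
    # FFmpeg or analyze.py failures end in Failed.
    TrackStatus.PROCESSING: {TrackStatus.ENRICHING, TrackStatus.FAILED},
    # Enrichment is fail-open: success and exhausted retries both lead to Ready.
    TrackStatus.ENRICHING: {TrackStatus.READY},
    TrackStatus.READY: {TrackStatus.PUBLISHED},
    TrackStatus.PUBLISHED: set(),
    TrackStatus.FAILED: set(),
}


def advance(current: TrackStatus, target: TrackStatus) -> TrackStatus:
    """Return the new status, or raise if the move is not in ALLOWED."""
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```

Encoding the fail-open rule as "Enriching only goes to Ready" makes it impossible for an enrichment bug to strand a track in a non-playable state.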
2 Step by step
User upload
POST /tracks (multipart) with the audio file. The
API saves a temporary copy to
temporary/tracks/{trackId}.{ext} in S3 and creates
the Track with Status = Queued.
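A minimal sketch of how the temporary object key could be derived, assuming the extension comes from the client's filename and is lowercased. The helper name `temporary_audio_key` is hypothetical.

```python
import os


def temporary_audio_key(track_id: str, upload_filename: str) -> str:
    """Build the temporary S3 key: temporary/tracks/{trackId}.{ext} (hypothetical helper)."""
    # Assumption: extension taken from the uploaded filename, normalised to lowercase.
    ext = os.path.splitext(upload_filename)[1].lstrip(".").lower()
    if not ext:
        raise ValueError("upload has no file extension")
    return f"temporary/tracks/{track_id}.{ext}"
```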
Message to the Worker
The API publishes
ProcessTrackMessage(jobId, trackId, audioUrl) to
the beatix.process-track RabbitMQ queue.
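The message shape can be sketched as a plain JSON payload. This assumes a bare JSON body with camelCase keys matching the payload listed for this queue; the real .NET publisher may wrap the body in an envelope or add headers.

```python
import json
from dataclasses import asdict, dataclass

PROCESS_QUEUE = "beatix.process-track"


@dataclass(frozen=True)
class ProcessTrackMessage:
    # camelCase field names mirror the wire payload { jobId, trackId, audioUrl }.
    jobId: str
    trackId: str
    audioUrl: str


def encode(msg: ProcessTrackMessage) -> bytes:
    """Serialise the message as UTF-8 JSON (assumed wire format)."""
    return json.dumps(asdict(msg)).encode("utf-8")


def decode(body: bytes) -> ProcessTrackMessage:
    """Parse a body produced by encode(); unknown keys raise TypeError."""
    return ProcessTrackMessage(**json.loads(body))
```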
HLS encoding
ProcessTrackConsumer consumes (prefetch=2), marks
the track as Processing and invokes FFmpeg in a
single pass producing three qualities:
- low: 256 kbps
- medium: 512 kbps
- high: 1024 kbps
The .ts segments and index.m3u8
manifests are uploaded to S3 under
{trackId}/hls/{quality}/; the segment index is
persisted via POST /tracks/{id}/segments.
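One way to get all three renditions from a single FFmpeg pass is the HLS muxer's `-var_stream_map`. The sketch below only builds the argument list. The AAC codec, the 10 s segment length, and the local output layout are assumptions (this page does not name them), and whether a given encoder actually reaches the 1024 kbps target is worth checking.

```python
from pathlib import PurePosixPath

# Bitrates from this page; names match the {quality} folders.
RENDITIONS = {"low": "256k", "medium": "512k", "high": "1024k"}


def hls_ffmpeg_args(src: str, out_dir: str, segment_seconds: int = 10) -> list[str]:
    """Build a single-pass ffmpeg command producing one HLS playlist per rendition."""
    args = ["ffmpeg", "-y", "-i", src]
    # Map the first input audio stream once per rendition.
    for _ in RENDITIONS:
        args += ["-map", "0:a:0"]
    args += ["-c:a", "aac"]  # assumption: codec is not stated on this page
    for i, rate in enumerate(RENDITIONS.values()):
        args += [f"-b:a:{i}", rate]
    # "a:0,name:low a:1,name:medium a:2,name:high": %v expands to each name.
    stream_map = " ".join(f"a:{i},name:{name}" for i, name in enumerate(RENDITIONS))
    out = PurePosixPath(out_dir)
    args += [
        "-f", "hls",
        "-hls_time", str(segment_seconds),
        "-hls_playlist_type", "vod",
        "-var_stream_map", stream_map,
        "-hls_segment_filename", str(out / "%v" / "segment_%03d.ts"),
        str(out / "%v" / "index.m3u8"),
    ]
    return args
```

Pointing `out_dir` at a local `{trackId}/hls` directory yields `{trackId}/hls/{quality}/index.m3u8` plus `.ts` segments, which can then be uploaded to S3 under the same prefix.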
Phase 1 · Acoustic analysis (librosa)
The Worker downloads the temporary audio and runs
audio-analyzer/analyze.py {path} as a subprocess. The
sidecar returns JSON with:
Bpm, BpmConfidence, MusicalKey, Mode, KeyConfidence, LoudnessLufs, PeakDb, TruePeakDb, WaveformPeaks[800], Fingerprint, AcoustIdMatch, SpeechMusicProfile, AnalyzedAtUtc
The result is upserted into
track_audio_features. The track advances to
Status = Enriching (8).
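The real invoker is C# (AudioAnalyzerInvoker.cs). The Python sketch below only illustrates the subprocess contract under stated assumptions: the sidecar prints one JSON object on stdout and exits non-zero on error, the script path is relative to the working directory, and the 120 s timeout is an arbitrary placeholder. Any of these failures maps to the "analyze.py crash / timeout" scenario.

```python
import json
import subprocess
import sys

# Keys listed for the Phase 1 JSON on this page.
PHASE1_KEYS = {
    "Bpm", "BpmConfidence", "MusicalKey", "Mode", "KeyConfidence",
    "LoudnessLufs", "PeakDb", "TruePeakDb", "WaveformPeaks",
    "Fingerprint", "AcoustIdMatch", "SpeechMusicProfile", "AnalyzedAtUtc",
}
ANALYZER_SCRIPT = "audio-analyzer/analyze.py"  # assumed relative path


class AnalyzerError(RuntimeError):
    """Crash, timeout, or malformed output from a sidecar."""


def run_sidecar(argv: list[str], audio_path: str, timeout_s: float = 120.0) -> dict:
    """Run argv + [audio_path] and parse stdout as a single JSON object."""
    try:
        proc = subprocess.run(argv + [audio_path], capture_output=True,
                              text=True, timeout=timeout_s)
    except subprocess.TimeoutExpired as exc:
        raise AnalyzerError(f"timed out after {timeout_s}s") from exc
    if proc.returncode != 0:
        raise AnalyzerError(f"exit {proc.returncode}: {proc.stderr.strip()[:500]}")
    try:
        result = json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        raise AnalyzerError("stdout is not valid JSON") from exc
    if not isinstance(result, dict):
        raise AnalyzerError("expected a JSON object")
    return result


def analyze_phase1(audio_path: str, timeout_s: float = 120.0) -> dict:
    """Invoke analyze.py and check that every documented field is present."""
    result = run_sidecar([sys.executable, ANALYZER_SCRIPT], audio_path, timeout_s)
    missing = PHASE1_KEYS.difference(result)
    if missing:
        raise AnalyzerError(f"missing keys: {sorted(missing)}")
    return result
```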
Queue enrichment
The Worker publishes
EnrichTrackAudioMessage(trackId, tempAudioObjectKey)
to the beatix.enrich-track-audio queue. The
temporary audio is not deleted yet — the
enricher needs it.
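The end of Phase 1 can be sketched as the hand-off below. `set_status` and `publish` are hypothetical callables standing in for the DB update and the RabbitMQ publish, and the JSON body format is an assumption. The ordering (status to Enriching, then publish) follows the sequence diagram, and the temporary object is deliberately left in place.

```python
import json
from dataclasses import asdict, dataclass
from typing import Callable

ENRICH_QUEUE = "beatix.enrich-track-audio"
ENRICHING = 8  # Status = Enriching


@dataclass(frozen=True)
class EnrichTrackAudioMessage:
    trackId: str
    tempAudioObjectKey: str


def hand_off_to_enricher(
    track_id: str,
    temp_key: str,
    set_status: Callable[[str, int], None],   # hypothetical DB update
    publish: Callable[[str, bytes], None],    # hypothetical queue publish
) -> None:
    """Mark the track as Enriching and queue Phase 2 without deleting temp audio."""
    set_status(track_id, ENRICHING)
    body = json.dumps(asdict(EnrichTrackAudioMessage(track_id, temp_key))).encode("utf-8")
    publish(ENRICH_QUEUE, body)
    # No S3 delete here: the enricher downloads temp_key itself.
```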
Phase 2 · Enrichment (Essentia + TF)
EnrichTrackAudioConsumer downloads the audio to
/tmp/beatix-enrich-{uuid}.{ext} and invokes
audio-enricher/enrich.py in two stages:
- Classic Essentia (44100 Hz): RhythmExtractor2013, KeyExtractor, DynamicComplexity, RMS, ERBBands.
- TensorFlow discogs-effnet (16000 Hz): 1280-d embedding (mean pooling), Danceability, Voice/Instrumental, 5 mood classifiers, the 56-tag multi-label model, and the Valence/Arousal heuristics.
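The 1280-d track embedding is described as mean pooling over the model's outputs. A dependency-free sketch of just that pooling step, assuming enrich.py has a frames × 1280 matrix of per-patch embeddings (in practice this is likely a NumPy array). An empty matrix is one plausible way very short tracks end up on the fail-open path.

```python
from statistics import fmean

EMBEDDING_DIM = 1280


def mean_pool(frame_embeddings: list[list[float]], dim: int = EMBEDDING_DIM) -> list[float]:
    """Average per-patch embeddings column-wise into one track-level vector."""
    if not frame_embeddings:
        # e.g. audio too short to yield a single model patch
        raise ValueError("no frames to pool")
    for row in frame_embeddings:
        if len(row) != dim:
            raise ValueError(f"expected {dim}-d rows, got {len(row)}")
    # zip(*rows) iterates columns; fmean averages each embedding dimension.
    return [fmean(col) for col in zip(*frame_embeddings)]
```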
The scalar results are upserted into the Phase 2 columns, and the
embedding is written to EmbeddingDiscogs with vector (pgvector) SQL,
together with EnrichmentStatus = Completed. The track advances
to Status = Ready (3).
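The real upsert lives in AudioFeaturesRepository.cs. The Python sketch below only shows the shape of a pgvector upsert: the embedding is sent in pgvector's text format `[x1,x2,...]` and cast with `::vector`. The quoted column names other than EmbeddingDiscogs and EnrichmentStatus, the "TrackId" conflict target, storing the status as text, and the psycopg-style `%(name)s` placeholders are all assumptions.

```python
import math

# Hypothetical column names; assumes a unique constraint on "TrackId".
UPSERT_PHASE2_SQL = """
INSERT INTO track_audio_features ("TrackId", "EnrichmentStatus", "EmbeddingDiscogs")
VALUES (%(track_id)s, 'Completed', %(embedding)s::vector)
ON CONFLICT ("TrackId") DO UPDATE
SET "EnrichmentStatus" = EXCLUDED."EnrichmentStatus",
    "EmbeddingDiscogs" = EXCLUDED."EmbeddingDiscogs"
"""


def to_pgvector_literal(vec: list[float]) -> str:
    """Format a vector in pgvector's text input form, e.g. '[1.0,2.5]'."""
    if not all(math.isfinite(x) for x in vec):
        raise ValueError("embedding contains NaN or infinity")
    return "[" + ",".join(repr(float(x)) for x in vec) + "]"


def phase2_upsert(track_id: str, embedding: list[float], expected_dim: int = 1280):
    """Return (sql, params) for a driver that accepts %(name)s placeholders."""
    if len(embedding) != expected_dim:
        raise ValueError(f"expected {expected_dim} dims, got {len(embedding)}")
    return UPSERT_PHASE2_SQL, {"track_id": track_id, "embedding": to_pgvector_literal(embedding)}
```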
Cleanup & publication
The temporary audio is deleted from S3. The track can now be
published via PATCH /tracks/{id} with
Status = Published, at which point it appears in
the public catalog.
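Publishing is a single PATCH. A standard-library sketch that only builds the request; the base URL and the `{"status": "Published"}` body shape are hypothetical, and authentication is omitted because this page does not describe it.

```python
import json
import urllib.request


def build_publish_request(api_base: str, track_id: str) -> urllib.request.Request:
    """Build PATCH /tracks/{id} setting Status = Published (body shape assumed)."""
    body = json.dumps({"status": "Published"}).encode("utf-8")
    return urllib.request.Request(
        f"{api_base.rstrip('/')}/tracks/{track_id}",
        data=body,
        method="PATCH",
        headers={"Content-Type": "application/json"},
    )
```

Sending it is then `urllib.request.urlopen(req)` against a reachable API.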
3 Queues & consumers
| Queue | Consumer | Prefetch | Retry | Payload |
|---|---|---|---|---|
| beatix.process-track | ProcessTrackConsumer | 2 | No retry · DLQ | { jobId, trackId, audioUrl } |
| beatix.enrich-track-audio | EnrichTrackAudioConsumer | 1 | 2× every 5 min | { trackId, tempAudioObjectKey } |
The enrichment queue uses prefetch = 1 because each enricher run loads the TensorFlow models (discogs-effnet, ~150 MB) and processes the entire audio at 16 kHz. The work is CPU- and memory-intensive, so each worker handles one message at a time to avoid OOM kills and thrashing.
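The table can be captured as consumer settings, which also makes the retry arithmetic explicit: 2 retries means 3 attempts in total. This is a sketch. The `ConsumerConfig` type is hypothetical, and the schedule ignores how long each attempt runs. With an AMQP client, `prefetch` corresponds to the channel's QoS prefetch count (e.g. `channel.basic_qos(prefetch_count=...)` in pika).

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConsumerConfig:
    queue: str
    prefetch: int          # max unacknowledged messages per consumer
    retries: int           # redeliveries after the first attempt
    retry_interval_s: int  # delay between attempts


CONSUMERS = {
    "ProcessTrackConsumer": ConsumerConfig("beatix.process-track", 2, 0, 0),
    "EnrichTrackAudioConsumer": ConsumerConfig("beatix.enrich-track-audio", 1, 2, 300),
}


def attempt_schedule(cfg: ConsumerConfig) -> list[int]:
    """Start offsets (seconds after first delivery) of every attempt, ignoring run time."""
    return [i * cfg.retry_interval_s for i in range(cfg.retries + 1)]
```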
4 Failures & fail-open
Enrichment is non-blocking. If the enricher fails on all 3 attempts, the consumer:
- Records EnrichmentStatus = Failed and EnrichmentLastError.
- Advances the track to Status = Ready.
- Leaves the ML columns null; the UI shows a warning icon, but the track is playable.
This avoids holding uploads hostage to enricher bugs or pathological inputs (near-empty files, samples too short for the model).
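The fail-open decision after a failed enrichment attempt can be sketched as a pure function. Attempt numbers are assumed to be 1-based, and `Outcome` is a hypothetical type; the ML columns are simply not part of the outcome, so they stay null.

```python
from dataclasses import dataclass
from typing import Optional

MAX_ATTEMPTS = 3  # 1 initial attempt + 2 retries
READY = 3         # Status = Ready


@dataclass
class Outcome:
    retry: bool
    status: Optional[int] = None
    enrichment_status: Optional[str] = None
    last_error: Optional[str] = None


def on_enrich_failure(attempt: int, error: str) -> Outcome:
    """Decide between another retry and fail-open after a failed attempt (1-based)."""
    if attempt < MAX_ATTEMPTS:
        return Outcome(retry=True)
    # Fail-open: record the error and release the track as playable.
    return Outcome(retry=False, status=READY, enrichment_status="Failed", last_error=error)
```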
Error scenarios
| Scenario | Behaviour |
|---|---|
| FFmpeg fails (corrupted input) | Status = Failed; the message goes to the DLQ for manual investigation. |
| analyze.py crash / timeout | Same as FFmpeg: Status = Failed. Enrichment is not queued. |
| enrich.py crash | Retried 2× (3 attempts total). Once retries are exhausted → fail-open: EnrichmentStatus = Failed, Status = Ready. |
| Very short track (<3 s) | Can trip the TF models (input shape). Handled as fail-open. |
| S3 unavailable | The message returns to the queue; if the outage persists, it eventually lands in the DLQ. |
5 Key files
| Role | Path |
|---|---|
| Processing consumer | apps/worker/src/Consumers/ProcessTrackConsumer.cs |
| Enrichment consumer | apps/worker/src/Consumers/EnrichTrackAudioConsumer.cs |
| Analyzer invoker (subprocess) | apps/worker/src/Services/AudioAnalyzerInvoker.cs |
| Enricher invoker (subprocess) | apps/worker/src/Services/AudioEnricherInvoker.cs |
| Upsert repository | apps/worker/src/Services/AudioFeaturesRepository.cs |
| Python sidecar — Phase 1 | apps/audio-analyzer/analyze.py |
| Python sidecar — Phase 2 | apps/audio-enricher/enrich.py |
| pgvector + HNSW index migration | apps/api/src/Migrations/20260513230003_AddAudioEnrichmentSchema.cs |
| Similarity endpoint | apps/api/src/Features/Tracks/Endpoints/GetSimilar/Endpoint.cs |
| Original markdown diagram | docs/track-upload-flow.md |