Analysis Pipeline

This page follows a track end to end, from the audio upload until it reaches Status = Ready: HLS encoding, acoustic analysis (librosa), ML enrichment (Essentia + TensorFlow), persistence, and fail-open handling.

1 Full sequence

sequenceDiagram
    actor User
    participant API
    participant S3
    participant MQ as RabbitMQ
    participant P as Worker<br/>ProcessTrackConsumer
    participant A as audio-analyzer<br/>(librosa)
    participant E as Worker<br/>EnrichTrackAudioConsumer
    participant En as audio-enricher<br/>(Essentia + TF)
    participant DB

    User->>API: POST /tracks (upload)
    API->>S3: temporary/tracks/{id}.mp3
    API->>DB: Track(Status=Queued) + TrackJob
    API->>MQ: ProcessTrackMessage
    API-->>User: 201 Created

    MQ->>P: consume (prefetch=2)
    P->>DB: job=Processing, track=Processing
    P->>P: FFmpeg → HLS low/mid/high
    P->>S3: upload segments + manifests
    P->>API: POST /tracks/{id}/segments
    P->>A: analyze.py {file}
    A-->>P: { BPM, key, loudness, ... }
    P->>DB: upsert track_audio_features (phase 1)
    P->>DB: track=Enriching (8)
    P->>MQ: EnrichTrackAudioMessage

    MQ->>E: consume (prefetch=1)
    E->>S3: download temp audio
    E->>En: enrich.py {file}
    En-->>E: { dance, energy, moods, embedding[1280], ... }
    E->>DB: upsert features (phase 2) + EnrichmentStatus=Completed
    E->>S3: DELETE temp audio
    E->>DB: track=Ready (3)
    Note over E,DB: Fails 3x → EnrichmentStatus=Failed<br/>but track=Ready anyway

2 Step by step

1. User upload

POST /tracks (multipart) with the audio file. The API saves a temporary copy to temporary/tracks/{trackId}.{ext} in S3 and creates the Track with Status = Queued.

apps/api · TracksController

2. Message to the Worker

The API publishes ProcessTrackMessage(jobId, trackId, audioUrl) to the beatix.process-track RabbitMQ queue.

MassTransit · Direct exchange
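As a rough illustration of the payload described in this step, the sketch below models ProcessTrackMessage as a small data class and serialises it to JSON. Only the three field names come from the text; the field types, the sample identifiers, and the `to_json` helper are assumptions, and whatever envelope MassTransit adds around the payload is deliberately left out.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class ProcessTrackMessage:
    """Payload fields named in the text; the string types are assumed."""
    jobId: str
    trackId: str
    audioUrl: str


def to_json(message: ProcessTrackMessage) -> str:
    # Serialise only the payload body (hypothetical helper, no broker envelope).
    return json.dumps(asdict(message), sort_keys=True)


# Hypothetical identifiers, for illustration only.
msg = ProcessTrackMessage(
    jobId="job-1",
    trackId="track-1",
    audioUrl="temporary/tracks/track-1.mp3",
)
print(to_json(msg))
```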

3. HLS encoding

ProcessTrackConsumer consumes (prefetch=2), marks the track as Processing and invokes FFmpeg in a single pass producing three qualities:

  • low: 256 kbps
  • medium: 512 kbps
  • high: 1024 kbps

The .ts segments and index.m3u8 manifests are uploaded to S3 under {trackId}/hls/{quality}/; the segment index is persisted via POST /tracks/{id}/segments.

apps/worker · ProcessTrackConsumer.cs
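One way to get three qualities out of a single FFmpeg pass is to give one command several HLS outputs. The sketch below only builds the argument list; it is not the Worker's actual invocation. The bitrates come from the list above, while the AAC codec, the 6-second segment length, the segment file naming, and the `build_hls_command` helper are assumptions. The output directories are assumed to exist before FFmpeg runs.

```python
from pathlib import PurePosixPath

# Bitrates from the list above, in kbps.
RENDITIONS = {"low": 256, "medium": 512, "high": 1024}


def build_hls_command(source: str, out_dir: str, segment_seconds: int = 6) -> list[str]:
    """Build one ffmpeg argv with one HLS output per quality (hypothetical helper)."""
    cmd = ["ffmpeg", "-y", "-i", source]
    for quality, kbps in RENDITIONS.items():
        target = PurePosixPath(out_dir) / quality
        cmd += [
            "-map", "0:a:0",                  # first audio stream only
            "-c:a", "aac",                    # assumed codec
            "-b:a", f"{kbps}k",               # target audio bitrate
            "-f", "hls",
            "-hls_time", str(segment_seconds),
            "-hls_playlist_type", "vod",
            "-hls_segment_filename", str(target / "segment_%05d.ts"),
            str(target / "index.m3u8"),       # manifest for this quality
        ]
    return cmd


if __name__ == "__main__":
    print(" ".join(build_hls_command("input.mp3", "/tmp/track-1/hls")))
```

Because every output repeats its own `-map` and codec options, the input is decoded once and encoded three times within the same process.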

4. Phase 1 · Acoustic analysis (librosa)

The Worker downloads the temporary audio and invokes, as a subprocess, audio-analyzer/analyze.py {path}. The sidecar returns JSON with:

Bpm, BpmConfidence, MusicalKey, Mode, KeyConfidence, LoudnessLufs, PeakDb, TruePeakDb, WaveformPeaks[800], Fingerprint, AcoustIdMatch, SpeechMusicProfile, AnalyzedAtUtc

The result is upserted into track_audio_features. The track advances to Status = Enriching (8).

apps/audio-analyzer · analyze.py
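The subprocess hand-off in this step can be sketched as follows. This is a Python illustration of the pattern, not the Worker's C# invoker: it assumes the sidecar prints a single JSON object on stdout and signals failure through a non-zero exit code. The `run_analyzer` name, the `AnalyzerError` type, and the 120-second timeout are hypothetical.

```python
import json
import subprocess
import sys


class AnalyzerError(RuntimeError):
    """Raised when the sidecar crashes, times out, or prints invalid JSON."""


def run_analyzer(script_path: str, audio_path: str, timeout_s: float = 120.0) -> dict:
    """Run the sidecar script on one file and return its parsed JSON result."""
    try:
        proc = subprocess.run(
            [sys.executable, script_path, audio_path],
            capture_output=True,
            text=True,
            timeout=timeout_s,
            check=True,  # a non-zero exit code raises CalledProcessError
        )
    except subprocess.TimeoutExpired as exc:
        raise AnalyzerError(f"analyzer timed out after {timeout_s}s") from exc
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or "").strip()
        raise AnalyzerError(f"analyzer exited with {exc.returncode}: {detail}") from exc

    try:
        features = json.loads(proc.stdout)
    except json.JSONDecodeError as exc:
        raise AnalyzerError("analyzer did not print valid JSON") from exc
    if not isinstance(features, dict):
        raise AnalyzerError("analyzer output is not a JSON object")
    return features
```

Collapsing crash, timeout, and malformed output into one error type matches the error table later on this page, where all three lead to Status = Failed.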

5. Queue enrichment

The Worker publishes EnrichTrackAudioMessage(trackId, tempAudioObjectKey) to the beatix.enrich-track-audio queue. The temporary audio is not deleted yet — the enricher needs it.

MassTransit · prefetch=1, retry 2× every 5 min

6. Phase 2 · Enrichment (Essentia + TF)

EnrichTrackAudioConsumer downloads the audio to /tmp/beatix-enrich-{uuid}.{ext} and invokes audio-enricher/enrich.py in two stages:

  • Classic Essentia (44100 Hz): RhythmExtractor2013, KeyExtractor, DynamicComplexity, RMS, ERBBands.
  • TensorFlow discogs-effnet (16000 Hz): 1280-d embedding (mean pooling), Danceability, Voice/Instrumental, 5 mood classifiers, the 56-tag multi-label model, and the Valence/Arousal heuristics.

The result is upserted into the Phase 2 columns, and the embedding is written to EmbeddingDiscogs via vector SQL. The track advances to Status = Ready (3).

apps/audio-enricher · enrich.py
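To make the "mean pooling" and "vector SQL" steps concrete, here is a minimal sketch in plain Python. It assumes the model yields one embedding row per audio patch and that the pooled vector is passed to the database as a pgvector-style text literal such as `[0.1,0.2]`. Both helper names are hypothetical, and the real enrich.py may pool with NumPy instead.

```python
from typing import Sequence


def mean_pool(frames: Sequence[Sequence[float]]) -> list[float]:
    """Average per-patch embeddings into one track-level vector."""
    if not frames:
        raise ValueError("no frames to pool")
    dim = len(frames[0])
    if any(len(frame) != dim for frame in frames):
        raise ValueError("all frames must have the same dimensionality")
    # zip(*frames) walks the matrix column by column.
    return [sum(column) / len(frames) for column in zip(*frames)]


def to_pgvector_literal(vector: Sequence[float]) -> str:
    """Format a vector as a bracketed, comma-separated text literal."""
    return "[" + ",".join(f"{value:.8g}" for value in vector) + "]"


if __name__ == "__main__":
    # Two toy 2-d "patches"; the real embedding is 1280-d.
    pooled = mean_pool([[1.0, 2.0], [3.0, 4.0]])
    print(to_pgvector_literal(pooled))
```

Mean pooling gives a fixed-length vector regardless of track duration, which is what allows every track to share one 1280-d column.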

7. Cleanup & publication

The temporary audio is deleted from S3. The track can now be published via PATCH /tracks/{id} with Status = Published, at which point it appears in the public catalog.

3 Queues & consumers

| Queue | Consumer | Prefetch | Retry | Payload |
|---|---|---|---|---|
| beatix.process-track | ProcessTrackConsumer | 2 | No retry · DLQ | { jobId, trackId, audioUrl } |
| beatix.enrich-track-audio | EnrichTrackAudioConsumer | 1 | 2× every 5 min | { trackId, tempAudioObjectKey } |

Why prefetch=1 on the enricher?

Each enricher run loads TensorFlow models (discogs-effnet ~150 MB) and processes the entire audio at 16 kHz. It is CPU/memory intensive, so one worker processes one message at a time to avoid OOM and thrashing.

4 Failures & fail-open

stateDiagram-v2
    [*] --> Queued: upload
    Queued --> Processing: ProcessTrack starts
    Processing --> Failed: unrecoverable error
    Processing --> Enriching: HLS + analyze.py OK
    Enriching --> Ready: enrich.py OK
    Enriching --> Ready: fail-open after 3 attempts
    Ready --> Published: artist publishes
    Ready --> Archived: artist archives
    Ready --> TakenDown: DMCA / TOS
    Published --> Archived
    Published --> TakenDown
    Failed --> [*]
    TakenDown --> [*]

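The state diagram can also be read as a transition table. The sketch below encodes exactly the arrows shown in it and checks whether a status change is allowed. The `can_transition` helper is hypothetical and is not taken from the codebase; Archived has no outgoing transitions only because the diagram shows none.

```python
# Allowed status changes, copied from the arrows in the state diagram.
TRANSITIONS: dict[str, set[str]] = {
    "Queued": {"Processing"},
    "Processing": {"Failed", "Enriching"},
    "Enriching": {"Ready"},  # reached on success and on fail-open alike
    "Ready": {"Published", "Archived", "TakenDown"},
    "Published": {"Archived", "TakenDown"},
    "Archived": set(),
    "Failed": set(),
    "TakenDown": set(),
}


def can_transition(current: str, target: str) -> bool:
    """Return True if the diagram has an arrow from current to target."""
    return target in TRANSITIONS.get(current, set())


if __name__ == "__main__":
    print(can_transition("Enriching", "Ready"))
    print(can_transition("Queued", "Ready"))
```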
Fail-open strategy

Enrichment is not blocking. If the enricher fails 3 times, the consumer:

  • Records EnrichmentStatus = Failed and EnrichmentLastError.
  • Advances the track to Status = Ready.
  • Leaves the ML columns null — the UI shows a warning icon, but the track is playable.

This avoids holding uploads hostage to enricher bugs or pathological inputs (near-empty files, samples too short for the model).
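The fail-open rule can be sketched as below. For readability, the loop collapses the broker-driven redeliveries (one initial attempt plus two retries) into a single function; in the real pipeline the retries are spaced out by the queue, not by a loop. The `TrackState` class, its `Pending` initial value, and the `enrich_fail_open` helper are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Optional

MAX_ATTEMPTS = 3  # initial attempt + 2 retries


@dataclass
class TrackState:
    status: str = "Enriching"
    enrichment_status: str = "Pending"  # assumed initial value
    enrichment_last_error: Optional[str] = None
    features: Optional[dict] = None     # ML columns stay None on failure


def enrich_fail_open(track: TrackState, enrich: Callable[[], dict]) -> TrackState:
    """Try enrichment up to MAX_ATTEMPTS times, then mark the track Ready regardless."""
    for attempt in range(1, MAX_ATTEMPTS + 1):
        try:
            track.features = enrich()
            track.enrichment_status = "Completed"
            break
        except Exception as exc:  # any enricher failure counts as one attempt
            track.enrichment_last_error = f"attempt {attempt}: {exc}"
    else:
        # Loop finished without a successful run: record the failure.
        track.enrichment_status = "Failed"

    # Fail-open: the track becomes Ready whether or not enrichment succeeded.
    track.status = "Ready"
    return track
```

The important property is the final assignment: Status = Ready is set on both paths, so a failing enricher degrades the metadata without blocking playback.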

Error scenarios

| Scenario | Behaviour |
|---|---|
| FFmpeg fails (corrupted) | Status = Failed, the message goes to the DLQ for manual investigation. |
| analyze.py crash / timeout | Same as FFmpeg: Status = Failed. Enrichment is not queued. |
| enrich.py crash | Retry 2× (3 attempts total). Once exhausted → fail-open: EnrichmentStatus = Failed, Status = Ready. |
| Very short track (<3 s) | Can trip the TF models (input shape). Handled as fail-open. |
| S3 unavailable | The message returns to the queue; eventually the DLQ if it persists. |

5 Key files

| Role | Path |
|---|---|
| Processing consumer | apps/worker/src/Consumers/ProcessTrackConsumer.cs |
| Enrichment consumer | apps/worker/src/Consumers/EnrichTrackAudioConsumer.cs |
| Analyzer invoker (subprocess) | apps/worker/src/Services/AudioAnalyzerInvoker.cs |
| Enricher invoker (subprocess) | apps/worker/src/Services/AudioEnricherInvoker.cs |
| Upsert repository | apps/worker/src/Services/AudioFeaturesRepository.cs |
| Python sidecar, Phase 1 | apps/audio-analyzer/analyze.py |
| Python sidecar, Phase 2 | apps/audio-enricher/enrich.py |
| pgvector + HNSW index migration | apps/api/src/Migrations/20260513230003_AddAudioEnrichmentSchema.cs |
| Similarity endpoint | apps/api/src/Features/Tracks/Endpoints/GetSimilar/Endpoint.cs |
| Original markdown diagram | docs/track-upload-flow.md |