SignalFlow

architecture v3 · 4-platform ingestion · unified signal bus · dual-model NLP
● LIVE · 4 Platforms · RoBERTa + VADER · 4.1M Signals · v3
01
Ingestion — 4 Platform Sources
asyncpraw · AT Protocol firehose · Algolia API · YouTube Data API v3
asyncio · 194 subreddits · comments
Reddit + Comments
194 subreddits, independent asyncio tasks
Hot-reload config from source_config DB
Top-25 comments per new post (score ≥ 5)
TooManyRequests → backoff + retry
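The backoff-and-retry path can be sketched as a small helper — a sketch only: `TooManyRequests` here is a stand-in for asyncpraw's exception, and the delays are illustrative, not the production values.

```python
import asyncio
import random

class TooManyRequests(Exception):
    """Stand-in for asyncprawcore's TooManyRequests (429)."""

async def fetch_with_backoff(fetch, max_tries=5, base=2.0):
    """Retry a coroutine on TooManyRequests with exponential backoff + jitter."""
    for attempt in range(max_tries):
        try:
            return await fetch()
        except TooManyRequests:
            # 2s, 4s, 8s, ... plus jitter so 194 subreddit tasks
            # don't all retry in lockstep after a startup burst
            await asyncio.sleep(base * 2 ** attempt + random.uniform(0, base))
    raise RuntimeError("still rate-limited after retries")
```

Jitter matters here: without it, every task that hit the limit together retries together and hits it again.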
Bluesky + HackerNews
AT Protocol firehose — push, no polling
Auto-reconnect on disconnect
HN: Algolia API, 5-min poll, 80 stories
YouTube
Channels via source_config (stable @handles)
6-hour poll interval
Comments are primary signal (not video meta)
# All sources produce to the same unified topic via normaliser.py
signal = normalise("reddit", raw_post)
# → {id, platform, title, body, raw_score, extra}
producer.send("signals.normalised", signal)

# Reddit comment ingestion — already in submission object, no extra API calls
for comment in submission.comments[:25]:
    if comment.score < 5:
        continue  # quality filter
    raw = _serialize_comment(comment, submission, name)
    producer.send("signals.normalised", normalise("reddit", raw))
# extra = {is_comment: true, parent_post_id: "...", parent_post_title: "..."}
194
Subreddits
260K+
Signals / Day
4
Platforms
4.1M
Total Signals
Token bucket rate limiter per platform prevents burst overload. Reddit startup burst (all 194 subreddits firing simultaneously) handled by per-subreddit backoff — normalises within seconds. Comments doubled daily volume from 40K to 260K/day — each comment is a separate signal with is_comment: true in the JSONB extra field, enabling per-signal model routing downstream.
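The per-platform limiter can be sketched as a minimal token bucket — rates and names here are illustrative, not the production configuration.

```python
import time

class TokenBucket:
    """Token bucket: refill `rate` tokens/sec up to `capacity`."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def try_acquire(self, n: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False

# One bucket per platform — a startup burst drains the bucket,
# after which requests are spaced out at the refill rate
buckets = {p: TokenBucket(rate=10, capacity=20)
           for p in ("reddit", "bluesky", "hackernews", "youtube")}
```

A burst consumes the stored capacity instantly; sustained traffic is then capped at the refill rate, which is exactly the "normalises within seconds" behaviour described above.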
kafka partition assignment · Snappy compression
02
Kafka — Unified Signal Bus
signals.normalised · Snappy compression · manual offset commits · DLQ
exactly-once · DLQ · lag < 1000
Active Topics
signals.normalised — unified bus (all 4 platforms)
reddit.posts.raw — raw payload archive
reddit.posts.refresh — score/velocity updates
*.dlq — dead letter queue per topic
Reliability
Manual offset commits after DB write → at-least-once delivery; idempotent upserts make it effectively exactly-once
Consumer lag monitored — sustained < 1,000 msgs
OOM crash → restart picks up from last offset
DLQ consumer with HTTP replay for poison pills
Ingestion and processing are fully decoupled. When the processing container was OOM-killed (RoBERTa memory pressure), Kafka buffered all messages. After restart, consumer lag returned to <10 within seconds — zero signal loss. This is the key architectural guarantee: no matter what happens downstream, ingested data is never dropped.
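The DLQ routing decision reduces to a small pure function — an assumed policy for illustration; the real consumer's retry rules may differ.

```python
MAX_RETRIES = 3  # assumed policy: retry transient failures, then dead-letter

def route_failed_message(topic: str, attempts: int, error: Exception) -> str:
    """Return the topic a failed message should be republished to.

    Transient errors are retried on the source topic; after MAX_RETRIES,
    or on a non-retryable error, the message goes to the per-topic DLQ
    (the `*.dlq` convention from the topic list above).
    """
    retryable = isinstance(error, (TimeoutError, ConnectionError))
    if retryable and attempts < MAX_RETRIES:
        return topic            # republish for another attempt
    return f"{topic}.dlq"       # poison pill → dead letter queue
```

Keeping this decision explicit is what makes the later HTTP replay possible: a poison pill sits in `signals.normalised.dlq` with its full payload intact instead of blocking the partition.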
consumer group picks up · micro-batch window opens
03
Processing — Dual-Model NLP Pipeline
RoBERTa (comments) · VADER (posts) · spaCy NER · velocity · trending score
RoBERTa · VADER · spaCy NER · dual-model
Dual-Model Sentiment NEW
is_comment=True → RoBERTa (cardiffnlp)
Trained on 124M tweets — handles sarcasm
is_comment=False → VADER (posts/headlines)
VADER fast, accurate for formal text
Measured: trump posts -0.10 vs comments -0.47
NLP + Scoring Pipeline
spaCy NER + 50-term EntityRuler vocabulary
NOISE_ENTITIES filter (nta, yta, wibta…)
CANONICAL_MAP merges variants → iran
Redis velocity cache → shared across replicas
trending_score = velocity × spread × recency
# Dual-model routing in _get_sentiment() — main_processor.py
is_comment = bool((signal.get("extra") or {}).get("is_comment"))
text = signal.get("body") if is_comment else signal.get("title") or signal.get("body")
_sentiment_cache[sid] = analyze_sentiment(text, is_comment=is_comment)

# analyze_sentiment() routes to the correct model
if is_comment:
    return _analyze_roberta(text)  # lazy-loaded, cached after first call
return _analyze_vader(text)
Split-brain prevention: 3 processing replicas share a Redis velocity cache — all replicas read/write the same baseline, ensuring consistent trending scores across instances. Sentiment cache keyed by signal ID — RoBERTa runs once per comment lifetime (~100ms), never again on refresh cycles. Memory ceiling: 2GB limit (spaCy 200MB + RoBERTa 500MB + overhead).
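The `velocity × spread × recency` formula can be sketched with assumed shapes for each factor — the production weighting, baseline source, and decay half-life may differ.

```python
def trending_score(velocity: float, platforms_active: int, age_minutes: float,
                   half_life_minutes: float = 60.0) -> float:
    """trending_score = velocity × spread × recency (illustrative shapes).

    velocity  — signals/min above the topic's Redis baseline
    spread    — fraction of the 4 platforms where the topic is active
    recency   — exponential decay with an assumed 60-min half-life
    """
    spread = platforms_active / 4
    recency = 0.5 ** (age_minutes / half_life_minutes)
    return velocity * spread * recency
```

Under these shapes a fresh topic active on all 4 platforms scores its full velocity, the same topic an hour later scores half, and a single-platform spike is discounted to a quarter — which is why cross-platform stories outrank subreddit-local ones.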
WAL streams to replica within milliseconds
04 🗄
Storage — TimescaleDB + Indexes + Replication
GIN index on topics · published_at index · WAL replica · ORM read routing
TimescaleDB · GIN index · 47× faster
3+ min
Before Index
120ms
After GIN Index
47×
Improvement
5.4GB
Signals Table
4.1M
Total Rows
Key Indexes ADDED
idx_signals_topics_gin — GIN on topics JSONB
topics @> '["iran"]' → Bitmap Index Scan
idx_signals_published_at — btree DESC
BitmapAnd combines both → 120ms execution
Replication + Routing
WAL streaming to hot standby replica
ORM ReadReplicaRouter — all reads → replica
Writes → primary only
Replication lag typically <50ms local network
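The read routing can be sketched as a minimal Django database router — alias names (`replica`, `default`) are assumed; the actual `ReadReplicaRouter` may carry per-model exceptions.

```python
class ReadReplicaRouter:
    """Django DB router: all ORM reads → replica, all writes → primary.

    Registered via DATABASE_ROUTERS in settings; Django calls these
    hooks for every query to pick a connection alias.
    """

    def db_for_read(self, model, **hints):
        return "replica"   # SELECTs (API, Grafana) hit the hot standby

    def db_for_write(self, model, **hints):
        return "default"   # INSERT/UPDATE/DELETE hit the primary only

    def allow_relation(self, obj1, obj2, **hints):
        # Same data on both sides of WAL streaming — relations are safe
        return True
```

With sub-50ms replication lag, reads that immediately follow a write can still race the replica; the pipeline tolerates this because dashboards and API responses are already cached on the order of minutes.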
Table — Purpose — Key Detail
signals — unified signal store, all 4 platforms — GIN(topics) + btree(published_at) · 4.1M rows · 5.4GB
topic_timeseries — 15-min bucketed topic volume — TimescaleDB hypertable · 90-day retention · sparklines source
platform_divergence — cross-platform sentiment disagreement events — divergence detector · 15-min cycle · min 15 signals/platform
topic_summaries — LLM-generated intelligence per topic — ON CONFLICT (topic, window_minutes) DO UPDATE · always fresh
watched_topics — webhook alert subscriptions — cooldown · threshold · last_fired_at · race-safe UPDATE
alert_deliveries — webhook delivery audit log — HTTP status · latency_ms · append-only
Archive strategy: archive_signals.py exports signals older than 90 days to Parquet (Snappy compressed), confirms download, then deletes. No automated scheduler yet — manual trigger when disk exceeds 70%. Currently at 49% of 75GB with ~70 days runway at 260K signals/day.
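The runway figure comes from simple arithmetic; a naive estimator looks like this — illustrative only, since real growth also includes indexes, WAL, and the other tables.

```python
def runway_days(disk_gb: float, used_fraction: float, trigger_fraction: float,
                bytes_per_signal: float, signals_per_day: float) -> float:
    """Days until disk usage crosses the archive trigger threshold."""
    headroom_gb = disk_gb * (trigger_fraction - used_fraction)
    daily_gb = bytes_per_signal * signals_per_day / 1e9
    return headroom_gb / daily_gb

# ~1.3 KB/signal (5.4 GB / 4.1M rows) at 260K signals/day,
# from 49% used toward the 70% manual-archive trigger on 75 GB
days = runway_days(75, 0.49, 0.70, 5.4e9 / 4.1e6, 260_000)
```

The estimate is deliberately conservative: it measures headroom to the 70% trigger, not to a full disk, so the archive job fires well before space becomes an incident.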
topic aggregator polls every 60s · watermark advances
05
Background Workers — Aggregation + Divergence
topic_aggregator · divergence_detector · both independent containers
60s cycle · 15-min buckets · divergence
Topic Aggregator
Polls signals table every 60s via watermark
Buckets by (topic, platform, 15-min window)
UPSERT into topic_timeseries
Source for sparklines + Grafana traction chart
Divergence Detector NEW
Runs every 15 minutes
Min 15 signals/platform (noise filter)
Threshold 0.3 on [-1,1] sentiment scale
Writes to platform_divergence table
trump: Reddit -0.10 vs YouTube -0.76 → alert
Both workers run as independent containers — failure doesn't affect ingestion or API. The divergence detector was written early but never wired into docker-compose until recently, which is why platform_divergence was empty despite 2.5M signals. Lesson: independently-deployed workers need explicit orchestration verification.
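The detector's core check reduces to a pairwise comparison of per-platform mean sentiment — a sketch using the thresholds above; the real worker reads windows from Postgres rather than taking lists in memory.

```python
from itertools import combinations
from statistics import mean

MIN_SIGNALS = 15   # per-platform noise floor
THRESHOLD = 0.3    # on the [-1, 1] sentiment scale

def detect_divergence(per_platform: dict) -> list:
    """Flag platform pairs whose mean sentiment differs by ≥ THRESHOLD.

    per_platform maps platform → list of per-signal sentiment scores.
    Platforms with fewer than MIN_SIGNALS signals are ignored as noise.
    """
    means = {p: mean(scores) for p, scores in per_platform.items()
             if len(scores) >= MIN_SIGNALS}
    events = []
    for a, b in combinations(sorted(means), 2):
        delta = abs(means[a] - means[b])
        if delta >= THRESHOLD:
            events.append((a, b, round(delta, 2)))
    return events
```

The noise floor is doing real work here: a platform with five angry comments would otherwise produce a spurious divergence event against every other platform.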
LLM summariser fires every 15 min · Groq round-robin
06 🧠
LLM Intelligence — Topic Summariser
every 15 min · Groq/OpenAI/Anthropic · source attribution · outside hot path
source attribution · 15-min cycle
LLM Output Fields
summary — 3-sentence discourse narrative
dominant_narrative — with source attribution
emerging_angle — minority counter-narrative
divergence_explanation — why platforms differ
source_platform — dominant platform driver
Rate Limit Strategy
3 Groq keys, round-robin = 90 req/min
Semaphore(3) — one worker per key
2.5s sleep between calls per worker
62s wait + retry pass for 429'd topics
ON CONFLICT DO UPDATE — always overwrites stale
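The round-robin + retry flow above can be sketched as follows — key values and the `RateLimitError` type are stand-ins, and the pacing numbers are the ones listed above.

```python
import asyncio
from itertools import cycle

class RateLimitError(Exception):
    """Stand-in for the provider's 429 exception."""

API_KEYS = ["groq-key-1", "groq-key-2", "groq-key-3"]  # placeholder values
_key_cycle = cycle(API_KEYS)
_semaphore = asyncio.Semaphore(len(API_KEYS))  # one in-flight worker per key

async def summarise_topic(topic, call_llm, pacing=2.5, retry_wait=62.0):
    """Round-robin a key, pace calls per worker, retry once after a 429."""
    async with _semaphore:
        key = next(_key_cycle)
        try:
            result = await call_llm(topic, api_key=key)
        except RateLimitError:
            await asyncio.sleep(retry_wait)   # wait out the per-minute window
            result = await call_llm(topic, api_key=key)
        await asyncio.sleep(pacing)           # 2.5s spacing between calls
        return result
```

The semaphore width equals the key count, so each key carries at most one in-flight request — the simplest way to stay under per-key limits without a shared counter.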
# Source attribution baked into dominant_narrative at write time
source_pf = parsed.get("source_platform")  # LLM identifies dominant platform
if source_pf in platform_labels and dominant:
    dominant = f"{platform_labels[source_pf]}-dominant · {dominant}"
# Result: "YouTube-dominant · Iranian diaspora clash over Middle East war"

# Runtime fallback in _get_intelligence() — catches pre-fix summaries
if share >= 0.5 and narrative and "·" not in narrative:
    narrative = f"{platform_label}-dominant · {narrative}"
Attribution applied at two layers: write time (LLM explicitly identifies dominant platform, stored permanently) and read time (runtime fallback for summaries written before the feature was added). The "·" not in narrative guard prevents double-attribution on already-attributed strings.
Redis cache layer · Daphne ASGI serves HTTP + WebSocket
07
API Layer — Django + Daphne ASGI
10 endpoints · Redis cache · GIN-indexed queries · webhook push
10 endpoints · webhooks · leaderboard
GET /api/v1/trending/ — velocity-ranked topics · sparklines_7d · 120s cache
GET /api/v1/pulse/?topic= — per-topic sentiment · top 3 signals per platform · LLM intelligence · 300s cache
GET /api/v1/divergence/leaderboard/ — top 10 cross-platform disputes · DISTINCT ON · severity · 60s cache NEW
GET /api/v1/compare/ — divergence events by topic/platform pair
GET/POST/DELETE /api/v1/alerts/watch/ — webhook subscriptions · cooldown · audit log · requires token NEW
GET /api/v1/signals/ — raw signal feed · filterable by platform/topic/trending
GET /api/v1/stats/totals/ — platform signal counts · all-time + 24h · 300s cache
WS /ws/signals/ — real-time push via Django Channels + Redis pub/sub
AUTH POST /api/token/ → {access, refresh} · access expires 60 min · POST /api/token/refresh/ to rotate
GET /health/ — health check · no auth required · no DB query
Webhook Alert System NEW
POST /alerts/watch/ → row in watched_topics
TrendingView fires _dispatch_topic_alerts()
3 gates: trend threshold, platform count, cooldown
Race-safe: UPDATE last_fired_at (no read-modify-write)
All attempts logged in alert_deliveries
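The three gates reduce to a single pure predicate — field names here are assumed from the `watched_topics` columns described above.

```python
from datetime import datetime, timedelta, timezone

def should_fire(watch: dict, trending_score: float, platforms_active: int,
                now: datetime) -> bool:
    """All three gates must pass: trend threshold, platform count, cooldown."""
    if trending_score < watch["threshold"]:
        return False                                  # gate 1: not trending enough
    if platforms_active < watch.get("min_platforms", 2):
        return False                                  # gate 2: too platform-local
    last = watch.get("last_fired_at")
    if last and now - last < timedelta(minutes=watch["cooldown_minutes"]):
        return False                                  # gate 3: still cooling down
    return True
```

The predicate is deliberately read-only; the race-safe part lives in the database, where a conditional `UPDATE ... SET last_fired_at = now` claims the alert atomically instead of a read-modify-write.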
Caching Strategy
trending: 120s · pulse: 300s · stats: 300s
divergence/leaderboard: 60s (freshness matters)
Cache keyed by endpoint + query params
WebSocket bypasses cache entirely — always live
Redis DB1 (REST cache) · DB2 (Channels pub/sub)
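Keying by endpoint + query params can be sketched as below — the key format is an assumption, not the production scheme.

```python
import hashlib
from urllib.parse import urlencode

def cache_key(endpoint: str, params: dict) -> str:
    """Deterministic cache key from endpoint + sorted query params.

    Sorting means ?topic=iran&window=60 and ?window=60&topic=iran
    hit the same Redis entry.
    """
    qs = urlencode(sorted(params.items()))
    digest = hashlib.sha256(qs.encode()).hexdigest()[:16]
    return f"api:{endpoint}:{digest}"
```

Hashing keeps keys bounded regardless of parameter length, at the cost of not being human-readable in `redis-cli` — a reasonable trade for an API cache.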
# WebSocket push after bulk upsert — processing service
redis.publish("asgi:group:posts_feed", signal_payload)

# Django Channels consumer receives and forwards to clients
class SignalConsumer(AsyncWebsocketConsumer):
    async def signal_update(self, event):
        await self.send(json.dumps(event["payload"]))
Daphne serves HTTP and WebSocket on the same ASGI process. All REST endpoints require Authorization: Bearer <access_token> header. Cache TTLs are intentionally conservative on divergence (60s) vs pulse (300s) — divergence data drives alerts, staleness matters more.
Grafana scrapes every 60s · Prometheus scrapes every 15s
08 📊
Observability — Grafana + Prometheus
1-min refresh · Postgres replica + Prometheus · 6 panels · DLQ alerting
1-min refresh · 6 panels · pipeline health
📈 Topic Traction Timeline
source: topic_timeseries · 15-min buckets · line chart per topic · sparklines_7d from TimescaleDB hypertable
🌡 Platform Divergence Heatmap
source: platform_divergence · platform × topic matrix · colour = delta_score · threshold 0.3 on [-1,1] scale
🔥 Trending Topic Cards
source: trending_topics_24h VIEW · stat panels · velocity + trending_score · refreshes every 60s
📊 Top Topics by Platform
source: topic_leaderboard_2h VIEW · bar chart · grouped by platform · updated on each query
⚡ Pipeline Throughput
source: Prometheus · scrapes 3 processing replicas on port 8000 every 15s · messages/sec · batch flush latency P99 · RoBERTa inference time
🚨 DLQ Rate
source: Prometheus · reddit_processor_dlq_messages_total · sustained 0 = healthy · any non-zero rate = data loss in progress · page immediately
Datasources
Postgres replica — topic/signal/divergence panels
ReadReplicaRouter keeps Grafana off primary
Prometheus — pipeline health panels
Scrapes all 3 processing replicas on port 8000
Both provisioned via grafana/provisioning/
View Strategy
trending_topics_24h — regular VIEW, recomputes on query
topic_leaderboard_2h — regular VIEW, recomputes on query
Promote to MATERIALIZED VIEW + refresh job if latency grows at scale
No manual Grafana setup — fully provisioned on container start
Grafana has two datasources provisioned automatically via grafana/provisioning/ — Postgres replica for topic/signal panels and Prometheus for pipeline health panels. No manual setup needed on fresh deploy. DLQ rate is the most critical panel — any sustained non-zero value means messages are being dropped and requires immediate investigation. Consumer lag and RoBERTa inference time are leading indicators of processing backpressure before it manifests as latency.