
Build a Multi-Signal Pipeline with the Rodz API

Peter Cools · 15 min read

TL;DR: A single intent signal is useful. Multiple signals, combined and scored in real time, are transformative. This guide walks you through building a multi-signal pipeline on top of the Rodz API, covering every stage from ingestion through normalization, enrichment, scoring, routing and action. By the end you will have a clear architecture you can implement in any language, with concrete code examples in Python and JavaScript to get you moving fast.

What Is a Multi-Signal Pipeline?

A multi-signal pipeline is an architecture that collects intent signals from several sources, standardizes them into a common format, enriches them with additional context, scores them against your ideal customer profile, routes them to the right team or system, and triggers an action. Instead of reacting to one event at a time, you evaluate accounts based on the full picture: a funding round plus a leadership change plus a job posting plus a spike in web traffic. The combination tells a far more compelling story than any single data point.

Think of it this way. A funding announcement alone might generate hundreds of leads. A funding announcement at a company that also just posted three sales roles, changed its CTO, and started evaluating tools in your category? That is a signal worth dropping everything for.

The Rodz API gives you a single interface to access all of these signal types. The pipeline you build on top of it determines how effectively you act on them.

If you are new to the Rodz API, start with the complete API reference to familiarize yourself with endpoints, authentication and rate limits before continuing here.

Prerequisites

Before building your pipeline, make sure the following are in place:

  1. A Rodz API key with full signal and enrichment permissions. If you have not generated one yet, follow the authentication guide.
  2. At least three active signal types configured in your Rodz dashboard. The pipeline becomes valuable when you combine multiple signals. Financial, HR and competitive signals are a strong starting combination.
  3. Webhooks configured for real-time delivery. Polling works for batch processing, but a true multi-signal pipeline benefits enormously from real-time ingestion. See the webhook setup guide for step-by-step instructions.
  4. A message queue or event bus (Redis Streams, RabbitMQ, Amazon SQS, or similar). This decouples ingestion from processing and prevents you from losing events during traffic spikes.
  5. A datastore for signal history. PostgreSQL, MongoDB or any database that supports time-series queries. You need to look up past signals for the same account when scoring.
  6. Python 3.10+ or Node.js 18+ installed. Code examples below use both languages.

Pipeline Architecture Overview

The pipeline has six stages. Each stage is a discrete step that can run as its own service or as a function within a larger application. Keeping them separate makes the system easier to debug, scale and extend.

[Rodz API / Webhooks]
        |
   1. INGESTION
        |
   2. NORMALIZATION
        |
   3. ENRICHMENT
        |
   4. SCORING
        |
   5. ROUTING
        |
   6. ACTION

Let us walk through each stage in detail.

Stage 1: Ingestion

Ingestion is the entry point. Signals arrive either through webhook delivery (recommended for real-time use cases) or through periodic API polling (suitable for batch workflows). In most production pipelines, you will use both. Webhooks handle the real-time flow. A scheduled polling job catches anything that might have been missed during a webhook outage or retry failure.

Webhook Ingestion

When a signal fires, Rodz sends an HTTP POST to your registered endpoint. Your webhook handler should do three things: validate the HMAC signature, acknowledge receipt immediately, and push the raw payload onto your message queue.

import hmac
import hashlib
import json
from flask import Flask, request, jsonify
from redis import Redis

app = Flask(__name__)
redis_client = Redis(host="localhost", port=6379, db=0)

WEBHOOK_SECRET = "your_webhook_secret"

def verify_signature(payload_body: bytes, signature: str) -> bool:
    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload_body,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}", signature)

@app.route("/webhooks/rodz", methods=["POST"])
def handle_webhook():
    signature = request.headers.get("X-Rodz-Signature", "")
    if not verify_signature(request.data, signature):
        return jsonify({"error": "Invalid signature"}), 401

    # Push raw payload to the queue immediately
    redis_client.xadd("signals:raw", {"payload": request.data})

    return jsonify({"received": True}), 200

The key principle here is speed. Acknowledge the webhook, queue the payload, and return. All processing happens downstream.

Polling Fallback

Run a scheduled job (cron, Celery beat, or a simple setInterval) that fetches recent signals from the API. Use cursor-based pagination and store the last cursor so you only fetch new events.

import json

import requests
from redis import Redis

redis_client = Redis(host="localhost", port=6379, db=0)

API_BASE = "https://api.rodz.io/v1"
API_KEY = "your_api_key"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Content-Type": "application/json"
}

def poll_signals(last_cursor=None):
    params = {"limit": 100}
    if last_cursor:
        params["cursor"] = last_cursor

    response = requests.get(
        f"{API_BASE}/signals",
        headers=HEADERS,
        params=params
    )
    response.raise_for_status()
    data = response.json()

    for signal in data["items"]:
        redis_client.xadd("signals:raw", {
            "payload": json.dumps(signal)
        })

    return data.get("next_cursor")

Run this every five minutes as a safety net. Any signals already delivered via webhook will be deduplicated in the normalization stage.
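Persisting the cursor between runs is what makes the safety net cheap. The sketch below shows the shape of one polling cycle, with an injectable `poll_fn` and a dict-like `cursor_store`; both names are mine, not part of the Rodz API. In production the store would typically be a Redis hash or a database row.

```python
def run_poll_cycle(poll_fn, cursor_store, cursor_key="rodz:poll_cursor"):
    """Run one polling pass, resuming from the last stored cursor.

    poll_fn is a callable shaped like poll_signals above: it takes the
    last cursor (or None) and returns the next cursor (or None when
    there is nothing new). cursor_store is any dict-like store.
    """
    last_cursor = cursor_store.get(cursor_key)
    next_cursor = poll_fn(last_cursor)
    if next_cursor:
        # Persist only on progress, so a failed fetch re-reads the
        # same window instead of silently skipping events
        cursor_store[cursor_key] = next_cursor
    return next_cursor
```

Wire this into your scheduler in place of a bare `poll_signals()` call and the job becomes idempotent across restarts.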

Stage 2: Normalization

Raw signals from different sources and signal types arrive in slightly different shapes. A funding round event has different fields from a job posting event. Normalization transforms every signal into a consistent internal format that downstream stages can rely on.

Defining the Canonical Schema

Your canonical signal object should include at minimum:

{
  "signal_id": "sig_abc123",
  "signal_type": "funding_round",
  "company_id": "comp_xyz789",
  "company_name": "Acme Corp",
  "company_domain": "acme.com",
  "occurred_at": "2026-03-12T14:30:00Z",
  "ingested_at": "2026-03-12T14:30:02Z",
  "raw_data": {},
  "metadata": {
    "source": "webhook",
    "rodz_confidence": 0.95
  }
}

The raw_data field preserves the original payload so you never lose information. The outer fields are the ones your pipeline logic operates on.

The Normalizer

from datetime import datetime, timezone

SIGNAL_TYPE_MAP = {
    "fundraising": "funding_round",
    "job_posting": "job_posting",
    "leadership_change": "leadership_change",
    "merger_acquisition": "merger_acquisition",
    "tender_published": "public_tender",
    "tech_stack_change": "tech_change",
    "content_published": "content_signal",
    "social_mention": "social_signal"
}

def normalize(raw_payload: dict) -> dict:
    signal_type = SIGNAL_TYPE_MAP.get(
        raw_payload.get("type", ""),
        raw_payload.get("type", "unknown")
    )

    company = raw_payload.get("company", {})

    return {
        "signal_id": raw_payload["id"],
        "signal_type": signal_type,
        "company_id": company.get("id"),
        "company_name": company.get("name"),
        "company_domain": company.get("domain"),
        "occurred_at": raw_payload.get("occurred_at"),
        "ingested_at": datetime.now(timezone.utc).isoformat(),
        "raw_data": raw_payload,
        "metadata": {
            "source": raw_payload.get("_source", "api"),
            "rodz_confidence": raw_payload.get("confidence", 1.0)
        }
    }

Deduplication

Signals can arrive twice when you use both webhooks and polling. Deduplicate by signal_id using a per-signal Redis key or a unique constraint in your database.

def is_duplicate(signal_id: str) -> bool:
    # SET with NX returns None when the key already exists; the 7-day
    # TTL (604800 seconds) keeps dedup keys from accumulating forever
    return not redis_client.set(
        f"signals:seen:{signal_id}", 1, nx=True, ex=604800
    )

def process_raw_event(raw_payload: dict) -> dict | None:
    normalized = normalize(raw_payload)
    if is_duplicate(normalized["signal_id"]):
        return None
    return normalized

One key per signal ID is used rather than a single Redis set because members of a set cannot expire individually, and expiring the whole set would drop all dedup state at once. Seven days comfortably covers the window in which webhook and polling deliveries can overlap.

Stage 3: Enrichment

A signal tells you what happened. Enrichment tells you who it happened to and why it matters. At this stage, you augment the normalized signal with firmographic data, contact information, technographic details, and any other context your scoring model needs.

The Rodz API provides enrichment endpoints for both companies and contacts. Use them here to fill in the gaps.

Company Enrichment

def enrich_company(domain: str) -> dict:
    response = requests.get(
        f"{API_BASE}/enrich/company",
        headers=HEADERS,
        params={"domain": domain}
    )
    if response.status_code == 200:
        return response.json()
    return {}

This returns firmographic data such as employee count, industry, revenue range, location, and founding year. For a deeper look at the enrichment capabilities, see the company and contact enrichment guide.

Contact Enrichment

Once you know the company, you often want to identify the right person to reach out to. The contact enrichment endpoint lets you find and verify decision-makers.

def enrich_contacts(company_id: str, role_filter: str = "CTO") -> list:
    response = requests.get(
        f"{API_BASE}/enrich/contacts",
        headers=HEADERS,
        params={
            "company_id": company_id,
            "role": role_filter,
            "verified_email": True
        }
    )
    if response.status_code == 200:
        return response.json().get("contacts", [])
    return []

Combining Enrichment with the Signal

def enrich_signal(signal: dict) -> dict:
    company_data = enrich_company(signal["company_domain"])
    contacts = enrich_contacts(signal["company_id"])

    signal["enrichment"] = {
        "company": company_data,
        "contacts": contacts,
        "enriched_at": datetime.now(timezone.utc).isoformat()
    }
    return signal

A word on rate limits. The Rodz API allows 100 requests per minute. If you are processing a high volume of signals, implement a rate limiter (token bucket or leaky bucket) in front of your enrichment calls. Queue signals that cannot be enriched immediately and process them in the next window. Check the API reference for the latest rate limit details.
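Here is a minimal in-process token bucket sized for that limit. The class and its defaults are illustrative, not part of any Rodz SDK; a deployment with multiple enrichment workers would need a shared limiter (for example, one backed by Redis) instead.

```python
import time

class TokenBucket:
    """Token-bucket limiter sized for the documented 100 requests/minute."""

    def __init__(self, rate_per_minute: int = 100):
        self.capacity = rate_per_minute
        self.tokens = float(rate_per_minute)
        self.refill_per_second = rate_per_minute / 60.0
        self.last_refill = time.monotonic()

    def _refill(self):
        # Add tokens proportional to elapsed time, up to capacity
        now = time.monotonic()
        self.tokens = min(
            self.capacity,
            self.tokens + (now - self.last_refill) * self.refill_per_second,
        )
        self.last_refill = now

    def try_acquire(self) -> bool:
        """Take one token if available; False means 'queue this for later'."""
        self._refill()
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

Call try_acquire() before each enrichment request; when it returns False, push the signal back onto the queue for the next window rather than blocking the worker.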

Stage 4: Scoring

Scoring is where the multi-signal approach pays off. Instead of treating each signal independently, you look at the full history of signals for a given account and compute a composite score.

Designing Your Scoring Model

A practical starting point is a weighted additive model. Assign a base score to each signal type, apply multipliers for recency and signal density, and cap the result.

SIGNAL_WEIGHTS = {
    "funding_round": 30,
    "leadership_change": 20,
    "job_posting": 15,
    "merger_acquisition": 25,
    "public_tender": 20,
    "tech_change": 10,
    "content_signal": 5,
    "social_signal": 5
}

RECENCY_MULTIPLIERS = {
    7: 1.0,     # Last 7 days: full weight
    14: 0.8,    # 8-14 days: 80%
    30: 0.5,    # 15-30 days: 50%
    90: 0.2     # 31-90 days: 20%
}

def get_recency_multiplier(days_ago: int) -> float:
    for threshold, multiplier in sorted(RECENCY_MULTIPLIERS.items()):
        if days_ago <= threshold:
            return multiplier
    return 0.0

def score_account(company_id: str, signals: list[dict]) -> dict:
    total_score = 0
    signal_breakdown = []

    for signal in signals:
        base = SIGNAL_WEIGHTS.get(signal["signal_type"], 5)
        # .replace() keeps parsing compatible with Python 3.10, whose
        # fromisoformat() does not accept a trailing "Z"
        days_ago = (
            datetime.now(timezone.utc)
            - datetime.fromisoformat(
                signal["occurred_at"].replace("Z", "+00:00")
            )
        ).days
        multiplier = get_recency_multiplier(days_ago)
        weighted_score = base * multiplier

        total_score += weighted_score
        signal_breakdown.append({
            "signal_type": signal["signal_type"],
            "base_score": base,
            "recency_multiplier": multiplier,
            "weighted_score": weighted_score
        })

    # Density bonus: multiple signals in a short window
    recent_count = sum(
        1 for s in signals
        if (datetime.now(timezone.utc)
            - datetime.fromisoformat(
                s["occurred_at"].replace("Z", "+00:00")
            )).days <= 14
    )
    if recent_count >= 3:
        density_bonus = recent_count * 5
        total_score += density_bonus

    return {
        "company_id": company_id,
        "total_score": min(total_score, 100),
        "signal_count": len(signals),
        "breakdown": signal_breakdown
    }

This model is intentionally simple. It works well as a starting point. As you collect data on which scored accounts actually convert, you can refine the weights, add ICP-based multipliers (industry match, employee count range, geography), or replace the additive model with a machine learning classifier. For a deep dive into advanced scoring strategies, see the scoring and balance model guide.
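As a concrete sketch of the ICP-based multipliers mentioned above: the rules and multiplier values below are illustrative assumptions to be tuned against your own conversion data, and the field names (industry, employee_count, country) mirror the enrichment fields used elsewhere in this guide.

```python
# Illustrative ICP rules; tune predicates and multipliers against
# your own conversion outcomes.
ICP_RULES = [
    (lambda c: c.get("industry") in {"software", "fintech"}, 1.2),
    (lambda c: 50 <= (c.get("employee_count") or 0) <= 500, 1.15),
    (lambda c: c.get("country") in {"US", "GB", "DE", "FR"}, 1.1),
]

def apply_icp_multipliers(total_score: float, company: dict) -> float:
    """Scale the additive score by each matching ICP rule, capped at 100."""
    for predicate, multiplier in ICP_RULES:
        if predicate(company):
            total_score *= multiplier
    return min(total_score, 100)
```

Call it on the output of score_account, passing the enriched company data, so an in-profile account outranks an out-of-profile account with the same signal history.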

Signal History Lookup

Scoring requires access to previous signals for the same company. Store every normalized signal in your database and query by company_id when a new signal arrives.

from datetime import timedelta

def get_signal_history(company_id: str, days: int = 90) -> list[dict]:
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    # Replace with your actual database query
    return db.signals.find({
        "company_id": company_id,
        "occurred_at": {"$gte": cutoff.isoformat()}
    })

Every time a new signal comes in, fetch the history, append the new signal, and re-score the account.

Stage 5: Routing

Routing determines where a scored signal goes next. Not every signal deserves the same treatment. A score of 80+ might go straight to a senior AE. A score of 40 might enter a nurture sequence. A score below 20 might simply be logged for future reference.

Rule-Based Routing

def route_signal(scored_account: dict, signal: dict) -> dict:
    score = scored_account["total_score"]
    signal_type = signal["signal_type"]

    if score >= 75:
        return {
            "destination": "sales_team",
            "channel": "slack_urgent",
            "priority": "high",
            "assigned_to": "senior_ae"
        }
    elif score >= 40:
        return {
            "destination": "crm",
            "channel": "hubspot_workflow",
            "priority": "medium",
            "assigned_to": "sdr_team"
        }
    elif signal_type in ("funding_round", "merger_acquisition"):
        # Always surface financial signals even at lower scores
        return {
            "destination": "crm",
            "channel": "hubspot_task",
            "priority": "medium",
            "assigned_to": "sdr_team"
        }
    else:
        return {
            "destination": "database",
            "channel": "log_only",
            "priority": "low",
            "assigned_to": None
        }

You can also route based on geography (EMEA signals go to the EMEA team), industry vertical, or deal size potential derived from the enrichment data.
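A geographic overlay can be as simple as a country-to-region lookup applied after route_signal. The mappings below are illustrative assumptions; adjust them to your own territory model.

```python
# Illustrative region-to-team mapping; adjust to your org structure.
REGION_TEAMS = {
    "EMEA": "emea_sales",
    "AMER": "amer_sales",
    "APAC": "apac_sales",
}

COUNTRY_REGIONS = {
    "US": "AMER", "CA": "AMER", "BR": "AMER",
    "GB": "EMEA", "DE": "EMEA", "FR": "EMEA",
    "JP": "APAC", "AU": "APAC", "SG": "APAC",
}

def assign_team(routing: dict, company: dict) -> dict:
    """Overlay a regional team on a routing decision.

    company is the enriched company record; unknown or missing
    countries fall back to AMER here, but a production system should
    pick a deliberate default.
    """
    region = COUNTRY_REGIONS.get(company.get("country", ""), "AMER")
    routing["team"] = REGION_TEAMS[region]
    return routing
```

The same pattern extends to industry verticals or deal-size bands: each overlay reads a field from the enrichment data and adds one key to the routing decision.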

CRM Integration

For signals routed to your CRM, the Rodz platform offers native integrations. If you use HubSpot, the HubSpot sync guide explains how to push signals directly into deals, contacts and company records without writing custom integration code.

Stage 6: Action

The final stage turns a routed signal into something tangible. This is where your pipeline delivers value.

Slack Notification

const axios = require('axios');

async function sendSlackAlert(signal, score, routing) {
  const blocks = [
    {
      type: 'header',
      text: {
        type: 'plain_text',
        text: `New Signal: ${signal.company_name} (Score: ${score.total_score})`,
      },
    },
    {
      type: 'section',
      text: {
        type: 'mrkdwn',
        text: [
          `*Signal:* ${signal.signal_type}`,
          `*Company:* ${signal.company_name} (${signal.company_domain})`,
          `*Priority:* ${routing.priority}`,
          `*Signals in last 90 days:* ${score.signal_count}`,
          `*Score breakdown:*`,
          ...score.breakdown.map((b) => `  - ${b.signal_type}: ${b.weighted_score.toFixed(1)} pts`),
        ].join('\n'),
      },
    },
  ];

  await axios.post(process.env.SLACK_WEBHOOK_URL, { blocks });
}

CRM Record Update

def update_crm(signal: dict, score: dict, enrichment: dict):
    contacts = enrichment.get("contacts", [])
    company = enrichment.get("company", {})

    crm_payload = {
        "company_name": signal["company_name"],
        "domain": signal["company_domain"],
        "signal_score": score["total_score"],
        "latest_signal": signal["signal_type"],
        "signal_count_90d": score["signal_count"],
        "employee_count": company.get("employee_count"),
        "industry": company.get("industry"),
        "contacts": [
            {
                "name": c.get("full_name"),
                "email": c.get("email"),
                "role": c.get("title")
            }
            for c in contacts[:3]  # Top 3 contacts
        ]
    }

    requests.post(
        f"{CRM_API_URL}/companies/upsert",
        headers={"Authorization": f"Bearer {CRM_API_KEY}"},
        json=crm_payload
    )

Email Sequence Enrollment

For medium-priority signals, you may want to enroll the identified contacts into an automated outreach sequence rather than assigning a manual task.

def enroll_in_sequence(contact: dict, signal: dict, sequence_id: str):
    requests.post(
        f"{OUTREACH_API_URL}/sequences/{sequence_id}/enroll",
        headers={"Authorization": f"Bearer {OUTREACH_API_KEY}"},
        json={
            "email": contact["email"],
            "first_name": contact.get("first_name"),
            "company": signal["company_name"],
            "signal_type": signal["signal_type"],
            "custom_fields": {
                "signal_detail": signal["raw_data"].get("summary", "")
            }
        }
    )

Putting It All Together

Here is the complete pipeline orchestrator that ties every stage together.

def process_signal(raw_payload: dict):
    # Stage 1: Already handled (webhook or poll pushed to queue)

    # Stage 2: Normalize
    signal = process_raw_event(raw_payload)
    if signal is None:
        return  # Duplicate

    # Stage 3: Enrich
    signal = enrich_signal(signal)

    # Stage 4: Score
    history = get_signal_history(signal["company_id"])
    all_signals = list(history) + [signal]
    score = score_account(signal["company_id"], all_signals)

    # Stage 5: Route
    routing = route_signal(score, signal)

    # Stage 6: Act
    if routing["channel"] == "slack_urgent":
        # A Python equivalent of the sendSlackAlert example above
        send_slack_alert(signal, score, routing)
        update_crm(signal, score, signal["enrichment"])
    elif routing["channel"] == "hubspot_workflow":
        update_crm(signal, score, signal["enrichment"])
        contacts = signal["enrichment"].get("contacts", [])
        if contacts:
            enroll_in_sequence(contacts[0], signal, "seq_medium_priority")
    else:
        # Log only
        pass

    # Persist the signal for future scoring
    db.signals.insert_one(signal)

In production, each call to process_signal is triggered by a queue consumer reading from the signals:raw Redis stream. This keeps the pipeline resilient. If any stage fails, the message stays in the queue and can be retried.
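A minimal consumer loop might look like the sketch below. It is written against an injected client with Redis-style xreadgroup/xack methods so it can be tested with a stub; in production you would pass a redis.Redis client created with decode_responses=True (otherwise stream keys and fields arrive as bytes). The group and consumer names are arbitrary.

```python
import json

def consume_signals(client, handler, stream="signals:raw",
                    group="pipeline", consumer="worker-1", batch=10):
    """Read one batch from the stream via a consumer group.

    handler is process_signal. Messages are acknowledged only after
    the handler succeeds, so a crash mid-batch leaves the remaining
    messages pending for redelivery. Returns the number processed.
    """
    entries = client.xreadgroup(group, consumer, {stream: ">"}, count=batch)
    processed = 0
    for _stream, messages in entries:
        for message_id, fields in messages:
            try:
                handler(json.loads(fields["payload"]))
            except Exception:
                continue  # leave unacked; retried or dead-lettered later
            client.xack(stream, group, message_id)
            processed += 1
    return processed
```

Run this in a loop (with a blocking read timeout) per worker process; the consumer-group mechanics let you add workers without reprocessing messages.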

Production Considerations

Error Handling and Retries

Wrap each stage in try/except blocks and implement exponential backoff for external API calls (Rodz enrichment, CRM updates, Slack notifications). Use dead-letter queues for messages that fail repeatedly so they do not block the pipeline.
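A small decorator captures the backoff pattern. The retry counts and jitter factor below are illustrative defaults, and sleep is injectable purely so the waits can be stubbed in tests; in production leave it as time.sleep.

```python
import random
import time
from functools import wraps

def with_backoff(max_retries=4, base_delay=0.5, sleep=time.sleep):
    """Retry a flaky call with exponential backoff and jitter.

    After max_retries failures the last exception propagates, at which
    point the queue consumer can move the message to a dead-letter
    queue instead of retrying forever.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries:
                        raise
                    # 0.5s, 1s, 2s, ... plus up to 25% random jitter
                    delay = base_delay * (2 ** attempt)
                    sleep(delay + random.uniform(0, delay * 0.25))
        return wrapper
    return decorator
```

Applied as `@with_backoff()` on enrich_company, update_crm, or the Slack call, it isolates each stage's transient failures from the rest of the pipeline.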

Monitoring

Track these metrics at minimum:

  • Ingestion rate: signals received per minute (webhook vs. poll breakdown).
  • Enrichment hit rate: percentage of signals successfully enriched.
  • Score distribution: histogram of scores to verify your weights are producing meaningful differentiation.
  • Action latency: time from signal occurrence to action (Slack message sent, CRM updated).
  • Error rate per stage: which stages fail most and why.

Scaling

Each stage can scale independently. If enrichment becomes a bottleneck because of rate limits, add more workers with a shared rate limiter. If scoring requires more complex computation, move it to a dedicated service. The queue-based architecture makes this straightforward.

Data Retention

Decide how long you keep raw signal data versus normalized data versus scores. A common approach: raw payloads for 30 days, normalized signals for 12 months, scores indefinitely (they are small and useful for trend analysis).

Frequently Asked Questions

How many signal types should I combine in a pipeline?

Start with three to five signal types that are most relevant to your sales process. A strong default combination is funding rounds, job postings, and leadership changes. These three cover financial capacity, growth intent, and organizational change. Add more signal types as you validate which combinations actually predict conversion for your specific market.

Can I run this pipeline without webhooks, using only polling?

Yes, but you lose real-time responsiveness. A polling-only pipeline works well for batch workflows where you process signals once or twice a day. For time-sensitive use cases like outbound sales, webhooks are strongly recommended. The webhook guide covers setup in detail.

How do I handle rate limits when enriching a large batch of signals?

The Rodz API allows 100 requests per minute. Implement a token bucket rate limiter in your enrichment worker. When the bucket is empty, queue the signal and process it when tokens refill. In Python, the ratelimit or aiolimiter libraries make this easy. For full rate limit details, check the API reference.

What database should I use for signal history?

PostgreSQL is a solid default. It handles time-series queries well, supports JSONB for storing raw payloads, and scales to millions of signals without issues. If you are already running MongoDB, that works too. The important thing is that your database supports efficient queries by company_id and occurred_at, since scoring depends on fast history lookups.

How do I test the pipeline locally before deploying?

Use the Rodz API sandbox environment (available in your dashboard under Settings > API Keys) to generate test signals. Point your webhook endpoint at a local server exposed through ngrok or Cloudflare Tunnel. Run each pipeline stage independently with test data before connecting them through the queue. This lets you validate normalization, enrichment and scoring logic in isolation.

How do I know if my scoring weights are correct?

Track conversion outcomes. When a scored account results in a meeting, an opportunity, or a closed deal, record the score it had at the time the signal fired. After a few weeks, analyze which score ranges correlate with actual conversions. Adjust weights accordingly. The scoring guide covers this feedback loop in more detail.

Can I integrate this pipeline with HubSpot or other CRMs?

Absolutely. The action stage is where CRM integration happens. For HubSpot specifically, Rodz offers a native integration path documented in the HubSpot sync guide. For other CRMs, use their respective APIs in the action stage. The pipeline architecture is CRM-agnostic by design.

What happens if a pipeline stage fails?

The queue-based architecture protects you. If enrichment fails for a signal (API timeout, rate limit hit, network error), the message remains in the queue and gets retried with exponential backoff. If a message fails after a configurable number of retries (typically 3-5), it moves to a dead-letter queue for manual inspection. The pipeline continues processing other signals without interruption.

Next Steps

You now have a complete architecture for a multi-signal pipeline. The code examples above are functional starting points, not toy examples. Adapt them to your stack, plug in your signal types, tune your scoring weights, and start routing high-intent accounts to your sales team faster than your competitors can.

If you have not already, explore the full Rodz API documentation for endpoint details, and check out the related guides linked throughout this article to deepen your understanding of each component.
