sledtools/pika branch #67

pika-git-mirror-1

Decouple mirror sync from poll loop

Target branch: master

Merge Commit: 96c9bf527e28288b0409c876d498e569c92fb0a9

branch: merged tutorial: ready ci: success

Continuous Integration

CI: success

Latest run #86 success

10 passed

head a795bcd605a2d1940472568b111dc2963901b792 · queued 2026-03-25 20:56:18 · 10 lane(s)

queued 7s · ran 2m 17s

  • check-pika-rust · success
  • check-pika-followup · success
  • check-notifications · success
  • check-agent-contracts · success
  • check-rmp · success
  • check-pikachat · success
  • check-pikachat-typescript · success
  • check-apple-host-sanity · success
  • check-pikachat-openclaw-e2e · success
  • check-fixture · success

Summary

This branch decouples the mirror synchronization logic in pika-mirror from its polling loop by introducing an event-driven architecture. Previously, sync operations were tightly coupled to a periodic poll timer. The refactor extracts sync orchestration into a dedicated SyncManager (in sync.rs) that reacts to SyncRequest events from multiple sources—webhook-triggered pushes, periodic polls, and manual triggers—via a Tokio mpsc channel. A new Notifier abstraction (notifier.rs) replaces inline notification logic to handle Slack and other downstream alerts. The git.rs module gains structured error types, richer ref-diffing helpers, and a RepoState snapshot type. The webhook handler is rewritten to simply enqueue SyncRequest messages rather than running sync inline. Configuration is extended with per-repo sync-policy knobs. Metrics are reorganized around the new event model. The result is a cleaner separation of concerns: ingress (webhooks/poll), orchestration (SyncManager), execution (git operations), and observability (metrics/notifier) are each in their own module with well-defined interfaces.

Tutorial Steps

Extend configuration with per-repo sync policy

Intent: Add configuration fields that let operators control how each mirror repository is synced—whether via webhook, poll, or both—and at what interval, so the new event-driven sync manager can respect per-repo policies.

Affected files: pika-mirror/src/config.rs

Evidence
@@ -1,6 +1,7 @@
+use serde::Deserialize;
+use std::collections::HashMap;
@@ -12,8 +13,27 @@ pub struct Config {
+    pub repos: HashMap<String, RepoConfig>,
+    pub default_sync_policy: SyncPolicy,
@@ +20,15 @@
+#[derive(Debug, Clone, Deserialize)]
+pub struct RepoConfig {
+    pub url: String,
+    pub sync_policy: Option<SyncPolicy>,
+    pub poll_interval_secs: Option<u64>,
+}
+
+#[derive(Debug, Clone, Deserialize, PartialEq)]
+#[serde(rename_all = "snake_case")]
+pub enum SyncPolicy {
+    WebhookOnly,
+    PollOnly,
+    Both,
+}

The Config struct is extended with a repos: HashMap<String, RepoConfig> map and a default_sync_policy field. Each RepoConfig carries an optional sync_policy override and an optional poll_interval_secs.

A new SyncPolicy enum (WebhookOnly, PollOnly, Both) is introduced with serde's rename_all = "snake_case". Operators can therefore write the values as webhook_only, poll_only, or both in TOML or YAML config files.

This lets the downstream SyncManager decide at runtime whether a given repo should react to webhook events, poll ticks, or both, without hard-coding the decision.
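The sketch below shows one way a per-repo override could fall back to default_sync_policy. It makes three assumptions:

- The serde derives are dropped so it compiles with std alone.
- Config is trimmed to the two new fields.
- effective_policy, effective_poll_interval_secs, and DEFAULT_POLL_INTERVAL_SECS are hypothetical names, not code from the diff.

```rust
use std::collections::HashMap;

// Mirrors SyncPolicy from config.rs; serde derives omitted so this builds with std only.
#[derive(Debug, Clone, PartialEq)]
pub enum SyncPolicy {
    WebhookOnly,
    PollOnly,
    Both,
}

#[derive(Debug, Clone)]
pub struct RepoConfig {
    pub url: String,
    pub sync_policy: Option<SyncPolicy>,
    pub poll_interval_secs: Option<u64>,
}

// Trimmed to the fields added in this branch; the real Config has more.
pub struct Config {
    pub repos: HashMap<String, RepoConfig>,
    pub default_sync_policy: SyncPolicy,
}

// Hypothetical fallback interval; the diff does not show a default value.
pub const DEFAULT_POLL_INTERVAL_SECS: u64 = 300;

impl Config {
    /// Per-repo override if present, else the global default.
    /// Returns None for repos that are not configured at all.
    pub fn effective_policy(&self, repo: &str) -> Option<SyncPolicy> {
        self.repos.get(repo).map(|r| {
            r.sync_policy
                .clone()
                .unwrap_or_else(|| self.default_sync_policy.clone())
        })
    }

    /// Per-repo poll interval, falling back to the hypothetical default.
    pub fn effective_poll_interval_secs(&self, repo: &str) -> Option<u64> {
        self.repos
            .get(repo)
            .map(|r| r.poll_interval_secs.unwrap_or(DEFAULT_POLL_INTERVAL_SECS))
    }
}
```

Keeping the override as Option<SyncPolicy> means an operator only writes sync_policy for repos that differ from the default.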

Add structured error types and RepoState to git module

Intent: Replace ad-hoc string errors in git operations with a structured `GitError` enum and add a `RepoState` snapshot type that captures the current HEAD and ref list, enabling the sync manager to make diff-based decisions about what changed.

Affected files: pika-mirror/src/git.rs

Evidence
@@ -1,10 +1,25 @@
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum GitError {
+    #[error("fetch failed for {remote}: {source}")]
+    FetchFailed { remote: String, source: std::io::Error },
+    #[error("push mirror failed: {0}")]
+    PushFailed(String),
+    #[error("ref parse error: {0}")]
+    RefParse(String),
+}
@@ +30,20 @@
+#[derive(Debug, Clone)]
+pub struct RepoState {
+    pub head_commit: String,
+    pub refs: Vec<RefInfo>,
+}
+
+#[derive(Debug, Clone)]
+pub struct RefInfo {
+    pub name: String,
+    pub commit: String,
+}
@@ +55,35 @@
+pub fn diff_refs(old: &RepoState, new: &RepoState) -> Vec<RefDelta> {
@@ +100,20 @@
+pub fn snapshot_repo(path: &std::path::Path) -> Result<RepoState, GitError> {

The git.rs module is significantly enriched:

  1. GitError enum — Uses thiserror to provide variants like FetchFailed, PushFailed, and RefParse, replacing anyhow string errors for git-specific operations. This gives callers pattern-matchable errors.

  2. RepoState / RefInfo — A snapshot of a repo's HEAD commit and all refs at a point in time. The sync manager takes snapshots before and after fetch to determine what actually changed.

  3. diff_refs(old, new) -> Vec<RefDelta> — Computes the set of refs that were added, removed, or updated between two snapshots. This drives selective push-mirroring (only push refs that changed) and richer notification messages.

  4. snapshot_repo(path) — Shells out to git for-each-ref and git rev-parse HEAD to build a RepoState, returning GitError on failure.
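snapshot_repo is described as shelling out to git rev-parse HEAD and git for-each-ref. The sketch below shows how that might look with std::process::Command. Three pieces are assumptions, not code from the branch:

- the --format string,
- the parse_for_each_ref helper,
- the SnapshotError type, a stand-in for GitError without thiserror.

```rust
use std::path::Path;
use std::process::Command;

#[derive(Debug, Clone, PartialEq)]
pub struct RefInfo {
    pub name: String,
    pub commit: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct RepoState {
    pub head_commit: String,
    pub refs: Vec<RefInfo>,
}

// Hypothetical stand-in for GitError so the sketch needs no external crates.
#[derive(Debug)]
pub enum SnapshotError {
    Io(std::io::Error),
    GitFailed(String),
    RefParse(String),
}

/// Parses lines of the form "<object id> <refname>", as produced by
/// `git for-each-ref --format='%(objectname) %(refname)'`.
pub fn parse_for_each_ref(output: &str) -> Result<Vec<RefInfo>, SnapshotError> {
    output
        .lines()
        .filter(|line| !line.trim().is_empty())
        .map(|line| -> Result<RefInfo, SnapshotError> {
            let (commit, name) = line
                .split_once(' ')
                .ok_or_else(|| SnapshotError::RefParse(format!("malformed line: {line}")))?;
            // 40 hex chars for SHA-1 repositories, 64 for SHA-256 ones.
            let valid = matches!(commit.len(), 40 | 64)
                && commit.chars().all(|c| c.is_ascii_hexdigit());
            if !valid {
                return Err(SnapshotError::RefParse(format!("bad object id: {commit}")));
            }
            Ok(RefInfo { name: name.to_string(), commit: commit.to_string() })
        })
        .collect()
}

/// Runs `git -C <path> <args...>` and returns stdout, mapping failures to SnapshotError.
fn run_git(path: &Path, args: &[&str]) -> Result<String, SnapshotError> {
    let out = Command::new("git")
        .arg("-C")
        .arg(path)
        .args(args)
        .output()
        .map_err(SnapshotError::Io)?;
    if !out.status.success() {
        return Err(SnapshotError::GitFailed(
            String::from_utf8_lossy(&out.stderr).into_owned(),
        ));
    }
    Ok(String::from_utf8_lossy(&out.stdout).into_owned())
}

pub fn snapshot_repo(path: &Path) -> Result<RepoState, SnapshotError> {
    let head = run_git(path, &["rev-parse", "HEAD"])?;
    // No shell is involved, so the format string needs no extra quoting.
    let refs = run_git(path, &["for-each-ref", "--format=%(objectname) %(refname)"])?;
    Ok(RepoState {
        head_commit: head.trim().to_string(),
        refs: parse_for_each_ref(&refs)?,
    })
}
```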

These primitives let the sync layer reason about what changed rather than blindly pushing everything on every tick.
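The diff does not show RefDelta's definition. It only shows that the notifier reads kind, ref_name, old_commit, and new_commit, and formats kind with {}. Under those assumptions, diff_refs can be sketched as a comparison of two name-to-commit maps. Three details are illustrative choices, not code from the branch:

- the DeltaKind enum,
- the all-zeros object id for the missing side of an add or remove (the convention Git's receive hooks use),
- the name-sorted output.

```rust
use std::collections::BTreeMap;
use std::fmt;

#[derive(Debug, Clone)]
pub struct RefInfo {
    pub name: String,
    pub commit: String,
}

#[derive(Debug, Clone)]
pub struct RepoState {
    pub head_commit: String,
    pub refs: Vec<RefInfo>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeltaKind {
    Added,
    Removed,
    Updated,
}

impl fmt::Display for DeltaKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            DeltaKind::Added => "added",
            DeltaKind::Removed => "removed",
            DeltaKind::Updated => "updated",
        })
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RefDelta {
    pub kind: DeltaKind,
    pub ref_name: String,
    pub old_commit: String,
    pub new_commit: String,
}

/// Placeholder for "no commit" on the missing side of an add or remove.
pub const ZERO_OID: &str = "0000000000000000000000000000000000000000";

// Index a snapshot by ref name; BTreeMap keeps iteration sorted by name.
fn index(state: &RepoState) -> BTreeMap<&str, &str> {
    state.refs.iter().map(|r| (r.name.as_str(), r.commit.as_str())).collect()
}

fn delta(kind: DeltaKind, name: &str, old_commit: &str, new_commit: &str) -> RefDelta {
    RefDelta {
        kind,
        ref_name: name.to_string(),
        old_commit: old_commit.to_string(),
        new_commit: new_commit.to_string(),
    }
}

pub fn diff_refs(old: &RepoState, new: &RepoState) -> Vec<RefDelta> {
    let (old_refs, new_refs) = (index(old), index(new));
    let mut deltas = Vec::new();
    // Added or updated: present in the new snapshot.
    for (&name, &new_commit) in &new_refs {
        match old_refs.get(name) {
            None => deltas.push(delta(DeltaKind::Added, name, ZERO_OID, new_commit)),
            Some(&old_commit) if old_commit != new_commit => {
                deltas.push(delta(DeltaKind::Updated, name, old_commit, new_commit))
            }
            Some(_) => {} // unchanged
        }
    }
    // Removed: present only in the old snapshot.
    for (&name, &old_commit) in &old_refs {
        if !new_refs.contains_key(name) {
            deltas.push(delta(DeltaKind::Removed, name, old_commit, ZERO_OID));
        }
    }
    deltas.sort_by(|a, b| a.ref_name.cmp(&b.ref_name));
    deltas
}
```

An empty result means the fetch changed nothing, which is what lets handle_sync skip the "sync complete" notification.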

Introduce the SyncManager and SyncRequest event model

Intent: Create a centralized sync orchestrator that receives typed SyncRequest events over a channel and dispatches per-repo sync jobs, replacing the previous approach where the poll loop and webhook handler each ran sync logic inline.

Affected files: pika-mirror/src/sync.rs

Evidence
@@ +1,45 @@
+use tokio::sync::mpsc;
+use crate::config::{Config, SyncPolicy};
+use crate::git::{self, RepoState, GitError};
+use crate::metrics::Metrics;
+use crate::notifier::Notifier;
+
+#[derive(Debug, Clone)]
+pub enum SyncTrigger {
+    Webhook { repo: String, delivery_id: String },
+    Poll { repo: String },
+    Manual { repo: String },
+}
+
+#[derive(Debug)]
+pub struct SyncRequest {
+    pub trigger: SyncTrigger,
+}
@@ +50,30 @@
+pub struct SyncManager {
+    rx: mpsc::Receiver<SyncRequest>,
+    config: Config,
+    metrics: Metrics,
+    notifier: Notifier,
+}
+
+impl SyncManager {
+    pub fn new(
+        rx: mpsc::Receiver<SyncRequest>,
+        config: Config,
+        metrics: Metrics,
+        notifier: Notifier,
+    ) -> Self {
@@ +85,60 @@
+    pub async fn run(mut self) {
+        while let Some(req) = self.rx.recv().await {
+            let repo_name = req.trigger.repo_name();
+            if !self.should_handle(&req) {
+                tracing::debug!(repo=%repo_name, trigger=?req.trigger, "skipping per sync policy");
+                continue;
+            }
+            self.handle_sync(req).await;
+        }
+    }
@@ +150,80 @@
+    async fn handle_sync(&self, req: SyncRequest) {
+        let repo_name = req.trigger.repo_name();
+        let _timer = self.metrics.sync_duration(repo_name);
+        
+        let before = match git::snapshot_repo(&self.repo_path(repo_name)) {
+            Ok(s) => s,
+            Err(e) => {
+                tracing::error!(repo=%repo_name, error=%e, "pre-sync snapshot failed");
+                self.metrics.sync_error(repo_name);
+                return;
+            }
+        };
+        
+        if let Err(e) = git::fetch_and_push(&self.repo_path(repo_name)).await {
+            self.metrics.sync_error(repo_name);
+            self.notifier.send_error(repo_name, &e).await;
+            return;
+        }
+        
+        let after = git::snapshot_repo(&self.repo_path(repo_name)).unwrap_or(before.clone());
+        let deltas = git::diff_refs(&before, &after);
+        
+        self.metrics.sync_success(repo_name);
+        if !deltas.is_empty() {
+            self.notifier.send_sync_complete(repo_name, &deltas).await;
+        }
+    }

The sync.rs module is the core of this branch. It introduces:

SyncTrigger / SyncRequest

A SyncTrigger enum distinguishes why a sync is happening: Webhook (includes delivery ID for tracing), Poll, or Manual. SyncRequest wraps a trigger and flows through an mpsc::Receiver.
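Both run() and handle_sync call req.trigger.repo_name(), whose body is not in the excerpt. Every SyncTrigger variant carries a repo field, so a single match with an or-pattern is one plausible implementation. The kind() helper is hypothetical; it returns a label that could feed the metrics trigger dimension.

```rust
#[derive(Debug, Clone)]
pub enum SyncTrigger {
    Webhook { repo: String, delivery_id: String },
    Poll { repo: String },
    Manual { repo: String },
}

#[derive(Debug)]
pub struct SyncRequest {
    pub trigger: SyncTrigger,
}

impl SyncTrigger {
    /// Every variant names a repo, so one or-pattern arm covers them all.
    pub fn repo_name(&self) -> &str {
        match self {
            SyncTrigger::Webhook { repo, .. }
            | SyncTrigger::Poll { repo }
            | SyncTrigger::Manual { repo } => repo.as_str(),
        }
    }

    /// Hypothetical helper: a label for the metrics `trigger` dimension.
    pub fn kind(&self) -> &'static str {
        match self {
            SyncTrigger::Webhook { .. } => "webhook",
            SyncTrigger::Poll { .. } => "poll",
            SyncTrigger::Manual { .. } => "manual",
        }
    }
}
```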

SyncManager

The SyncManager struct owns:

  • An mpsc::Receiver<SyncRequest> — the single ingress point for all sync events
  • Config — to look up per-repo sync policies
  • Metrics — to record durations and outcomes
  • Notifier — to send downstream alerts

Its run() loop is a straightforward while let Some(req) = rx.recv().await that:

  1. Checks should_handle(&req) against the repo's SyncPolicy (e.g., a Poll trigger is rejected if the repo is WebhookOnly).
  2. Calls handle_sync(req) which snapshots the repo before and after git::fetch_and_push, diffs the refs, records metrics, and notifies on changes.

Deduplication and concurrency control

Further down in the diff, the SyncManager maintains a HashSet<String> of in-flight repos. If a sync request arrives for a repo that is already being synced, it is coalesced (logged and dropped). This prevents webhook bursts from spawning redundant git operations.
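The in-flight HashSet<String> is not part of the excerpt, so what follows only sketches the idea. It assumes per-repo jobs may run on separate tasks, which is why the set sits behind Arc<Mutex<..>>. The hypothetical InFlight type hands out an RAII guard, so a repo is released even when a sync returns early on error.

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Hypothetical tracker of repos with a sync in progress.
#[derive(Clone, Default)]
pub struct InFlight {
    repos: Arc<Mutex<HashSet<String>>>,
}

/// Held for the duration of one sync; releases the repo when dropped.
pub struct InFlightGuard {
    repos: Arc<Mutex<HashSet<String>>>,
    repo: String,
}

impl InFlight {
    /// Some(guard) if the repo was idle; None if a sync is already running,
    /// in which case the caller would log and drop the request.
    pub fn try_begin(&self, repo: &str) -> Option<InFlightGuard> {
        // HashSet::insert returns false if the repo was already present.
        let newly_inserted = self.repos.lock().unwrap().insert(repo.to_string());
        newly_inserted.then(|| InFlightGuard {
            repos: Arc::clone(&self.repos),
            repo: repo.to_string(),
        })
    }

    pub fn is_in_flight(&self, repo: &str) -> bool {
        self.repos.lock().unwrap().contains(repo)
    }
}

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        // Avoid panicking inside drop if the mutex was poisoned.
        if let Ok(mut repos) = self.repos.lock() {
            repos.remove(&self.repo);
        }
    }
}
```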

Policy filtering via should_handle

The private should_handle method cross-references the trigger type with the repo's SyncPolicy (falling back to Config::default_sync_policy). This is where the per-repo config from step 1 is enforced.
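The policy check can be sketched as a pure function over (trigger, policy). The text only specifies cases such as rejecting Poll triggers for WebhookOnly repos. This sketch's choice to always allow Manual triggers is an assumption. resolve_policy shows the fallback to the configured default.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum SyncPolicy {
    WebhookOnly,
    PollOnly,
    Both,
}

#[derive(Debug, Clone)]
pub enum SyncTrigger {
    Webhook { repo: String, delivery_id: String },
    Poll { repo: String },
    Manual { repo: String },
}

/// The repo's own policy wins; otherwise fall back to the global default.
pub fn resolve_policy(repo_policy: Option<&SyncPolicy>, default: &SyncPolicy) -> SyncPolicy {
    repo_policy.unwrap_or(default).clone()
}

/// Whether a trigger is permitted under a policy.
/// Assumption: Manual triggers are always allowed (the source does not say).
pub fn should_handle(trigger: &SyncTrigger, policy: &SyncPolicy) -> bool {
    match (trigger, policy) {
        (SyncTrigger::Manual { .. }, _) => true,
        (_, SyncPolicy::Both) => true,
        (SyncTrigger::Webhook { .. }, SyncPolicy::WebhookOnly) => true,
        (SyncTrigger::Poll { .. }, SyncPolicy::PollOnly) => true,
        _ => false,
    }
}
```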

Extract notification logic into a Notifier abstraction

Intent: Move Slack and logging notification code out of the sync path into a dedicated Notifier struct, so the sync manager only calls high-level methods like send_sync_complete and send_error without knowing delivery details.

Affected files: pika-mirror/src/notifier.rs

Evidence
@@ +1,30 @@
+use crate::git::RefDelta;
+
+#[derive(Debug, Clone)]
+pub struct Notifier {
+    slack_webhook_url: Option<String>,
+    client: reqwest::Client,
+}
@@ +35,25 @@
+    pub async fn send_sync_complete(&self, repo: &str, deltas: &[RefDelta]) {
+        let msg = Self::format_sync_message(repo, deltas);
+        tracing::info!(repo=%repo, refs_changed=%deltas.len(), "sync complete");
+        if let Some(url) = &self.slack_webhook_url {
+            self.post_slack(url, &msg).await;
+        }
+    }
@@ +65,20 @@
+    pub async fn send_error(&self, repo: &str, error: &crate::git::GitError) {
+        tracing::error!(repo=%repo, error=%error, "sync error");
+        if let Some(url) = &self.slack_webhook_url {
+            let msg = format!(":x: Mirror sync failed for `{}`: {}", repo, error);
+            self.post_slack(url, &msg).await;
+        }
+    }
@@ +90,30 @@
+    fn format_sync_message(repo: &str, deltas: &[RefDelta]) -> String {
+        let mut lines = vec![format!(":white_check_mark: `{}` synced", repo)];
+        for d in deltas.iter().take(10) {
+            lines.push(format!("  {} `{}` {} -> {}", d.kind, d.ref_name, &d.old_commit[..8], &d.new_commit[..8]));
+        }

The Notifier struct encapsulates all downstream alerting:

  • send_sync_complete(repo, deltas) — Logs at info level and, if a Slack webhook URL is configured, posts a Slack message listing up to 10 changed refs with the first 8 characters of their old and new commit hashes.
  • send_error(repo, error) — Logs at error level and, if a Slack webhook URL is configured, posts a Slack failure message.
  • format_sync_message — Private helper that builds a human-readable summary with emoji status indicators and ref delta details.
  • post_slack — Wraps reqwest::Client::post with error handling and timeout.
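format_sync_message slices commit IDs with &d.old_commit[..8]. That panics if a string is shorter than 8 bytes, which would happen if RefDelta ever used an empty string for the missing side of an added or removed ref.

The sketch below does the same formatting with str::get, which returns None instead of panicking. The RefDelta shape is inferred from the fields the notifier reads. The excerpt is cut off before the function returns, so the trailing "...and N more" line and the newline join are hypothetical.

```rust
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DeltaKind {
    Added,
    Removed,
    Updated,
}

impl fmt::Display for DeltaKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            DeltaKind::Added => "added",
            DeltaKind::Removed => "removed",
            DeltaKind::Updated => "updated",
        })
    }
}

// Shape inferred from the fields the notifier reads.
pub struct RefDelta {
    pub kind: DeltaKind,
    pub ref_name: String,
    pub old_commit: String,
    pub new_commit: String,
}

const MAX_LISTED: usize = 10;

/// First 8 bytes of an object id. `get` returns None (instead of panicking
/// like `&oid[..8]`) when the string is too short or byte 8 is not a char
/// boundary, in which case the whole string is used.
fn short_oid(oid: &str) -> &str {
    oid.get(..8).unwrap_or(oid)
}

pub fn format_sync_message(repo: &str, deltas: &[RefDelta]) -> String {
    let mut lines = vec![format!(":white_check_mark: `{}` synced", repo)];
    for d in deltas.iter().take(MAX_LISTED) {
        lines.push(format!(
            "  {} `{}` {} -> {}",
            d.kind,
            d.ref_name,
            short_oid(&d.old_commit),
            short_oid(&d.new_commit)
        ));
    }
    // Hypothetical: say how many refs were left out of the message.
    if deltas.len() > MAX_LISTED {
        lines.push(format!("  ...and {} more", deltas.len() - MAX_LISTED));
    }
    lines.join("\n")
}
```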

This keeps the SyncManager free of HTTP/formatting concerns and makes it straightforward to add other notification channels (PagerDuty, email, etc.) by extending Notifier rather than modifying sync logic.

Rewrite webhook handler to enqueue SyncRequests

Intent: Simplify the webhook HTTP handler so it only validates the incoming payload and sends a SyncRequest into the channel, rather than performing synchronization inline. This eliminates blocking git operations inside HTTP request handlers.

Affected files: pika-mirror/src/webhook.rs

Evidence
@@ -1,15 +1,20 @@
+use axum::{extract::State, Json};
+use tokio::sync::mpsc;
+use crate::sync::{SyncRequest, SyncTrigger};
@@ -20,40 +25,20 @@
+pub struct WebhookState {
+    pub tx: mpsc::Sender<SyncRequest>,
+}
+
+pub async fn handle_push(
+    State(state): State<WebhookState>,
+    Json(payload): Json<PushPayload>,
+) -> axum::http::StatusCode {
+    let repo = payload.repository.full_name.clone();
+    let delivery_id = payload.delivery_id.clone().unwrap_or_default();
+    
+    let req = SyncRequest {
+        trigger: SyncTrigger::Webhook { repo, delivery_id },
+    };
+    
+    match state.tx.send(req).await {
+        Ok(_) => axum::http::StatusCode::ACCEPTED,
+        Err(_) => axum::http::StatusCode::SERVICE_UNAVAILABLE,
+    }
+}

The webhook handler undergoes a significant simplification:

Before: The handle_push function called git fetch/push directly, held locks, and could block the HTTP thread pool during long sync operations.

After: The handler:

  1. Extracts the repo name and delivery ID from the push payload.
  2. Constructs a SyncRequest with a SyncTrigger::Webhook variant.
  3. Sends it into the mpsc::Sender<SyncRequest> held in WebhookState.
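  4. Returns 202 Accepted once the request is enqueued, or 503 Service Unavailable if the channel is closed (the SyncManager's receiver has been dropped). Tokio's Sender::send waits for free capacity, so a full channel makes the handler wait rather than return 503.

The actual sync work happens asynchronously in the SyncManager's run loop. Webhook responses therefore no longer wait on git, so their latency is independent of repo size as long as the channel has spare capacity. The HTTP server can also absorb bursts of webhook deliveries without spawning concurrent git processes.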
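If failing fast is preferred over waiting on a full queue, Tokio's Sender::try_send returns a TrySendError immediately, with Full and Closed variants. The sketch below shows that status mapping with std's bounded sync_channel, whose try_send behaves analogously (Full and Disconnected), so it runs without Tokio. enqueue_webhook, the trimmed SyncRequest, and the plain u16 status codes are illustrative stand-ins for the axum handler.

```rust
use std::sync::mpsc::{SyncSender, TrySendError};

// Trimmed-down request; the real one wraps a SyncTrigger::Webhook.
#[derive(Debug)]
pub struct SyncRequest {
    pub repo: String,
    pub delivery_id: String,
}

pub const ACCEPTED: u16 = 202;
pub const SERVICE_UNAVAILABLE: u16 = 503;

/// Hypothetical fail-fast enqueue: never waits for channel capacity.
pub fn enqueue_webhook(tx: &SyncSender<SyncRequest>, repo: &str, delivery_id: Option<&str>) -> u16 {
    let req = SyncRequest {
        repo: repo.to_string(),
        delivery_id: delivery_id.unwrap_or_default().to_string(),
    };
    match tx.try_send(req) {
        Ok(()) => ACCEPTED,
        // Queue full: shed load now instead of holding the HTTP request open.
        Err(TrySendError::Full(_)) => SERVICE_UNAVAILABLE,
        // Receiver gone: the sync manager has stopped.
        Err(TrySendError::Disconnected(_)) => SERVICE_UNAVAILABLE,
    }
}
```

Returning 503 on a full queue relies on the webhook sender retrying, or on the next poll tick covering the missed delivery.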

Reorganize metrics around the event-driven model

Intent: Update the Metrics struct to track sync operations by trigger type and outcome rather than just poll-cycle counters, aligning observability with the new SyncManager architecture.

Affected files: pika-mirror/src/metrics.rs

Evidence
@@ -1,8 +1,15 @@
+use prometheus::{IntCounterVec, HistogramVec, Registry, Opts, HistogramOpts};
@@ +20,30 @@
+pub struct Metrics {
+    sync_total: IntCounterVec,
+    sync_errors: IntCounterVec,
+    sync_duration_seconds: HistogramVec,
+}
+
+impl Metrics {
+    pub fn new(registry: &Registry) -> Self {
+        let sync_total = IntCounterVec::new(
+            Opts::new("pika_mirror_sync_total", "Total sync operations"),
+            &["repo", "trigger"],
+        ).unwrap();
@@ +55,15 @@
+    pub fn sync_duration(&self, repo: &str) -> HistogramTimer {
+        self.sync_duration_seconds
+            .with_label_values(&[repo])
+            .start_timer()
+    }
+
+    pub fn sync_success(&self, repo: &str) {
+        self.sync_total.with_label_values(&[repo, "success"]).inc();
+    }
+
+    pub fn sync_error(&self, repo: &str) {
+        self.sync_errors.with_label_values(&[repo]).inc();
+    }

The metrics module is rebuilt with Prometheus vector types labeled by repo and, for the sync counter, by trigger:

  • pika_mirror_sync_total (IntCounterVec) — Declared with repo and trigger labels (webhook/poll/manual), replacing the old flat counter. In the excerpt shown, however, it is only incremented by sync_success, which passes the literal "success" as the trigger label. As written, it therefore counts successful syncs rather than every attempt by trigger source.
  • pika_mirror_sync_errors (IntCounterVec) — Counts failures per repo.
  • pika_mirror_sync_duration_seconds (HistogramVec) — Records sync wall-clock time per repo using Prometheus histogram buckets.

The sync_duration method returns a HistogramTimer that automatically observes elapsed time when dropped—used by SyncManager::handle_sync via a let _timer = self.metrics.sync_duration(repo) guard pattern.
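The guard pattern relies on the timer recording its observation when dropped. This std-only sketch mimics that behavior with a hypothetical DurationRecorder in place of prometheus's HistogramVec. It also shows why the binding is named _timer: a let _ = ... binding drops the guard immediately and records a near-zero duration.

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

/// Hypothetical stand-in for a HistogramVec: stores (repo, elapsed) samples.
#[derive(Clone, Default)]
pub struct DurationRecorder {
    samples: Arc<Mutex<Vec<(String, Duration)>>>,
}

/// Guard returned by `start`; records the elapsed time when dropped,
/// including on early returns.
pub struct Timer {
    recorder: DurationRecorder,
    repo: String,
    start: Instant,
}

impl DurationRecorder {
    pub fn start(&self, repo: &str) -> Timer {
        Timer { recorder: self.clone(), repo: repo.to_string(), start: Instant::now() }
    }

    pub fn count(&self) -> usize {
        self.samples.lock().unwrap().len()
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        let elapsed = self.start.elapsed();
        if let Ok(mut samples) = self.recorder.samples.lock() {
            samples.push((std::mem::take(&mut self.repo), elapsed));
        }
    }
}
```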

This lets operators build Grafana dashboards that break down sync latency and error rates by repository.

Wire everything together in main.rs and update lib.rs exports

Intent: Update the application entry point to construct the mpsc channel, spawn the SyncManager task, pass the sender to both the webhook server and the poll loop, and update module exports in lib.rs.

Affected files: pika-mirror/src/main.rs, pika-mirror/src/lib.rs

Evidence
@@ -5,10 +5,15 @@
+mod notifier;
 mod sync;
+
+use sync::{SyncManager, SyncRequest, SyncTrigger};
+use notifier::Notifier;
@@ -20,15 +25,30 @@
+    let (tx, rx) = tokio::sync::mpsc::channel::<SyncRequest>(256);
+
+    let notifier = Notifier::new(config.slack_webhook_url.clone());
+    let metrics = Metrics::new(&prometheus_registry);
+
+    let sync_manager = SyncManager::new(rx, config.clone(), metrics.clone(), notifier);
+    tokio::spawn(async move { sync_manager.run().await });
+
+    let webhook_state = webhook::WebhookState { tx: tx.clone() };
+    // ... axum router setup with webhook_state ...
@@ -40,20 +55,25 @@
+    // Poll loop sends SyncRequests instead of calling sync directly
+    let poll_tx = tx.clone();
+    tokio::spawn(async move {
+        let mut interval = tokio::time::interval(poll_interval);
+        loop {
+            interval.tick().await;
+            for repo_name in repo_names.iter() {
+                let _ = poll_tx.send(SyncRequest {
+                    trigger: SyncTrigger::Poll { repo: repo_name.clone() },
+                }).await;
+            }
+        }
+    });
@@ lib.rs +1,6 @@
+pub mod config;
+pub mod git;
+pub mod metrics;
+pub mod notifier;
+pub mod sync;
+pub mod webhook;

The main.rs entry point orchestrates the new architecture:

  1. Channel creation — tokio::sync::mpsc::channel::<SyncRequest>(256) creates a bounded channel with a 256-slot buffer. Once the buffer is full, send().await waits for free capacity, which applies backpressure to producers if sync can't keep up with events.

  2. SyncManager spawn — The SyncManager is constructed with the receiver half and spawned as a dedicated Tokio task via tokio::spawn.

  3. Webhook wiring — The sender half (tx) is cloned into WebhookState and passed to the Axum router, so incoming webhook POST requests enqueue SyncRequests.

  4. Poll loop refactor — The periodic poll loop no longer calls git::fetch_and_push directly. Instead, it sends SyncRequest { trigger: SyncTrigger::Poll { repo } } for each configured repo on every tick. The SyncManager handles the rest, including policy filtering (skipping poll triggers for WebhookOnly repos).

  5. lib.rs exports — All modules (config, git, metrics, notifier, sync, webhook) are declared as pub mod in lib.rs, making them available to integration tests.

The result is a single-writer architecture where all sync decisions flow through one SyncManager task, eliminating race conditions between webhook-triggered and poll-triggered syncs that could previously cause concurrent git operations on the same repo.
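The sketch below shows the same wiring end to end, swapping Tokio for std so it runs without dependencies. std::sync::mpsc::sync_channel(256) stands in for tokio::sync::mpsc::channel(256), and std::thread::spawn for tokio::spawn. A single consumer drains the channel and finishes each request before taking the next, so two syncs of the same repo cannot overlap. The type and function names, the two-variant trigger, and the string log standing in for fetch/push are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::Receiver;
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SyncPolicy {
    WebhookOnly,
    PollOnly,
    Both,
}

// Simplified trigger: just the source and the repo name.
#[derive(Debug, Clone)]
pub enum SyncTrigger {
    Webhook(String),
    Poll(String),
}

impl SyncTrigger {
    fn repo(&self) -> &str {
        match self {
            SyncTrigger::Webhook(r) | SyncTrigger::Poll(r) => r.as_str(),
        }
    }
    fn label(&self) -> &'static str {
        match self {
            SyncTrigger::Webhook(_) => "webhook",
            SyncTrigger::Poll(_) => "poll",
        }
    }
}

fn allowed(trigger: &SyncTrigger, policy: SyncPolicy) -> bool {
    matches!(
        (trigger, policy),
        (_, SyncPolicy::Both)
            | (SyncTrigger::Webhook(_), SyncPolicy::WebhookOnly)
            | (SyncTrigger::Poll(_), SyncPolicy::PollOnly)
    )
}

/// The single consumer. The loop ends once every sender has been dropped.
/// Returns a log of handled requests in place of real fetch/push work.
pub fn run_manager(rx: Receiver<SyncTrigger>, policies: HashMap<String, SyncPolicy>) -> Vec<String> {
    let mut handled = Vec::new();
    for trigger in rx {
        match policies.get(trigger.repo()) {
            Some(&policy) if allowed(&trigger, policy) => {
                handled.push(format!("{}:{}", trigger.label(), trigger.repo()));
            }
            // Unknown repo or disallowed by policy: skip.
            _ => {}
        }
    }
    handled
}

/// Analogue of `tokio::spawn(async move { sync_manager.run().await })`.
pub fn spawn_manager(
    rx: Receiver<SyncTrigger>,
    policies: HashMap<String, SyncPolicy>,
) -> thread::JoinHandle<Vec<String>> {
    thread::spawn(move || run_manager(rx, policies))
}
```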

Diff