Evidence
@@ +1,45 @@
+use tokio::sync::mpsc;
+use crate::config::{Config, SyncPolicy};
+use crate::git::{self, RepoState, GitError};
+use crate::metrics::Metrics;
+use crate::notifier::Notifier;
+
+/// The reason a sync was requested for a repository.
+///
+/// Every variant carries the name of the repository to sync in `repo`.
+#[derive(Debug, Clone)]
+pub enum SyncTrigger {
+ /// Sync requested by a webhook; `delivery_id` identifies the webhook delivery.
+ Webhook { repo: String, delivery_id: String },
+ /// Sync requested by polling.
+ Poll { repo: String },
+ /// Sync requested manually.
+ Manual { repo: String },
+}
+
+/// A single sync request, as received by `SyncManager` over its mpsc channel.
+#[derive(Debug)]
+pub struct SyncRequest {
+ /// What caused this request (and, through it, which repository to sync).
+ pub trigger: SyncTrigger,
+}
@@ +50,30 @@
+/// Receives `SyncRequest`s from a channel and runs the corresponding repository syncs.
+pub struct SyncManager {
+ // Receiving half of the channel that delivers sync requests.
+ rx: mpsc::Receiver<SyncRequest>,
+ // Application configuration.
+ // NOTE(review): presumably the source of per-repo sync policies — confirm in `should_handle`.
+ config: Config,
+ // Records sync durations, errors, and successes.
+ metrics: Metrics,
+ // Sends error and sync-complete notifications.
+ notifier: Notifier,
+}
+
+impl SyncManager {
+ pub fn new(
+ rx: mpsc::Receiver<SyncRequest>,
+ config: Config,
+ metrics: Metrics,
+ notifier: Notifier,
+ ) -> Self {
@@ +85,60 @@
+ /// Consumes the manager and processes incoming requests until `recv()` returns `None`.
+ ///
+ /// Each request is first checked with `should_handle`; requests that fail the check are
+ /// logged at debug level and dropped, all others are passed to `handle_sync` and awaited
+ /// before the next request is received.
+ pub async fn run(mut self) {
+     loop {
+         let request = match self.rx.recv().await {
+             Some(request) => request,
+             None => break,
+         };
+         let repo = request.trigger.repo_name();
+         if self.should_handle(&request) {
+             self.handle_sync(request).await;
+         } else {
+             tracing::debug!(repo=%repo, trigger=?request.trigger, "skipping per sync policy");
+         }
+     }
+ }
@@ +150,80 @@
+ /// Runs one sync for the repository named by `req.trigger`.
+ ///
+ /// Steps:
+ /// 1. Snapshot the repo; on failure, log, record a sync error, and return.
+ /// 2. Call `git::fetch_and_push`; on failure, record a sync error, notify via
+ ///    `Notifier::send_error`, and return.
+ /// 3. Snapshot again and diff against the first snapshot. If this second snapshot fails,
+ ///    the failure is logged and the pre-sync snapshot is used in its place (best effort).
+ /// 4. Record a sync success and, if the diff is non-empty, notify via
+ ///    `Notifier::send_sync_complete`.
+ async fn handle_sync(&self, req: SyncRequest) {
+     let repo_name = req.trigger.repo_name();
+     // Held for the rest of the function.
+     // NOTE(review): presumably a guard that records the duration on drop — confirm.
+     let _timer = self.metrics.sync_duration(repo_name);
+
+     let before = match git::snapshot_repo(&self.repo_path(repo_name)) {
+         Ok(s) => s,
+         Err(e) => {
+             tracing::error!(repo=%repo_name, error=%e, "pre-sync snapshot failed");
+             self.metrics.sync_error(repo_name);
+             return;
+         }
+     };
+
+     if let Err(e) = git::fetch_and_push(&self.repo_path(repo_name)).await {
+         self.metrics.sync_error(repo_name);
+         self.notifier.send_error(repo_name, &e).await;
+         return;
+     }
+
+     // The fetch/push already succeeded, so a failed post-sync snapshot does not fail the
+     // sync. Surface the error instead of discarding it, and clone `before` only on this
+     // fallback path rather than on every call.
+     let after = match git::snapshot_repo(&self.repo_path(repo_name)) {
+         Ok(s) => s,
+         Err(e) => {
+             tracing::warn!(
+                 repo=%repo_name,
+                 error=%e,
+                 "post-sync snapshot failed; falling back to pre-sync snapshot"
+             );
+             before.clone()
+         }
+     };
+     let deltas = git::diff_refs(&before, &after);
+
+     self.metrics.sync_success(repo_name);
+     if !deltas.is_empty() {
+         self.notifier.send_sync_complete(repo_name, &deltas).await;
+     }
+ }
The sync.rs module is the core of this branch. It introduces:
SyncTrigger / SyncRequest
A SyncTrigger enum distinguishes why a sync is happening: Webhook (includes delivery ID for tracing), Poll, or Manual. SyncRequest wraps a trigger and flows through an mpsc::Receiver.
SyncManager
The SyncManager struct owns:
- An mpsc::Receiver<SyncRequest> — the single ingress point for all sync events
- A Config — to look up per-repo sync policies
- A Metrics — to record durations and outcomes
- A Notifier — to send downstream alerts
Its run() loop is a straightforward while let Some(req) = self.rx.recv().await that, for each request:
- Checks should_handle(&req) against the repo's SyncPolicy (e.g., a Poll trigger is rejected if the repo is WebhookOnly). A rejected request is logged at debug level and skipped.
- Calls handle_sync(req), which snapshots the repo before and after git::fetch_and_push, diffs the refs, records metrics, and notifies on changes (or sends an error notification if the fetch/push fails).
Deduplication and concurrency control
Further down in the diff, the SyncManager maintains a HashSet<String> of in-flight repos. If a sync request arrives for a repo that is already being synced, it is coalesced (logged and dropped). This prevents webhook bursts from spawning redundant git operations.
Policy filtering via should_handle
The private should_handle method cross-references the trigger type with the repo's SyncPolicy (falling back to Config::default_sync_policy). This is where the per-repo config from step 1 is enforced.