Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import PackageDescription

let tag = "v0.7.0-rc.27"
let checksum = "c18449b57c5da535b56d1575505c26d68ecca4440c41db9edc23522747742034"
let checksum = "3b26fa722dbabc615ff61360c67974d9b07f8789fafc2871315557356b5439db"
let url = "https://github.com/synonymdev/ldk-node/releases/download/\(tag)/LDKNodeFFI.xcframework.zip"

let package = Package(
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
19 changes: 12 additions & 7 deletions crates/bdk-wallet-aggregate/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,20 @@ where

// ─── Sync ───────────────────────────────────────────────────────────

/// Build sync requests for a specific wallet.
#[allow(clippy::type_complexity)]
pub fn wallet_sync_request(
/// Build a full scan request for a specific wallet.
pub fn wallet_full_scan_request(
&self, key: &K,
) -> Result<(FullScanRequest<KeychainKind>, SyncRequest<(KeychainKind, u32)>), Error> {
) -> Result<FullScanRequest<KeychainKind>, Error> {
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
let full_scan = wallet.start_full_scan().build();
let incremental_sync = wallet.start_sync_with_revealed_spks().build();
Ok((full_scan, incremental_sync))
Ok(wallet.start_full_scan().build())
}

/// Build an incremental sync request for a specific wallet.
pub fn wallet_incremental_sync_request(
&self, key: &K,
) -> Result<SyncRequest<(KeychainKind, u32)>, Error> {
let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?;
Ok(wallet.start_sync_with_revealed_spks().build())
}

/// Apply a chain update to the primary wallet.
Expand Down
210 changes: 130 additions & 80 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,113 +141,163 @@ impl ElectrumChainSource {
);
return Err(Error::FeerateEstimationUpdateFailed);
};
// If this is our first sync, do a full scan with the configured gap limit.
// Otherwise just do an incremental sync.
let incremental_sync =
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();

let apply_wallet_update =
|update_res: Result<BdkUpdate, Error>, now: Instant| match update_res {
Ok(update) => match onchain_wallet.apply_update(update) {
Ok(wallet_events) => {
log_info!(
self.logger,
"{} of on-chain wallet finished in {}ms.",
if incremental_sync { "Incremental sync" } else { "Sync" },
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
{
let mut locked_node_metrics = self.node_metrics.write().unwrap();
locked_node_metrics.latest_onchain_wallet_sync_timestamp =
unix_time_secs_opt;
write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
)?;
}
Ok(wallet_events)
},
Err(e) => Err(e),
},
Err(e) => Err(e),
};

let cached_txs = onchain_wallet.get_cached_txs();

let primary_result = if incremental_sync {
let incremental_sync_request = onchain_wallet.get_incremental_sync_request();
let incremental_sync_fut = electrum_client
.get_incremental_sync_wallet_update(incremental_sync_request, cached_txs.clone());

let now = Instant::now();
let update_res = incremental_sync_fut.await.map(|u| u.into());
apply_wallet_update(update_res, now)
} else {
let full_scan_request = onchain_wallet.get_full_scan_request();
let full_scan_fut =
electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs.clone());
let now = Instant::now();
let update_res = full_scan_fut.await.map(|u| u.into());
apply_wallet_update(update_res, now)
};

let (mut all_events, primary_error) = match primary_result {
Ok(events) => (events, None),
Err(e) => (Vec::new(), Some(e)),
};
let primary_incremental =
self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some();

let additional_types =
self.address_type_runtime_config.read().unwrap().additional_address_types();
let sync_requests = super::collect_additional_sync_requests(
let additional_sync_requests = super::collect_additional_sync_requests(
&additional_types,
&onchain_wallet,
&self.node_metrics,
&self.logger,
);

let mut join_set = tokio::task::JoinSet::new();
for (address_type, full_scan_req, incremental_req, do_incremental) in sync_requests {
let primary_request: super::WalletSyncRequest = if primary_incremental {
super::WalletSyncRequest::Incremental(onchain_wallet.get_incremental_sync_request())
} else {
super::WalletSyncRequest::FullScan(onchain_wallet.get_full_scan_request())
};

// Collect cached transactions once and share via Arc to avoid cloning
// the entire Vec for each spawned task.
let cached_txs = Arc::new(onchain_wallet.get_cached_txs());

// Primary wallet is identified by address_type = None in the JoinSet results.
let now = Instant::now();
let mut join_set: tokio::task::JoinSet<(
Option<crate::config::AddressType>,
Result<BdkUpdate, Error>,
)> = tokio::task::JoinSet::new();

{
let client = Arc::clone(&electrum_client);
let txs = cached_txs.clone();
join_set.spawn(async move {
let result: Result<BdkUpdate, Error> = if do_incremental {
client
.get_incremental_sync_wallet_update(incremental_req, txs)
.await
.map(|u| u.into())
} else {
client.get_full_scan_wallet_update(full_scan_req, txs).await.map(|u| u.into())
};
(address_type, result)
});
let txs = Arc::clone(&cached_txs);
match primary_request {
super::WalletSyncRequest::Incremental(req) => {
join_set.spawn(async move {
let result: Result<BdkUpdate, Error> = client
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
.await
.map(|u| u.into());
(None, result)
});
},
super::WalletSyncRequest::FullScan(req) => {
join_set.spawn(async move {
let result: Result<BdkUpdate, Error> = client
.get_full_scan_wallet_update(req, txs.iter().cloned())
.await
.map(|u| u.into());
(None, result)
});
},
}
}

let mut sync_results = Vec::new();
for (address_type, sync_req) in additional_sync_requests {
let client = Arc::clone(&electrum_client);
let txs = Arc::clone(&cached_txs);
match sync_req {
super::WalletSyncRequest::Incremental(req) => {
join_set.spawn(async move {
let result: Result<BdkUpdate, Error> = client
.get_incremental_sync_wallet_update(req, txs.iter().cloned())
.await
.map(|u| u.into());
(Some(address_type), result)
});
},
super::WalletSyncRequest::FullScan(req) => {
join_set.spawn(async move {
let result: Result<BdkUpdate, Error> = client
.get_full_scan_wallet_update(req, txs.iter().cloned())
.await
.map(|u| u.into());
(Some(address_type), result)
});
},
}
}

let mut primary_update: Option<BdkUpdate> = None;
let mut primary_error: Option<Error> = None;
let mut additional_results = Vec::new();

while let Some(join_result) = join_set.join_next().await {
match join_result {
Ok((address_type, Ok(update))) => {
sync_results.push((address_type, Some(update)));
Ok((None, Ok(update))) => {
primary_update = Some(update);
},
Ok((None, Err(e))) => {
primary_error = Some(e);
},
Ok((Some(address_type), Ok(update))) => {
additional_results.push((address_type, Some(update)));
},
Ok((address_type, Err(e))) => {
Ok((Some(address_type), Err(e))) => {
log_warn!(self.logger, "Failed to sync wallet {:?}: {}", address_type, e);
sync_results.push((address_type, None));
additional_results.push((address_type, None));
},
Err(e) => {
log_warn!(self.logger, "Wallet sync task panicked: {}", e);
},
};
}

all_events.extend(super::apply_additional_sync_results(
sync_results,
let mut all_events = Vec::new();

if primary_update.is_none() && primary_error.is_none() {
log_error!(self.logger, "Primary wallet sync task failed unexpectedly");
primary_error = Some(Error::WalletOperationFailed);
}

if let Some(update) = primary_update {
match onchain_wallet.apply_update(update) {
Ok(wallet_events) => {
log_info!(
self.logger,
"{} of primary on-chain wallet finished in {}ms.",
if primary_incremental { "Incremental sync" } else { "Full sync" },
now.elapsed().as_millis()
);
let unix_time_secs_opt =
SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs());
self.node_metrics.write().unwrap().latest_onchain_wallet_sync_timestamp =
unix_time_secs_opt;
all_events.extend(wallet_events);
},
Err(e) => {
primary_error = Some(e);
},
}
}

let (additional_events, any_additional_applied) = super::apply_additional_sync_results(
additional_results,
&onchain_wallet,
&self.node_metrics,
&self.kv_store,
&self.logger,
));
);
all_events.extend(additional_events);

let any_updates_applied = primary_error.is_none() || any_additional_applied;

if any_updates_applied {
if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() {
log_error!(self.logger, "Failed to update payment store after wallet syncs: {}", e);
}

let locked_node_metrics = self.node_metrics.read().unwrap();
if let Err(e) = write_node_metrics(
&*locked_node_metrics,
Arc::clone(&self.kv_store),
Arc::clone(&self.logger),
) {
log_error!(self.logger, "Failed to persist node metrics: {}", e);
}
}

if let Some(e) = primary_error {
return Err(e);
Expand Down
Loading
Loading