diff --git a/Package.swift b/Package.swift index 8af5d2404..76853492d 100644 --- a/Package.swift +++ b/Package.swift @@ -4,7 +4,7 @@ import PackageDescription let tag = "v0.7.0-rc.27" -let checksum = "c18449b57c5da535b56d1575505c26d68ecca4440c41db9edc23522747742034" +let checksum = "3b26fa722dbabc615ff61360c67974d9b07f8789fafc2871315557356b5439db" let url = "https://github.com/synonymdev/ldk-node/releases/download/\(tag)/LDKNodeFFI.xcframework.zip" let package = Package( diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so index c9e9c0279..ef81f0a6e 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/arm64-v8a/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so index 48bb1822f..422c96e7c 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/armeabi-v7a/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so index bbc20b5a6..bda2ed04e 100755 Binary files a/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so and b/bindings/kotlin/ldk-node-android/lib/src/main/jniLibs/x86_64/libldk_node.so differ diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib index 54fc1a5a7..fa1401240 100644 Binary files a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib and b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-aarch64/libldk_node.dylib differ diff --git a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib index 1d7e300f4..a0f8ca631 100644 Binary files a/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib and b/bindings/kotlin/ldk-node-jvm/lib/src/main/resources/darwin-x86-64/libldk_node.dylib differ diff --git a/crates/bdk-wallet-aggregate/src/lib.rs b/crates/bdk-wallet-aggregate/src/lib.rs index c45d89178..5d6e2d110 100644 --- a/crates/bdk-wallet-aggregate/src/lib.rs +++ b/crates/bdk-wallet-aggregate/src/lib.rs @@ -496,15 +496,20 @@ where // ─── Sync ─────────────────────────────────────────────────────────── - /// Build sync requests for a specific wallet. - #[allow(clippy::type_complexity)] - pub fn wallet_sync_request( + /// Build a full scan request for a specific wallet. + pub fn wallet_full_scan_request( &self, key: &K, - ) -> Result<(FullScanRequest, SyncRequest<(KeychainKind, u32)>), Error> { + ) -> Result, Error> { let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?; - let full_scan = wallet.start_full_scan().build(); - let incremental_sync = wallet.start_sync_with_revealed_spks().build(); - Ok((full_scan, incremental_sync)) + Ok(wallet.start_full_scan().build()) + } + + /// Build an incremental sync request for a specific wallet. + pub fn wallet_incremental_sync_request( + &self, key: &K, + ) -> Result, Error> { + let wallet = self.wallets.get(key).ok_or(Error::WalletNotFound)?; + Ok(wallet.start_sync_with_revealed_spks().build()) } /// Apply a chain update to the primary wallet. diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index eb26f47b0..2db971676 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -141,99 +141,104 @@ impl ElectrumChainSource { ); return Err(Error::FeerateEstimationUpdateFailed); }; - // If this is our first sync, do a full scan with the configured gap limit. - // Otherwise just do an incremental sync. - let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); - - let apply_wallet_update = - |update_res: Result, now: Instant| match update_res { - Ok(update) => match onchain_wallet.apply_update(update) { - Ok(wallet_events) => { - log_info!( - self.logger, - "{} of on-chain wallet finished in {}ms.", - if incremental_sync { "Incremental sync" } else { "Sync" }, - now.elapsed().as_millis() - ); - let unix_time_secs_opt = - SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); - locked_node_metrics.latest_onchain_wallet_sync_timestamp = - unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger), - )?; - } - Ok(wallet_events) - }, - Err(e) => Err(e), - }, - Err(e) => Err(e), - }; - let cached_txs = onchain_wallet.get_cached_txs(); - - let primary_result = if incremental_sync { - let incremental_sync_request = onchain_wallet.get_incremental_sync_request(); - let incremental_sync_fut = electrum_client - .get_incremental_sync_wallet_update(incremental_sync_request, cached_txs.clone()); - - let now = Instant::now(); - let update_res = incremental_sync_fut.await.map(|u| u.into()); - apply_wallet_update(update_res, now) - } else { - let full_scan_request = onchain_wallet.get_full_scan_request(); - let full_scan_fut = - electrum_client.get_full_scan_wallet_update(full_scan_request, cached_txs.clone()); - let now = Instant::now(); - let update_res = full_scan_fut.await.map(|u| u.into()); - apply_wallet_update(update_res, now) - }; - - let (mut all_events, primary_error) = match primary_result { - Ok(events) => (events, None), - Err(e) => (Vec::new(), Some(e)), - }; + let primary_incremental = + self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); let additional_types = self.address_type_runtime_config.read().unwrap().additional_address_types(); - let sync_requests = super::collect_additional_sync_requests( + let additional_sync_requests = super::collect_additional_sync_requests( &additional_types, &onchain_wallet, &self.node_metrics, &self.logger, ); - let mut join_set = tokio::task::JoinSet::new(); - for (address_type, full_scan_req, incremental_req, do_incremental) in sync_requests { + let primary_request: super::WalletSyncRequest = if primary_incremental { + super::WalletSyncRequest::Incremental(onchain_wallet.get_incremental_sync_request()) + } else { + super::WalletSyncRequest::FullScan(onchain_wallet.get_full_scan_request()) + }; + + // Collect cached transactions once and share via Arc to avoid cloning + // the entire Vec for each spawned task. + let cached_txs = Arc::new(onchain_wallet.get_cached_txs()); + + // Primary wallet is identified by address_type = None in the JoinSet results. + let now = Instant::now(); + let mut join_set: tokio::task::JoinSet<( + Option, + Result, + )> = tokio::task::JoinSet::new(); + + { let client = Arc::clone(&electrum_client); - let txs = cached_txs.clone(); - join_set.spawn(async move { - let result: Result = if do_incremental { - client - .get_incremental_sync_wallet_update(incremental_req, txs) - .await - .map(|u| u.into()) - } else { - client.get_full_scan_wallet_update(full_scan_req, txs).await.map(|u| u.into()) - }; - (address_type, result) - }); + let txs = Arc::clone(&cached_txs); + match primary_request { + super::WalletSyncRequest::Incremental(req) => { + join_set.spawn(async move { + let result: Result = client + .get_incremental_sync_wallet_update(req, txs.iter().cloned()) + .await + .map(|u| u.into()); + (None, result) + }); + }, + super::WalletSyncRequest::FullScan(req) => { + join_set.spawn(async move { + let result: Result = client + .get_full_scan_wallet_update(req, txs.iter().cloned()) + .await + .map(|u| u.into()); + (None, result) + }); + }, + } } - let mut sync_results = Vec::new(); + for (address_type, sync_req) in additional_sync_requests { + let client = Arc::clone(&electrum_client); + let txs = Arc::clone(&cached_txs); + match sync_req { + super::WalletSyncRequest::Incremental(req) => { + join_set.spawn(async move { + let result: Result = client + .get_incremental_sync_wallet_update(req, txs.iter().cloned()) + .await + .map(|u| u.into()); + (Some(address_type), result) + }); + }, + super::WalletSyncRequest::FullScan(req) => { + join_set.spawn(async move { + let result: Result = client + .get_full_scan_wallet_update(req, txs.iter().cloned()) + .await + .map(|u| u.into()); + (Some(address_type), result) + }); + }, + } + } + + let mut primary_update: Option = None; + let mut primary_error: Option = None; + let mut additional_results = Vec::new(); + while let Some(join_result) = join_set.join_next().await { match join_result { - Ok((address_type, Ok(update))) => { - sync_results.push((address_type, Some(update))); + Ok((None, Ok(update))) => { + primary_update = Some(update); + }, + Ok((None, Err(e))) => { + primary_error = Some(e); + }, + Ok((Some(address_type), Ok(update))) => { + additional_results.push((address_type, Some(update))); }, - Ok((address_type, Err(e))) => { + Ok((Some(address_type), Err(e))) => { log_warn!(self.logger, "Failed to sync wallet {:?}: {}", address_type, e); - sync_results.push((address_type, None)); + additional_results.push((address_type, None)); }, Err(e) => { log_warn!(self.logger, "Wallet sync task panicked: {}", e); @@ -241,13 +246,58 @@ impl ElectrumChainSource { }; } - all_events.extend(super::apply_additional_sync_results( - sync_results, + let mut all_events = Vec::new(); + + if primary_update.is_none() && primary_error.is_none() { + log_error!(self.logger, "Primary wallet sync task failed unexpectedly"); + primary_error = Some(Error::WalletOperationFailed); + } + + if let Some(update) = primary_update { + match onchain_wallet.apply_update(update) { + Ok(wallet_events) => { + log_info!( + self.logger, + "{} of primary on-chain wallet finished in {}ms.", + if primary_incremental { "Incremental sync" } else { "Full sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + self.node_metrics.write().unwrap().latest_onchain_wallet_sync_timestamp = + unix_time_secs_opt; + all_events.extend(wallet_events); + }, + Err(e) => { + primary_error = Some(e); + }, + } + } + + let (additional_events, any_additional_applied) = super::apply_additional_sync_results( + additional_results, &onchain_wallet, &self.node_metrics, - &self.kv_store, &self.logger, - )); + ); + all_events.extend(additional_events); + + let any_updates_applied = primary_error.is_none() || any_additional_applied; + + if any_updates_applied { + if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() { + log_error!(self.logger, "Failed to update payment store after wallet syncs: {}", e); + } + + let locked_node_metrics = self.node_metrics.read().unwrap(); + if let Err(e) = write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + ) { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + } + } if let Some(e) = primary_error { return Err(e); diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index af4a2e883..25bdbe0cd 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -115,159 +115,156 @@ impl EsploraChainSource { async fn sync_onchain_wallet_inner( &self, onchain_wallet: Arc, ) -> Result, Error> { - // If this is our first sync, do a full scan with the configured gap limit. - // Otherwise just do an incremental sync. - let incremental_sync = + let primary_incremental = self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); - macro_rules! get_and_apply_wallet_update { - ($sync_future: expr) => {{ - let now = Instant::now(); - match $sync_future.await { - Ok(res) => match res { - Ok(update) => match onchain_wallet.apply_update(update) { - Ok(wallet_events) => { - log_info!( - self.logger, - "{} of on-chain wallet finished in {}ms.", - if incremental_sync { "Incremental sync" } else { "Sync" }, - now.elapsed().as_millis() - ); - let unix_time_secs_opt = SystemTime::now() - .duration_since(UNIX_EPOCH) - .ok() - .map(|d| d.as_secs()); - { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); - locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; - write_node_metrics( - &*locked_node_metrics, - Arc::clone(&self.kv_store), - Arc::clone(&self.logger) - )?; - } - Ok(wallet_events) - }, - Err(e) => Err(e), - }, - Err(e) => match *e { - esplora_client::Error::Reqwest(he) => { - if let Some(status_code) = he.status() { - log_error!( - self.logger, - "{} of on-chain wallet failed due to HTTP {} error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - status_code, - he, - ); - } else { - log_error!( - self.logger, - "{} of on-chain wallet failed due to HTTP error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - he, - ); - } - Err(Error::WalletOperationFailed) - }, - _ => { - log_error!( - self.logger, - "{} of on-chain wallet failed due to Esplora error: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationFailed) - }, - }, - }, - Err(e) => { - log_error!( - self.logger, - "{} of on-chain wallet timed out: {}", - if incremental_sync { "Incremental sync" } else { "Sync" }, - e - ); - Err(Error::WalletOperationTimeout) - }, - } - }} - } - - let primary_result = if incremental_sync { - let sync_request = onchain_wallet.get_incremental_sync_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - self.esplora_client.sync(sync_request, BDK_CLIENT_CONCURRENCY), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - } else { - let full_scan_request = onchain_wallet.get_full_scan_request(); - let wallet_sync_timeout_fut = tokio::time::timeout( - Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - self.esplora_client.full_scan( - full_scan_request, - BDK_CLIENT_STOP_GAP, - BDK_CLIENT_CONCURRENCY, - ), - ); - get_and_apply_wallet_update!(wallet_sync_timeout_fut) - }; - - let (mut all_events, primary_error) = match primary_result { - Ok(events) => (events, None), - Err(e) => (Vec::new(), Some(e)), - }; - let additional_types = self.address_type_runtime_config.read().unwrap().additional_address_types(); - let sync_requests = super::collect_additional_sync_requests( + let additional_sync_requests = super::collect_additional_sync_requests( &additional_types, &onchain_wallet, &self.node_metrics, &self.logger, ); - let mut join_set = tokio::task::JoinSet::new(); - for (address_type, full_scan_req, incremental_req, do_incremental) in sync_requests { - let client = self.esplora_client.clone(); - join_set.spawn(async move { - let result = if do_incremental { - tokio::time::timeout( + let primary_request: super::WalletSyncRequest = if primary_incremental { + super::WalletSyncRequest::Incremental(onchain_wallet.get_incremental_sync_request()) + } else { + super::WalletSyncRequest::FullScan(onchain_wallet.get_full_scan_request()) + }; + + // Primary wallet is identified by address_type = None in the JoinSet results. + let now = Instant::now(); + type EsploraSyncResult = ( + Option, + Result< + Result>, + tokio::time::error::Elapsed, + >, + ); + let mut join_set: tokio::task::JoinSet = tokio::task::JoinSet::new(); + + let client = self.esplora_client.clone(); + match primary_request { + super::WalletSyncRequest::Incremental(req) => { + join_set.spawn(async move { + let result = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - client.sync(incremental_req, BDK_CLIENT_CONCURRENCY), + client.sync(req, BDK_CLIENT_CONCURRENCY), ) .await - .map(|r| r.map(|u| bdk_wallet::Update::from(u))) - } else { - tokio::time::timeout( + .map(|r| r.map(|u| bdk_wallet::Update::from(u))); + (None, result) + }); + }, + super::WalletSyncRequest::FullScan(req) => { + join_set.spawn(async move { + let result = tokio::time::timeout( Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), - client.full_scan( - full_scan_req, - BDK_CLIENT_STOP_GAP, - BDK_CLIENT_CONCURRENCY, - ), + client.full_scan(req, BDK_CLIENT_STOP_GAP, BDK_CLIENT_CONCURRENCY), ) .await - .map(|r| r.map(|u| bdk_wallet::Update::from(u))) - }; - (address_type, result) - }); + .map(|r| r.map(|u| bdk_wallet::Update::from(u))); + (None, result) + }); + }, } - let mut sync_results = Vec::new(); + for (address_type, sync_req) in additional_sync_requests { + let client = self.esplora_client.clone(); + match sync_req { + super::WalletSyncRequest::Incremental(req) => { + join_set.spawn(async move { + let result = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + client.sync(req, BDK_CLIENT_CONCURRENCY), + ) + .await + .map(|r| r.map(|u| bdk_wallet::Update::from(u))); + (Some(address_type), result) + }); + }, + super::WalletSyncRequest::FullScan(req) => { + join_set.spawn(async move { + let result = tokio::time::timeout( + Duration::from_secs(BDK_WALLET_SYNC_TIMEOUT_SECS), + client.full_scan(req, BDK_CLIENT_STOP_GAP, BDK_CLIENT_CONCURRENCY), + ) + .await + .map(|r| r.map(|u| bdk_wallet::Update::from(u))); + (Some(address_type), result) + }); + }, + } + } + + let mut primary_update: Option = None; + let mut primary_error: Option = None; + let mut additional_results = Vec::new(); + while let Some(join_result) = join_set.join_next().await { match join_result { - Ok((address_type, Ok(Ok(update)))) => { - sync_results.push((address_type, Some(update))); + Ok((None, Ok(Ok(update)))) => { + primary_update = Some(update); }, - Ok((address_type, Ok(Err(e)))) => { + Ok((None, Ok(Err(e)))) => { + match *e { + esplora_client::Error::Reqwest(ref he) => { + if let Some(status_code) = he.status() { + log_error!( + self.logger, + "{} of primary on-chain wallet failed due to HTTP {} error: {}", + if primary_incremental { + "Incremental sync" + } else { + "Full sync" + }, + status_code, + he, + ); + } else { + log_error!( + self.logger, + "{} of primary on-chain wallet failed due to HTTP error: {}", + if primary_incremental { + "Incremental sync" + } else { + "Full sync" + }, + he, + ); + } + }, + _ => { + log_error!( + self.logger, + "{} of primary on-chain wallet failed due to Esplora error: {}", + if primary_incremental { "Incremental sync" } else { "Full sync" }, + e + ); + }, + } + primary_error = Some(Error::WalletOperationFailed); + }, + Ok((None, Err(e))) => { + log_error!( + self.logger, + "{} of primary on-chain wallet timed out: {}", + if primary_incremental { "Incremental sync" } else { "Full sync" }, + e + ); + primary_error = Some(Error::WalletOperationTimeout); + }, + Ok((Some(address_type), Ok(Ok(update)))) => { + additional_results.push((address_type, Some(update))); + }, + Ok((Some(address_type), Ok(Err(e)))) => { log_warn!(self.logger, "Failed to sync wallet {:?}: {}", address_type, e); - sync_results.push((address_type, None)); + additional_results.push((address_type, None)); }, - Ok((address_type, Err(_))) => { + Ok((Some(address_type), Err(_))) => { log_warn!(self.logger, "Sync timeout for wallet {:?}", address_type); - sync_results.push((address_type, None)); + additional_results.push((address_type, None)); }, Err(e) => { log_warn!(self.logger, "Wallet sync task panicked: {}", e); @@ -275,13 +272,58 @@ impl EsploraChainSource { }; } - all_events.extend(super::apply_additional_sync_results( - sync_results, + let mut all_events = Vec::new(); + + if primary_update.is_none() && primary_error.is_none() { + log_error!(self.logger, "Primary wallet sync task failed unexpectedly"); + primary_error = Some(Error::WalletOperationFailed); + } + + if let Some(update) = primary_update { + match onchain_wallet.apply_update(update) { + Ok(wallet_events) => { + log_info!( + self.logger, + "{} of primary on-chain wallet finished in {}ms.", + if primary_incremental { "Incremental sync" } else { "Full sync" }, + now.elapsed().as_millis() + ); + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + self.node_metrics.write().unwrap().latest_onchain_wallet_sync_timestamp = + unix_time_secs_opt; + all_events.extend(wallet_events); + }, + Err(e) => { + primary_error = Some(e); + }, + } + } + + let (additional_events, any_additional_applied) = super::apply_additional_sync_results( + additional_results, &onchain_wallet, &self.node_metrics, - &self.kv_store, &self.logger, - )); + ); + all_events.extend(additional_events); + + let any_updates_applied = primary_error.is_none() || any_additional_applied; + + if any_updates_applied { + if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() { + log_error!(self.logger, "Failed to update payment store after wallet syncs: {}", e); + } + + let locked_node_metrics = self.node_metrics.read().unwrap(); + if let Err(e) = write_node_metrics( + &*locked_node_metrics, + Arc::clone(&self.kv_store), + Arc::clone(&self.logger), + ) { + log_error!(self.logger, "Failed to persist node metrics: {}", e); + } + } if let Some(e) = primary_error { return Err(e); diff --git a/src/chain/mod.rs b/src/chain/mod.rs index d7825415b..1de848809 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -38,6 +38,11 @@ use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{check_and_emit_balance_update, Error, NodeMetrics}; +pub(super) enum WalletSyncRequest { + FullScan(FullScanRequest), + Incremental(SyncRequest<(KeychainKind, u32)>), +} + pub(crate) enum WalletSyncStatus { Completed, InProgress { subscribers: tokio::sync::broadcast::Sender> }, @@ -169,17 +174,24 @@ fn get_transaction_details( pub(super) fn collect_additional_sync_requests( additional_types: &[AddressType], onchain_wallet: &Wallet, node_metrics: &Arc>, logger: &Arc, -) -> Vec<(AddressType, FullScanRequest, SyncRequest<(KeychainKind, u32)>, bool)> { +) -> Vec<(AddressType, WalletSyncRequest)> { additional_types .iter() .copied() .filter_map(|address_type| { let do_incremental = node_metrics.read().unwrap().get_wallet_sync_timestamp(address_type).is_some(); - match onchain_wallet.get_wallet_sync_request(address_type) { - Ok((full_scan_req, incremental_req)) => { - Some((address_type, full_scan_req, incremental_req, do_incremental)) - }, + let request = if do_incremental { + onchain_wallet + .get_wallet_incremental_sync_request(address_type) + .map(WalletSyncRequest::Incremental) + } else { + onchain_wallet + .get_wallet_full_scan_request(address_type) + .map(WalletSyncRequest::FullScan) + }; + match request { + Ok(req) => Some((address_type, req)), Err(e) => { log_info!(logger, "Skipping sync for wallet {:?}: {}", address_type, e); None @@ -189,41 +201,34 @@ pub(super) fn collect_additional_sync_requests( .collect() } +/// Returns `(events, any_applied)` where `any_applied` is true if at least one +/// additional wallet update was successfully applied. pub(super) fn apply_additional_sync_results( results: Vec<(AddressType, Option)>, onchain_wallet: &Wallet, - node_metrics: &Arc>, kv_store: &Arc, logger: &Arc, -) -> Vec { + node_metrics: &Arc>, logger: &Arc, +) -> (Vec, bool) { let mut events = Vec::new(); + let mut any_applied = false; for (address_type, update_opt) in results { if let Some(update) = update_opt { - let wallet_events = onchain_wallet - .apply_update_for_address_type(address_type, update) - .unwrap_or_else(|e| { + match onchain_wallet.apply_update_for_address_type(address_type, update) { + Ok(wallet_events) => { + any_applied = true; + if let Some(ts) = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()) + { + node_metrics.write().unwrap().set_wallet_sync_timestamp(address_type, ts); + } + events.extend(wallet_events); + }, + Err(e) => { log_warn!(logger, "Failed to apply update to wallet {:?}: {}", address_type, e); - Vec::new() - }); - if let Some(ts) = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()) - { - node_metrics.write().unwrap().set_wallet_sync_timestamp(address_type, ts); + }, } - events.extend(wallet_events); } } - { - let locked_node_metrics = node_metrics.read().unwrap(); - if let Err(e) = - write_node_metrics(&locked_node_metrics, Arc::clone(kv_store), Arc::clone(logger)) - { - log_error!(logger, "Failed to persist node metrics: {}", e); - } - } - - if let Err(e) = onchain_wallet.update_payment_store_for_all_transactions() { - log_info!(logger, "Failed to update payment store after wallet syncs: {}", e); - } - - events + (events, any_applied) } // Process BDK wallet events and emit corresponding ldk-node events via the event queue. diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 396ebd438..0ca364f90 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -159,13 +159,16 @@ impl Wallet { self.inner.lock().unwrap().start_sync_with_revealed_spks().build() } - pub(crate) fn get_wallet_sync_request( + pub(crate) fn get_wallet_full_scan_request( &self, address_type: AddressType, - ) -> Result< - (FullScanRequest, SyncRequest<(KeychainKind, u32)>), - bdk_wallet_aggregate::Error, - > { - self.inner.lock().unwrap().wallet_sync_request(&address_type) + ) -> Result, bdk_wallet_aggregate::Error> { + self.inner.lock().unwrap().wallet_full_scan_request(&address_type) + } + + pub(crate) fn get_wallet_incremental_sync_request( + &self, address_type: AddressType, + ) -> Result, bdk_wallet_aggregate::Error> { + self.inner.lock().unwrap().wallet_incremental_sync_request(&address_type) } pub(crate) fn get_cached_txs(&self) -> Vec> { @@ -348,19 +351,14 @@ impl Wallet { Ok(()) } + /// Callers are responsible for calling `update_payment_store_for_all_transactions` + /// after all updates have been applied. pub(crate) fn apply_update( &self, update: impl Into, ) -> Result, Error> { let mut locked_wallet = self.inner.lock().unwrap(); match locked_wallet.apply_update(update) { - Ok((events, _txids)) => { - self.update_payment_store(&locked_wallet).map_err(|e| { - log_error!(self.logger, "Failed to update payment store: {}", e); - Error::PersistenceFailed - })?; - - Ok(events) - }, + Ok((events, _txids)) => Ok(events), Err(e) => { log_error!(self.logger, "Sync failed due to chain connection error: {}", e); Err(Error::WalletOperationFailed) @@ -368,18 +366,14 @@ impl Wallet { } } + /// Callers are responsible for calling `update_payment_store_for_all_transactions` + /// after all updates have been applied. pub(crate) fn apply_update_for_address_type( &self, address_type: AddressType, update: impl Into, ) -> Result, Error> { let mut locked_wallet = self.inner.lock().unwrap(); match locked_wallet.apply_update_to_wallet(address_type, update) { - Ok((events, _txids)) => { - self.update_payment_store(&locked_wallet).map_err(|e| { - log_error!(self.logger, "Failed to update payment store: {}", e); - Error::PersistenceFailed - })?; - Ok(events) - }, + Ok((events, _txids)) => Ok(events), Err(e) => { log_error!( self.logger,