diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index 16dbc7662c192..7abc723c4047e 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -249,7 +249,7 @@ impl DiscoveryConfig { NonZeroUsize::new(MAX_KNOWN_EXTERNAL_ADDRESSES) .expect("value is a constant; constant is non-zero; qed."), ), - outbound_query_records: Vec::new(), + records_to_publish: Default::default(), } } } @@ -287,8 +287,12 @@ pub struct DiscoveryBehaviour { allow_non_globals_in_dht: bool, /// A cache of discovered external addresses. Only used for logging purposes. known_external_addresses: LruHashSet, - /// A cache of outbound query records. - outbound_query_records: Vec<(record::Key, Vec)>, + /// Records to publish per QueryId. + /// + /// After finishing a Kademlia query, libp2p will return us a list of the closest peers that + /// did not return the record(in `FinishedWithNoAdditionalRecord`). We will then put the record + /// to these peers. + records_to_publish: HashMap, } impl DiscoveryBehaviour { @@ -692,33 +696,54 @@ impl NetworkBehaviour for DiscoveryBehaviour { KademliaEvent::OutboundQueryProgressed { result: QueryResult::GetRecord(res), stats, - step, + id, .. } => { let ev = match res { - Ok(ok) => - if let GetRecordOk::FoundRecord(r) = ok { - self.outbound_query_records - .push((r.record.key, r.record.value)); + Ok(GetRecordOk::FoundRecord(r)) => { + debug!( + target: "sub-libp2p", + "Libp2p => Found record ({:?}) with value: {:?}", + r.record.key, + r.record.value, + ); + + // Let's directly finish the query, as we are only interested in a + // quorum of 1. + if let Some(kad) = self.kademlia.as_mut() { + if let Some(mut query) = kad.query_mut(&id) { + query.finish(); + } + } + + self.records_to_publish.insert(id, r.record.clone()); + + DiscoveryOut::ValueFound( + vec![(r.record.key, r.record.value)], + stats.duration().unwrap_or_default(), + ) + }, + Ok(GetRecordOk::FinishedWithNoAdditionalRecord { + cache_candidates, + }) => { + if cache_candidates.is_empty() { continue - } else { - debug!( - target: "sub-libp2p", - "Libp2p => Query progressed to {:?} step (last: {:?})", - step.count, - step.last, - ); - if step.last { - let records = - self.outbound_query_records.drain(..).collect(); - DiscoveryOut::ValueFound( - records, - stats.duration().unwrap_or_default(), - ) - } else { - continue + } + + // Put the record to the `cache_candidates` that are nearest to the + // record key from our point of view of the network. + if let Some(record) = self.records_to_publish.remove(&id) { + if let Some(kad) = self.kademlia.as_mut() { + kad.put_record_to( + record, + cache_candidates.into_iter().map(|v| v.1), + Quorum::One, + ); } - }, + } + + continue + }, Err(e @ libp2p::kad::GetRecordError::NotFound { .. }) => { trace!( target: "sub-libp2p",