1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use futures::{
channel::{mpsc, oneshot},
stream::FuturesUnordered,
SinkExt, StreamExt,
};
use fatality::Nested;
use polkadot_node_network_protocol::{
request_response::{
incoming::OutgoingResponse,
v1::{StatementFetchingRequest, StatementFetchingResponse},
IncomingRequestReceiver, MAX_PARALLEL_STATEMENT_REQUESTS,
},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v2::{CandidateHash, CommittedCandidateReceipt, Hash};
use crate::LOG_TARGET;
/// Major reputation cost that `respond` hands to the request receiver; its message labels a peer
/// that sent an unparsable request.
const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
/// Messages that the `respond` task feeds into its `mpsc` channel.
pub enum ResponderMessage {
/// Ask for the committed candidate receipt needed to answer an incoming statement fetching
/// request.
GetData {
/// Peer the request came from.
requesting_peer: PeerId,
/// Relay parent taken from the request payload.
relay_parent: Hash,
/// Candidate hash taken from the request payload.
candidate_hash: CandidateHash,
/// One-shot sender for handing the receipt back. If it is dropped without sending a value,
/// `respond` answers the request with an error result.
tx: oneshot::Sender<CommittedCandidateReceipt>,
},
}
/// Serves incoming statement fetching requests in a loop.
///
/// For each request that decodes successfully, a [`ResponderMessage::GetData`] is fed into
/// `sender` and the task waits on a one-shot channel for the answer. A received receipt is
/// returned to the requester wrapped in [`StatementFetchingResponse::Statement`]; if the one-shot
/// sender is dropped instead, the requester gets an error result.
///
/// Requests that fail with a non-fatal error are logged and skipped. The function returns when
/// `receiver` reports a fatal error or when feeding `sender` fails.
pub async fn respond(
    mut receiver: IncomingRequestReceiver<StatementFetchingRequest>,
    mut sender: mpsc::Sender<ResponderMessage>,
) {
    // Holds the `sent_feedback` receiver of every response handed off so far.
    // NOTE(review): presumably each one resolves once the response went out - confirm.
    let mut in_flight = FuturesUnordered::new();

    loop {
        // Cap the number of outstanding responses: at the limit, wait until one of the tracked
        // feedback receivers resolves before accepting the next request.
        if in_flight.len() >= MAX_PARALLEL_STATEMENT_REQUESTS as usize {
            in_flight.next().await;
        }

        let incoming = receiver.recv(|| vec![COST_INVALID_REQUEST]).await.into_nested();
        let request = match incoming {
            Err(fatal) => {
                gum::debug!(target: LOG_TARGET, error = ?fatal, "Shutting down request responder");
                return
            },
            Ok(Err(jfyi)) => {
                gum::debug!(target: LOG_TARGET, error = ?jfyi, "Decoding request failed");
                continue
            },
            Ok(Ok(request)) => request,
        };

        // Ask the other end of `sender` for the data and keep the receiving half for the answer.
        let (data_tx, data_rx) = oneshot::channel();
        let query = ResponderMessage::GetData {
            requesting_peer: request.peer,
            relay_parent: request.payload.relay_parent,
            candidate_hash: request.payload.candidate_hash,
            tx: data_tx,
        };
        if let Err(err) = sender.feed(query).await {
            gum::debug!(target: LOG_TARGET, ?err, "Shutting down responder");
            return
        }

        // A dropped answer channel becomes an `Err(())` result for the requester.
        let result = data_rx.await.map(StatementFetchingResponse::Statement).map_err(|err| {
            gum::debug!(target: LOG_TARGET, ?err, "Requested data not found.");
        });

        let (sent_tx, sent_rx) = oneshot::channel();
        let outgoing = OutgoingResponse {
            result,
            reputation_changes: Vec::new(),
            sent_feedback: Some(sent_tx),
        };
        // Track the feedback receiver before handing the response off.
        in_flight.push(sent_rx);
        if request.send_outgoing_response(outgoing).is_err() {
            gum::debug!(target: LOG_TARGET, "Sending response failed");
        }
    }
}