1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
//! Collator protocol subsystem crate root.
//!
//! Defines [`CollatorProtocolSubsystem`], which runs either the validator-side or the
//! collator-side protocol depending on the [`ProtocolSide`] it was constructed with.
// Every public item in the crate must carry documentation.
#![deny(missing_docs)]
// Crate dependencies that are declared but never used are a hard error.
#![deny(unused_crate_dependencies)]
// NOTE(review): raised recursion limit — presumably needed by macro expansion elsewhere in
// the crate; confirm before lowering it.
#![recursion_limit = "256"]
use std::time::Duration;
use futures::{FutureExt, TryFutureExt};
use sp_keystore::SyncCryptoStorePtr;
use polkadot_node_network_protocol::{
request_response::{v1 as request_v1, IncomingRequestReceiver},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::v2::CollatorPair;
use polkadot_node_subsystem::{
errors::SubsystemError, messages::NetworkBridgeTxMessage, overseer, SpawnedSubsystem,
};
// Error types of the subsystem; `run` reports failures as `error::FatalError`.
mod error;
// Collator-side implementation: supplies the `run` entry point and `Metrics` type used by
// `ProtocolSide::Collator`.
mod collator_side;
// Validator-side implementation: supplies the `run` entry point and `Metrics` type used by
// `ProtocolSide::Validator`.
mod validator_side;
/// Log target used for all tracing output emitted by this crate root.
// `'static` is implied for references in `const` items, so it is not spelled out.
const LOG_TARGET: &str = "parachain::collator-protocol";
/// Durations configuring collator eviction; handed to the validator side of the protocol.
///
/// NOTE(review): the exact eviction semantics live in `validator_side`, which consumes this
/// policy. The field descriptions below are inferred from the field names — confirm there.
#[derive(Debug, Clone, Copy)]
pub struct CollatorEvictionPolicy {
/// Presumably how long a collator may stay inactive before it is evicted.
pub inactive_collator: Duration,
/// Presumably how long a peer may stay undeclared before it is evicted.
pub undeclared: Duration,
}
impl Default for CollatorEvictionPolicy {
fn default() -> Self {
CollatorEvictionPolicy {
inactive_collator: Duration::from_secs(24),
undeclared: Duration::from_secs(1),
}
}
}
/// Selects which side of the collator protocol the subsystem runs, bundled with the inputs
/// that side's `run` function is called with.
pub enum ProtocolSide {
/// Run the validator side (`validator_side::run`).
Validator {
/// Keystore handle passed to the validator side.
keystore: SyncCryptoStorePtr,
/// Collator eviction policy passed to the validator side.
eviction_policy: CollatorEvictionPolicy,
/// Metrics handle for the validator side.
metrics: validator_side::Metrics,
},
/// Run the collator side (`collator_side::run`).
///
/// Fields, in order: the local peer id, the collator key pair, the receiver for incoming
/// collation-fetching requests, and the metrics handle for the collator side.
Collator(
PeerId,
CollatorPair,
IncomingRequestReceiver<request_v1::CollationFetchingRequest>,
collator_side::Metrics,
),
}
/// The collator protocol subsystem.
///
/// Holds the [`ProtocolSide`] chosen at construction time; running the subsystem dispatches
/// to the validator-side or collator-side implementation accordingly.
pub struct CollatorProtocolSubsystem {
// Which side of the protocol to run, together with that side's inputs.
protocol_side: ProtocolSide,
}
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
impl CollatorProtocolSubsystem {
pub fn new(protocol_side: ProtocolSide) -> Self {
Self { protocol_side }
}
async fn run<Context>(self, ctx: Context) -> std::result::Result<(), error::FatalError> {
match self.protocol_side {
ProtocolSide::Validator { keystore, eviction_policy, metrics } =>
validator_side::run(ctx, keystore, eviction_policy, metrics).await,
ProtocolSide::Collator(local_peer_id, collator_pair, req_receiver, metrics) =>
collator_side::run(ctx, local_peer_id, collator_pair, req_receiver, metrics).await,
}
}
}
#[overseer::subsystem(CollatorProtocol, error=SubsystemError, prefix=self::overseer)]
impl<Context> CollatorProtocolSubsystem {
    /// Packages the subsystem's `run` future for spawning.
    ///
    /// Any fatal error produced by `run` is wrapped into a [`SubsystemError`] tagged with the
    /// `"collator-protocol"` origin before the future is boxed.
    fn start(self, ctx: Context) -> SpawnedSubsystem {
        let running = self.run(ctx);
        let with_origin =
            running.map_err(|fatal| SubsystemError::with_origin("collator-protocol", fatal));

        SpawnedSubsystem { name: "collator-protocol-subsystem", future: with_origin.boxed() }
    }
}
/// Logs a reputation change for `peer` at trace level and reports it.
///
/// The change is reported by sending a `NetworkBridgeTxMessage::ReportPeer` message through
/// `sender`.
async fn modify_reputation(
    sender: &mut impl overseer::CollatorProtocolSenderTrait,
    peer: PeerId,
    rep: Rep,
) {
    gum::trace!(
        target: LOG_TARGET,
        rep = ?rep,
        peer_id = %peer,
        "reputation change for peer",
    );

    // Build the report only after logging, since it takes ownership of `peer` and `rep`.
    let report = NetworkBridgeTxMessage::ReportPeer(peer, rep);
    sender.send_message(report).await;
}