1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
// Copyright 2022 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.

//! Grid topology support implementation
//! The basic operation of the 2D grid topology is that:
//!   * A validator producing a message sends it to its row-neighbors and its column-neighbors
//!   * A validator receiving a message originating from one of its row-neighbors sends it to its column-neighbors
//!   * A validator receiving a message originating from one of its column-neighbors sends it to its row-neighbors
//!
//! This grid approach defines 2 unique paths for every validator to reach every other validator in at most 2 hops.
//!
//! However, we also supplement this with some degree of random propagation:
//! every validator, upon seeing a message for the first time, propagates it to 8 random peers.
//! This inserts some redundancy in case the grid topology isn't working or is being attacked -
//! an adversary doesn't know which peers a validator will send to.
//! This is combined with the property that the adversary doesn't know which validators will elect to check a block.
//!

use crate::PeerId;
use polkadot_primitives::v2::{SessionIndex, ValidatorIndex};
use rand::{CryptoRng, Rng};
use std::{
	collections::{hash_map, HashMap, HashSet},
	fmt::Debug,
};

const LOG_TARGET: &str = "parachain::grid-topology";

/// The sample rate for randomly propagating messages. This
/// reduces the left tail of the binomial distribution but also
/// introduces a bias towards peers who we sample before others
/// (i.e. those who get a block before others).
pub const DEFAULT_RANDOM_SAMPLE_RATE: usize = crate::MIN_GOSSIP_PEERS;

/// The number of peers to randomly propagate messages to.
pub const DEFAULT_RANDOM_CIRCULATION: usize = 4;

/// Topology representation
#[derive(Default, Clone, Debug)]
pub struct SessionGridTopology {
	/// Represent peers in the X axis
	pub peers_x: HashSet<PeerId>,
	/// Represent validators in the X axis
	pub validator_indices_x: HashSet<ValidatorIndex>,
	/// Represent peers in the Y axis
	pub peers_y: HashSet<PeerId>,
	/// Represent validators in the Y axis
	pub validator_indices_y: HashSet<ValidatorIndex>,
}

impl SessionGridTopology {
	/// Given the originator of a message as a validator index, indicates the part of the topology
	/// we're meant to send the message to.
	pub fn required_routing_by_index(
		&self,
		originator: ValidatorIndex,
		local: bool,
	) -> RequiredRouting {
		if local {
			return RequiredRouting::GridXY
		}

		let grid_x = self.validator_indices_x.contains(&originator);
		let grid_y = self.validator_indices_y.contains(&originator);

		match (grid_x, grid_y) {
			(false, false) => RequiredRouting::None,
			(true, false) => RequiredRouting::GridY, // messages from X go to Y
			(false, true) => RequiredRouting::GridX, // messages from Y go to X
			(true, true) => RequiredRouting::GridXY, // if the grid works as expected, this shouldn't happen.
		}
	}

	/// Given the originator of a message as a peer index, indicates the part of the topology
	/// we're meant to send the message to.
	pub fn required_routing_by_peer_id(&self, originator: PeerId, local: bool) -> RequiredRouting {
		if local {
			return RequiredRouting::GridXY
		}

		let grid_x = self.peers_x.contains(&originator);
		let grid_y = self.peers_y.contains(&originator);

		match (grid_x, grid_y) {
			(false, false) => RequiredRouting::None,
			(true, false) => RequiredRouting::GridY, // messages from X go to Y
			(false, true) => RequiredRouting::GridX, // messages from Y go to X
			(true, true) => {
				gum::debug!(
					target: LOG_TARGET,
					?originator,
					"Grid topology is unexpected, play it safe and send to X AND Y"
				);
				RequiredRouting::GridXY
			}, // if the grid works as expected, this shouldn't happen.
		}
	}

	/// Get a filter function based on this topology and the required routing
	/// which returns `true` for peers that are within the required routing set
	/// and false otherwise.
	pub fn route_to_peer(&self, required_routing: RequiredRouting, peer: &PeerId) -> bool {
		match required_routing {
			RequiredRouting::All => true,
			RequiredRouting::GridX => self.peers_x.contains(peer),
			RequiredRouting::GridY => self.peers_y.contains(peer),
			RequiredRouting::GridXY => self.peers_x.contains(peer) || self.peers_y.contains(peer),
			RequiredRouting::None | RequiredRouting::PendingTopology => false,
		}
	}

	/// Returns the difference between this and the `other` topology as a vector of peers
	pub fn peers_diff(&self, other: &SessionGridTopology) -> Vec<PeerId> {
		self.peers_x
			.iter()
			.chain(self.peers_y.iter())
			.filter(|peer_id| !(other.peers_x.contains(peer_id) || other.peers_y.contains(peer_id)))
			.cloned()
			.collect::<Vec<_>>()
	}

	/// A convenience method that returns total number of peers in the topology
	pub fn len(&self) -> usize {
		self.peers_x.len().saturating_add(self.peers_y.len())
	}
}

/// A set of topologies indexed by session
#[derive(Default)]
pub struct SessionGridTopologies {
	inner: HashMap<SessionIndex, (Option<SessionGridTopology>, usize)>,
}

impl SessionGridTopologies {
	/// Returns a topology for the specific session index
	pub fn get_topology(&self, session: SessionIndex) -> Option<&SessionGridTopology> {
		self.inner.get(&session).and_then(|val| val.0.as_ref())
	}

	/// Increase references counter for a specific topology
	pub fn inc_session_refs(&mut self, session: SessionIndex) {
		self.inner.entry(session).or_insert((None, 0)).1 += 1;
	}

	/// Decrease references counter for a specific topology
	pub fn dec_session_refs(&mut self, session: SessionIndex) {
		if let hash_map::Entry::Occupied(mut occupied) = self.inner.entry(session) {
			occupied.get_mut().1 = occupied.get().1.saturating_sub(1);
			if occupied.get().1 == 0 {
				let _ = occupied.remove();
			}
		}
	}

	/// Insert a new topology, no-op if already present.
	pub fn insert_topology(&mut self, session: SessionIndex, topology: SessionGridTopology) {
		let entry = self.inner.entry(session).or_insert((None, 0));
		if entry.0.is_none() {
			entry.0 = Some(topology);
		}
	}
}

/// A simple storage for a topology and the corresponding session index
#[derive(Default, Debug)]
pub struct GridTopologySessionBound {
	topology: SessionGridTopology,
	session_index: SessionIndex,
}

/// A storage for the current and maybe previous topology
#[derive(Default, Debug)]
pub struct SessionBoundGridTopologyStorage {
	current_topology: GridTopologySessionBound,
	prev_topology: Option<GridTopologySessionBound>,
}

impl SessionBoundGridTopologyStorage {
	/// Return a grid topology based on the session index:
	/// If we need a previous session and it is registered in the storage, then return that session.
	/// Otherwise, return a current session to have some grid topology in any case
	pub fn get_topology_or_fallback(&self, idx: SessionIndex) -> &SessionGridTopology {
		self.get_topology(idx).unwrap_or(&self.current_topology.topology)
	}

	/// Return the grid topology for the specific session index, if no such a session is stored
	/// returns `None`.
	pub fn get_topology(&self, idx: SessionIndex) -> Option<&SessionGridTopology> {
		if let Some(prev_topology) = &self.prev_topology {
			if idx == prev_topology.session_index {
				return Some(&prev_topology.topology)
			}
		}
		if self.current_topology.session_index == idx {
			return Some(&self.current_topology.topology)
		}

		None
	}

	/// Update the current topology preserving the previous one
	pub fn update_topology(&mut self, session_index: SessionIndex, topology: SessionGridTopology) {
		let old_current = std::mem::replace(
			&mut self.current_topology,
			GridTopologySessionBound { topology, session_index },
		);
		self.prev_topology.replace(old_current);
	}

	/// Returns a current grid topology
	pub fn get_current_topology(&self) -> &SessionGridTopology {
		&self.current_topology.topology
	}
}

/// A representation of routing based on sample
#[derive(Debug, Clone, Copy)]
pub struct RandomRouting {
	/// The number of peers to target.
	target: usize,
	/// The number of peers this has been sent to.
	sent: usize,
	/// Sampling rate
	sample_rate: usize,
}

impl Default for RandomRouting {
	fn default() -> Self {
		RandomRouting {
			target: DEFAULT_RANDOM_CIRCULATION,
			sent: 0_usize,
			sample_rate: DEFAULT_RANDOM_SAMPLE_RATE,
		}
	}
}

impl RandomRouting {
	/// Perform random sampling for a specific peer
	/// Returns `true` for a lucky peer
	pub fn sample(&self, n_peers_total: usize, rng: &mut (impl CryptoRng + Rng)) -> bool {
		if n_peers_total == 0 || self.sent >= self.target {
			false
		} else if self.sample_rate > n_peers_total {
			true
		} else {
			rng.gen_ratio(self.sample_rate as _, n_peers_total as _)
		}
	}

	/// Increase number of messages being sent
	pub fn inc_sent(&mut self) {
		self.sent += 1
	}
}

/// Routing mode
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RequiredRouting {
	/// We don't know yet, because we're waiting for topology info
	/// (race condition between learning about the first blocks in a new session
	/// and getting the topology for that session)
	PendingTopology,
	/// Propagate to all peers of any kind.
	All,
	/// Propagate to all peers sharing either the X or Y dimension of the grid.
	GridXY,
	/// Propagate to all peers sharing the X dimension of the grid.
	GridX,
	/// Propagate to all peers sharing the Y dimension of the grid.
	GridY,
	/// No required propagation.
	None,
}

impl RequiredRouting {
	/// Whether the required routing set is definitely empty.
	pub fn is_empty(self) -> bool {
		match self {
			RequiredRouting::PendingTopology | RequiredRouting::None => true,
			_ => false,
		}
	}
}

#[cfg(test)]
mod tests {
	use super::*;
	use rand::SeedableRng;
	use rand_chacha::ChaCha12Rng;

	fn dummy_rng() -> ChaCha12Rng {
		rand_chacha::ChaCha12Rng::seed_from_u64(12345)
	}

	#[test]
	fn test_random_routing_sample() {
		// This test is fragile as it relies on a specific ChaCha12Rng
		// sequence that might be implementation defined even for a static seed
		let mut rng = dummy_rng();
		let mut random_routing = RandomRouting { target: 4, sent: 0, sample_rate: 8 };

		assert_eq!(random_routing.sample(16, &mut rng), true);
		random_routing.inc_sent();
		assert_eq!(random_routing.sample(16, &mut rng), false);
		assert_eq!(random_routing.sample(16, &mut rng), false);
		assert_eq!(random_routing.sample(16, &mut rng), true);
		random_routing.inc_sent();
		assert_eq!(random_routing.sample(16, &mut rng), true);
		random_routing.inc_sent();
		assert_eq!(random_routing.sample(16, &mut rng), false);
		assert_eq!(random_routing.sample(16, &mut rng), false);
		assert_eq!(random_routing.sample(16, &mut rng), false);
		assert_eq!(random_routing.sample(16, &mut rng), true);
		random_routing.inc_sent();

		for _ in 0..16 {
			assert_eq!(random_routing.sample(16, &mut rng), false);
		}
	}

	fn run_random_routing(
		random_routing: &mut RandomRouting,
		rng: &mut (impl CryptoRng + Rng),
		npeers: usize,
		iters: usize,
	) -> usize {
		let mut ret = 0_usize;

		for _ in 0..iters {
			if random_routing.sample(npeers, rng) {
				random_routing.inc_sent();
				ret += 1;
			}
		}

		ret
	}

	#[test]
	fn test_random_routing_distribution() {
		let mut rng = dummy_rng();

		let mut random_routing = RandomRouting { target: 4, sent: 0, sample_rate: 8 };
		assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 4);

		let mut random_routing = RandomRouting { target: 8, sent: 0, sample_rate: 100 };
		assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 8);

		let mut random_routing = RandomRouting { target: 0, sent: 0, sample_rate: 100 };
		assert_eq!(run_random_routing(&mut random_routing, &mut rng, 100, 10000), 0);

		let mut random_routing = RandomRouting { target: 10, sent: 0, sample_rate: 10 };
		assert_eq!(run_random_routing(&mut random_routing, &mut rng, 10, 100), 10);
	}
}