1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use crate::error::PlainTextError;
use crate::structs_proto::Exchange;
use crate::PlainText2Config;
use asynchronous_codec::{Framed, FramedParts};
use bytes::{Bytes, BytesMut};
use futures::prelude::*;
use libp2p_core::{PeerId, PublicKey};
use log::{debug, trace};
use prost::Message;
use std::io::{Error as IoError, ErrorKind as IoErrorKind};
use unsigned_varint::codec::UviBytes;
struct HandshakeContext<T> {
config: PlainText2Config,
state: T,
}
struct Local {
exchange_bytes: Vec<u8>,
}
pub struct Remote {
pub peer_id: PeerId,
pub public_key: PublicKey,
}
impl HandshakeContext<Local> {
fn new(config: PlainText2Config) -> Self {
let exchange = Exchange {
id: Some(config.local_public_key.to_peer_id().to_bytes()),
pubkey: Some(config.local_public_key.to_protobuf_encoding()),
};
let mut buf = Vec::with_capacity(exchange.encoded_len());
exchange
.encode(&mut buf)
.expect("Vec<u8> provides capacity as needed");
Self {
config,
state: Local {
exchange_bytes: buf,
},
}
}
fn with_remote(
self,
exchange_bytes: BytesMut,
) -> Result<HandshakeContext<Remote>, PlainTextError> {
let prop = match Exchange::decode(exchange_bytes) {
Ok(prop) => prop,
Err(e) => {
debug!("failed to parse remote's exchange protobuf message");
return Err(PlainTextError::InvalidPayload(Some(e)));
}
};
let pb_pubkey = prop.pubkey.unwrap_or_default();
let public_key = match PublicKey::from_protobuf_encoding(pb_pubkey.as_slice()) {
Ok(p) => p,
Err(_) => {
debug!("failed to parse remote's exchange's pubkey protobuf");
return Err(PlainTextError::InvalidPayload(None));
}
};
let peer_id = match PeerId::from_bytes(&prop.id.unwrap_or_default()) {
Ok(p) => p,
Err(_) => {
debug!("failed to parse remote's exchange's id protobuf");
return Err(PlainTextError::InvalidPayload(None));
}
};
if peer_id != public_key.to_peer_id() {
debug!("the remote's `PeerId` isn't consistent with the remote's public key");
return Err(PlainTextError::InvalidPeerId);
}
Ok(HandshakeContext {
config: self.config,
state: Remote {
peer_id,
public_key,
},
})
}
}
pub async fn handshake<S>(
socket: S,
config: PlainText2Config,
) -> Result<(S, Remote, Bytes), PlainTextError>
where
S: AsyncRead + AsyncWrite + Send + Unpin,
{
let mut framed_socket = Framed::new(socket, UviBytes::default());
trace!("starting handshake");
let context = HandshakeContext::new(config);
trace!("sending exchange to remote");
framed_socket
.send(BytesMut::from(&context.state.exchange_bytes[..]))
.await?;
trace!("receiving the remote's exchange");
let context = match framed_socket.next().await {
Some(p) => context.with_remote(p?)?,
None => {
debug!("unexpected eof while waiting for remote's exchange");
let err = IoError::new(IoErrorKind::BrokenPipe, "unexpected eof");
return Err(err.into());
}
};
trace!(
"received exchange from remote; pubkey = {:?}",
context.state.public_key
);
let FramedParts {
io,
read_buffer,
write_buffer,
..
} = framed_socket.into_parts();
assert!(write_buffer.is_empty());
Ok((io, context.state, read_buffer.freeze()))
}