1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use serde::{de::DeserializeOwned, Serialize};
use serde_cbor;
use std::{
	io::{Read, Write},
	rc::Rc,
};
use MessageTransport;
use Notifiable;
use Notifies;

pub struct ProtocolTransport {
	pub mt:                   MessageTransport,
	pub outbound_compression: bool,
	pub inbound_compression:  bool,
}

fn compress(data: &[u8]) -> Vec<u8> {
	let compressed_data = Vec::with_capacity(data.len());
	let mut encoder = ::libflate::zlib::Encoder::new(compressed_data).expect("Failed to create outbound compression encoder");
	encoder.write_all(&data[..]).expect("Failed to compress outbound message");
	encoder.finish().into_result().expect("Failed to compress outbound message.")
}

fn decompress(data: &[u8]) -> Vec<u8> {
	let mut decoder = ::libflate::zlib::Decoder::new(data).expect("Failed to create inbound compression decoder");
	let mut buf = Vec::new();
	decoder.read_to_end(&mut buf).expect("Failed to decompress inbound message");
	buf
}

impl ProtocolTransport {
	pub fn create<T: Into<MessageTransport>>(transport: T) -> ProtocolTransport {
		ProtocolTransport {
			mt:                   transport.into(),
			inbound_compression:  false,
			outbound_compression: false,
		}
	}

	pub fn send<T: Serialize>(&self, message: T) {
		let mut data = serde_cbor::ser::to_vec_packed(&message).unwrap();
		if self.outbound_compression {
			data = compress(&data);
		}
		self.mt.send(&data[..]);
	}

	pub fn recv<T: DeserializeOwned>(&self) -> Option<T> {
		let message = self.mt.recv();
		if message.is_none() {
			return None;
		}
		let mut message = message.unwrap();
		if self.inbound_compression {
			message = decompress(&message);
		}
		trace!("Trying to deserialize input: {:?}", message);
		serde_cbor::from_slice(&message[..]).expect("Failed to deserialize protocol message")
	}

	pub fn recv_tolerant<T: DeserializeOwned>(&self) -> Option<Option<T>> {
		let message = self.mt.recv();
		if message.is_none() {
			return None;
		}
		let mut message = message.unwrap();
		if self.inbound_compression {
			message = decompress(&message);
		}
		trace!("Trying to deserialize input: {:?}", message);
		Some(serde_cbor::from_slice(&message[..]).ok())
	}

	pub fn recv_all<T: DeserializeOwned>(&self) -> Vec<T> {
		let mut result = Vec::new();
		for mut i in self.mt.recv_all() {
			if self.inbound_compression {
				i = decompress(&i);
			}
			result.push(serde_cbor::from_slice(&i).expect("Failed to deserialize protocol message."));
		}
		result
	}

	pub fn recv_all_tolerant<T: DeserializeOwned>(&self) -> Vec<Option<T>> {
		let mut result = Vec::new();
		for mut i in self.mt.recv_all() {
			if self.inbound_compression {
				i = decompress(&i);
			}
			result.push(serde_cbor::from_slice(&i).ok());
		}
		result
	}

	pub fn has_write_space(&self) -> bool {
		self.mt.has_write_space()
	}

	pub fn is_closed(&self) -> bool {
		self.mt.is_closed()
	}
}

impl Notifiable for ProtocolTransport {
	fn notify(&self) {
		self.mt.notify()
	}
}

impl Notifies for ProtocolTransport {
	fn set_notify(&self, other: Rc<Notifiable>) {
		self.mt.set_notify(other)
	}
}