1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115
use serde::{de::DeserializeOwned, Serialize};
use serde_cbor;
use std::{
    io::{Read, Write},
    rc::Rc,
};
use MessageTransport;
use Notifiable;
use Notifies;

/// Typed message layer on top of a [`MessageTransport`].
///
/// Outgoing values are serialized to packed CBOR with `serde_cbor`; incoming
/// byte messages are deserialized from CBOR. Each direction can independently
/// be wrapped in zlib compression via the two boolean flags.
pub struct ProtocolTransport {
    /// Underlying transport that sends and receives raw byte messages.
    pub mt: MessageTransport,
    /// When `true`, serialized messages are zlib-compressed before sending.
    pub outbound_compression: bool,
    /// When `true`, received messages are zlib-decompressed before decoding.
    pub inbound_compression: bool,
}

/// Compresses `data` with zlib and returns the compressed bytes.
///
/// # Panics
///
/// Panics if the encoder cannot be created or if compression fails.
fn compress(data: &[u8]) -> Vec<u8> {
    // Start with room for the uncompressed size; the vector grows if needed.
    let sink = Vec::with_capacity(data.len());
    let mut encoder = ::libflate::zlib::Encoder::new(sink)
        .expect("Failed to create outbound compression encoder");
    encoder
        .write_all(data)
        .expect("Failed to compress outbound message");
    encoder
        .finish()
        .into_result()
        .expect("Failed to compress outbound message.")
}

/// Decompresses zlib-compressed `data` and returns the decompressed bytes.
///
/// NOTE(review): the decompressed size is not bounded here; if peers are
/// untrusted, consider limiting how much is read.
///
/// # Panics
///
/// Panics if the decoder cannot be created or if decompression fails.
fn decompress(data: &[u8]) -> Vec<u8> {
    let mut decoder = ::libflate::zlib::Decoder::new(data)
        .expect("Failed to create inbound compression decoder");
    let mut buf = Vec::new();
    decoder
        .read_to_end(&mut buf)
        .expect("Failed to decompress inbound message");
    buf
}

impl ProtocolTransport {
    /// Wraps `transport` in a `ProtocolTransport` with compression disabled
    /// in both directions.
    pub fn create<T: Into<MessageTransport>>(transport: T) -> ProtocolTransport {
        ProtocolTransport {
            mt: transport.into(),
            inbound_compression: false,
            outbound_compression: false,
        }
    }

    /// Returns the payload of one inbound message, decompressing it first when
    /// `inbound_compression` is enabled.
    fn inflate_inbound(&self, raw: Vec<u8>) -> Vec<u8> {
        if self.inbound_compression {
            decompress(&raw)
        } else {
            raw
        }
    }

    /// Serializes `message` to packed CBOR, compresses it when
    /// `outbound_compression` is enabled, and hands the bytes to the
    /// underlying transport.
    ///
    /// # Panics
    ///
    /// Panics if serialization or compression fails.
    pub fn send<T: Serialize>(&self, message: T) {
        let encoded = serde_cbor::ser::to_vec_packed(&message).unwrap();
        let payload = if self.outbound_compression {
            compress(&encoded)
        } else {
            encoded
        };
        self.mt.send(&payload[..]);
    }

    /// Receives one message from the underlying transport and deserializes it
    /// as a `T`.
    ///
    /// Returns `None` only when the transport's `recv` yields `None`.
    ///
    /// # Panics
    ///
    /// Panics if decompression or deserialization fails.
    pub fn recv<T: DeserializeOwned>(&self) -> Option<T> {
        let payload = match self.mt.recv() {
            Some(raw) => self.inflate_inbound(raw),
            None => return None,
        };
        trace!("Trying to deserialize input: {:?}", payload);
        // Deserialize as `T`, not `Option<T>`: letting inference pick
        // `Option<T>` from the return type would turn a CBOR `null` payload
        // into `None`, which callers cannot tell apart from "no message".
        let value: T = serde_cbor::from_slice(&payload[..])
            .expect("Failed to deserialize protocol message");
        Some(value)
    }

    /// Like [`recv`](#method.recv), but a message that fails to deserialize is
    /// reported as `Some(None)` instead of panicking.
    ///
    /// Returns `None` when the transport's `recv` yields `None`.
    ///
    /// # Panics
    ///
    /// Still panics if decompression fails.
    /// NOTE(review): confirm whether that is intended for the tolerant variant.
    pub fn recv_tolerant<T: DeserializeOwned>(&self) -> Option<Option<T>> {
        self.mt.recv().map(|raw| {
            let payload = self.inflate_inbound(raw);
            trace!("Trying to deserialize input: {:?}", payload);
            let decoded: Option<T> = serde_cbor::from_slice(&payload[..]).ok();
            decoded
        })
    }

    /// Receives every message returned by the transport's `recv_all` and
    /// deserializes each one as a `T`, preserving order.
    ///
    /// # Panics
    ///
    /// Panics if decompression or deserialization of any message fails.
    pub fn recv_all<T: DeserializeOwned>(&self) -> Vec<T> {
        let mut result = Vec::new();
        for raw in self.mt.recv_all() {
            let payload = self.inflate_inbound(raw);
            let value: T = serde_cbor::from_slice(&payload)
                .expect("Failed to deserialize protocol message.");
            result.push(value);
        }
        result
    }

    /// Like [`recv_all`](#method.recv_all), but each message that fails to
    /// deserialize is represented by `None` in the returned vector.
    ///
    /// # Panics
    ///
    /// Still panics if decompression of any message fails.
    /// NOTE(review): confirm whether that is intended for the tolerant variant.
    pub fn recv_all_tolerant<T: DeserializeOwned>(&self) -> Vec<Option<T>> {
        let mut result = Vec::new();
        for raw in self.mt.recv_all() {
            let payload = self.inflate_inbound(raw);
            let decoded: Option<T> = serde_cbor::from_slice(&payload).ok();
            result.push(decoded);
        }
        result
    }

    /// Forwards to the underlying transport's `has_write_space`.
    pub fn has_write_space(&self) -> bool {
        self.mt.has_write_space()
    }

    /// Forwards to the underlying transport's `is_closed`.
    pub fn is_closed(&self) -> bool {
        self.mt.is_closed()
    }
}

/// Notifications are forwarded to the underlying transport.
impl Notifiable for ProtocolTransport {
    fn notify(&self) {
        self.mt.notify()
    }
}

/// Notification targets are registered on the underlying transport.
impl Notifies for ProtocolTransport {
    fn set_notify(&self, other: Rc<Notifiable>) {
        self.mt.set_notify(other)
    }
}