Compress the packets sent over the network.

This commit is contained in:
Yuxin Wang 2018-09-27 13:11:21 -04:00
parent 99c1381abe
commit 282ac08fc8
2 changed files with 16 additions and 6 deletions

View file

@ -4,6 +4,7 @@ from abc import abstractmethod
import json
import struct
import logging
import zstandard as zstd
logger = logging.getLogger(__name__)
@ -13,6 +14,8 @@ class MessageServer:
self._sock.bind((host, port))
self._process_lock = threading.Lock()
self._connection_lock = threading.Lock()
self._compressor = zstd.ZstdCompressor()
self._decompressor = zstd.ZstdDecompressor()
def start(self):
# put server listening into a thread
@ -49,6 +52,10 @@ class MessageServer:
logger.info('Successfully connected to {} on {}'.format((ip, port), client.getsockname()))
return client
@staticmethod
def __message_log(message):
return {key: message[key] for key in message if key != 'data'} if 'data' in message else message
def _read_message(self, client):
assert isinstance(client, socket.socket)
try:
@ -56,8 +63,8 @@ class MessageServer:
raw_msg_len = self.__recvall(client, 4)
msglen = struct.unpack('>I', raw_msg_len)[0]
raw_msg = self.__recvall(client, msglen)
msg = json.loads(raw_msg.decode('utf-8'))
logger.debug('Message {} from {}'.format(msg, client.getpeername()))
msg = json.loads(self._decompressor.decompress(raw_msg).decode('utf-8'))
logger.debug('Message {} from {}'.format(self.__message_log(msg), client.getpeername()))
# process the packets in order
self._process_lock.acquire()
self._process_message(client, msg)
@ -70,10 +77,13 @@ class MessageServer:
def _write_message(self, client, message):
assert isinstance(client, socket.socket)
logging.debug('Writing {} to {}'.format(message, client.getpeername()))
logger.debug('Writing {} to {}'.format(self.__message_log(message), client.getpeername()))
raw_msg = json.dumps(message).encode('utf-8')
raw_msg = struct.pack('>I', len(raw_msg)) + raw_msg
client.sendall(raw_msg)
compressed = self._compressor.compress(raw_msg)
logger.debug('Compressed rate: {}'.format(len(compressed) / len(raw_msg)))
compressed = struct.pack('>I', len(compressed)) + compressed
client.sendall(compressed)
def _server_started(self):
pass

View file

@ -27,7 +27,7 @@ setup(
],
keywords='P2P, Networking',
packages=find_packages(exclude=['tests']),
install_requires=['pybase64', 'coloredlogs', 'tabulate'],
install_requires=['pybase64', 'zstandard', 'coloredlogs', 'tabulate'],
entry_points={
'console_scripts': [
'p2pfs=p2pfs.__main__:main',