Return None if read is incomplete.

This commit is contained in:
Yuxin Wang 2018-10-07 00:09:12 -04:00
parent 581e24b560
commit b7833ef410
3 changed files with 13 additions and 3 deletions

View file

@ -132,6 +132,8 @@ class Peer(MessageServer):
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for task in done:
message = task.result()
if message is None:
continue
finished += 1
number, data, digest = message['chunknum'], message['data'], message['digest']
raw_data = pybase64.b64decode(data.encode('utf-8'), validate=True)
@ -165,6 +167,8 @@ class Peer(MessageServer):
assert isinstance(reader, asyncio.StreamReader) and isinstance(writer, asyncio.StreamWriter)
while not reader.at_eof():
message = await self._read_message(reader)
if message is None:
break
message_type = MessageType(message['type'])
if message_type == MessageType.PEER_REQUEST_CHUNK:
assert message['filename'] in self._file_map, 'File {} requested does not exist'.format(message['filename'])

View file

@ -63,9 +63,13 @@ class MessageServer:
async def _read_message(self, reader):
assert isinstance(reader, asyncio.StreamReader)
# receive length header -> decompress (bytes) -> decode to str (str) -> json load (dict)
raw_msg_len = await reader.readexactly(4)
msglen = struct.unpack('>I', raw_msg_len)[0]
raw_msg = await reader.readexactly(msglen)
try:
raw_msg_len = await reader.readexactly(4)
msglen = struct.unpack('>I', raw_msg_len)[0]
raw_msg = await reader.readexactly(msglen)
except asyncio.IncompleteReadError:
return None
msg = json.loads(self._decompressor.decompress(raw_msg).decode('utf-8'))
logger.debug('Message received {}'.format(self._message_log(msg)))
return msg

View file

@ -29,6 +29,8 @@ class Tracker(MessageServer):
self._peers[writer] = None
while not reader.at_eof():
message = await self._read_message(reader)
if message is None:
break
message_type = MessageType(message['type'])
if message_type == MessageType.REQUEST_REGISTER:
# peer_address is a string, since JSON requires keys being strings