diff --git a/p2pfs/core/peer.py b/p2pfs/core/peer.py index 9fc32ee..2ca93f9 100644 --- a/p2pfs/core/peer.py +++ b/p2pfs/core/peer.py @@ -123,31 +123,33 @@ class Peer(MessageServer): }) break + tasks = {asyncio.ensure_future(self._read_message(reader)) for address, (reader, _) in peers.items()} # TODO: update chunkinfo after receiving each chunk with open(destination + '.temp', 'wb') as dest_file: self._file_map[file] = destination - for i in range(totalchunknum): - for address, (reader, _) in peers.items(): - assert isinstance(reader, asyncio.StreamReader) - while not reader.at_eof(): - message = await self._read_message(reader) - number, data, digest = message['chunknum'], message['data'], message['digest'] - raw_data = pybase64.b64decode(data.encode('utf-8'), validate=True) - # TODO: handle if corrupted - if Peer._HASH_FUNC(raw_data).hexdigest() != digest: - assert False - dest_file.seek(number * Peer._CHUNK_SIZE, 0) - dest_file.write(raw_data) - dest_file.flush() - # send request chunk register to server - await self._write_message(self._tracker_writer, { - 'type': MessageType.REQUEST_CHUNK_REGISTER, - 'filename': file, - 'chunknum': number - }) - if reporthook: - reporthook(i + 1, Peer._CHUNK_SIZE, fileinfo['size']) - logger.debug('Got {}\'s chunk # {}'.format(file, number)) + finished = 0 + while finished < totalchunknum: + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED) + for task in done: + message = task.result() + finished += 1 + number, data, digest = message['chunknum'], message['data'], message['digest'] + raw_data = pybase64.b64decode(data.encode('utf-8'), validate=True) + # TODO: handle if corrupted + if Peer._HASH_FUNC(raw_data).hexdigest() != digest: + assert False + dest_file.seek(number * Peer._CHUNK_SIZE, 0) + dest_file.write(raw_data) + dest_file.flush() + # send request chunk register to server + await self._write_message(self._tracker_writer, { + 'type': MessageType.REQUEST_CHUNK_REGISTER, + 'filename': file, + 'chunknum': number + }) + if reporthook: + reporthook(finished, Peer._CHUNK_SIZE, fileinfo['size']) + logger.debug('Got {}\'s chunk # {}'.format(file, number)) # change the temp file into the actual file os.rename(destination + '.temp', destination)