Wait concurrently for better performance.
This commit is contained in:
parent
33ecb26a9c
commit
0700d168c5
1 changed files with 24 additions and 22 deletions
|
@ -123,31 +123,33 @@ class Peer(MessageServer):
|
|||
})
|
||||
break
|
||||
|
||||
tasks = {asyncio.ensure_future(self._read_message(reader)) for address, (reader, _) in peers.items()}
|
||||
# TODO: update chunkinfo after receiving each chunk
|
||||
with open(destination + '.temp', 'wb') as dest_file:
|
||||
self._file_map[file] = destination
|
||||
for i in range(totalchunknum):
|
||||
for address, (reader, _) in peers.items():
|
||||
assert isinstance(reader, asyncio.StreamReader)
|
||||
while not reader.at_eof():
|
||||
message = await self._read_message(reader)
|
||||
number, data, digest = message['chunknum'], message['data'], message['digest']
|
||||
raw_data = pybase64.b64decode(data.encode('utf-8'), validate=True)
|
||||
# TODO: handle if corrupted
|
||||
if Peer._HASH_FUNC(raw_data).hexdigest() != digest:
|
||||
assert False
|
||||
dest_file.seek(number * Peer._CHUNK_SIZE, 0)
|
||||
dest_file.write(raw_data)
|
||||
dest_file.flush()
|
||||
# send request chunk register to server
|
||||
await self._write_message(self._tracker_writer, {
|
||||
'type': MessageType.REQUEST_CHUNK_REGISTER,
|
||||
'filename': file,
|
||||
'chunknum': number
|
||||
})
|
||||
if reporthook:
|
||||
reporthook(i + 1, Peer._CHUNK_SIZE, fileinfo['size'])
|
||||
logger.debug('Got {}\'s chunk # {}'.format(file, number))
|
||||
finished = 0
|
||||
while finished < totalchunknum:
|
||||
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
|
||||
for task in done:
|
||||
message = task.result()
|
||||
finished += 1
|
||||
number, data, digest = message['chunknum'], message['data'], message['digest']
|
||||
raw_data = pybase64.b64decode(data.encode('utf-8'), validate=True)
|
||||
# TODO: handle if corrupted
|
||||
if Peer._HASH_FUNC(raw_data).hexdigest() != digest:
|
||||
assert False
|
||||
dest_file.seek(number * Peer._CHUNK_SIZE, 0)
|
||||
dest_file.write(raw_data)
|
||||
dest_file.flush()
|
||||
# send request chunk register to server
|
||||
await self._write_message(self._tracker_writer, {
|
||||
'type': MessageType.REQUEST_CHUNK_REGISTER,
|
||||
'filename': file,
|
||||
'chunknum': number
|
||||
})
|
||||
if reporthook:
|
||||
reporthook(finished, Peer._CHUNK_SIZE, fileinfo['size'])
|
||||
logger.debug('Got {}\'s chunk # {}'.format(file, number))
|
||||
|
||||
# change the temp file into the actual file
|
||||
os.rename(destination + '.temp', destination)
|
||||
|
|
Loading…
Reference in a new issue