Add speed tests for peers.

This commit is contained in:
Yuxin Wang 2018-10-09 21:15:17 -04:00
parent 422e08f6f6
commit be6ad63795

View file

@ -2,6 +2,7 @@ import logging
import os.path
import math
import json
import time
import hashlib
import asyncio
import pybase64
@ -114,27 +115,46 @@ class Peer(MessageServer):
fileinfo, chunkinfo = message['fileinfo'], message['chunkinfo']
logger.debug('{}: {} ==> {}'.format(file, fileinfo, chunkinfo))
total_chunknum = math.ceil(fileinfo['size'] / Peer._CHUNK_SIZE)
total_chunknum = message['fileinfo']['total_chunknum']
# TODO: decide which peer to request chunk
# peer_address -> (reader, writer)
# peer_address -> (reader, writer, RTT)
peers = {}
# connect to all peers and do a speed test
for peer_address in chunkinfo.keys():
# peer_address is a string, since JSON requires keys being strings
reader, writer = await asyncio.open_connection(*json.loads(peer_address), loop=self._loop)
# send out ping packet
await self._write_message(writer, {
'type': MessageType.PEER_PING_PONG,
'peer_address': peer_address
})
# set current time
peers[peer_address] = [reader, writer, time.time()]
# start reading from peers to get pong packets
# task -> peer_address, for easy reference to peers' RTT
for done in asyncio.as_completed({asyncio.ensure_future(self._read_message(reader))
for reader, _, _ in peers.values()}):
message = await done
peer_address = message['peer_address']
peers[peer_address][2] = time.time() - peers[peer_address][2]
for chunknum in range(total_chunknum):
for peer_address, possessed_chunks in chunkinfo.items():
if chunknum in possessed_chunks:
if peer_address not in peers:
# peer_address is a string, since JSON requires keys being strings
peers[peer_address] = await asyncio.open_connection(*json.loads(peer_address), loop=self._loop)
# write the message to ask for the chunk
asyncio.ensure_future(self._write_message(peers[peer_address][1], {
'type': MessageType.PEER_REQUEST_CHUNK,
'filename': file,
'chunknum': chunknum
})))
}))
break
# task -> reader
tasks = {asyncio.ensure_future(self._read_message(reader)): reader for address, (reader, _) in peers.items()}
tasks = {asyncio.ensure_future(self._read_message(reader)): reader for address, (reader, _, _) in peers.items()}
# TODO: update chunkinfo after receiving each chunk
try:
with open(destination + '.temp', 'wb') as dest_file:
@ -174,7 +194,7 @@ class Peer(MessageServer):
logger.debug('Got {}\'s chunk # {}'.format(file, number))
finally:
# close the connections
for _, (_, writer) in peers.items():
for _, (_, writer, _) in peers.items():
if not writer.is_closing():
writer.close()
await writer.wait_closed()