Merge branch 'raise-exceptions': raise exceptions in most of the functions instead of returning a (result, errno) pair.

This commit is contained in:
Yuxin Wang 2018-10-13 18:57:02 -04:00
commit 0aa2b16e3c
11 changed files with 251 additions and 185 deletions

View file

@@ -1,2 +1,2 @@
from p2pfs.core import Peer, Tracker
from p2pfs.core import *
from p2pfs.ui import PeerTerminal, TrackerTerminal

View file

@@ -31,8 +31,10 @@ def main():
exit(0)
try:
loop.run_until_complete(terminal.cmdloop())
except KeyboardInterrupt:
except (KeyboardInterrupt, EOFError):
pass
except Exception as e:
logging.error('{}:{}'.format(type(e).__name__, e))
finally:
loop.run_until_complete(obj.stop())
loop.close()

View file

@@ -1,3 +1,3 @@
from p2pfs.core.peer import Peer
from p2pfs.core.tracker import Tracker
from p2pfs.core.exceptions import DownloadIncompleteError
from p2pfs.core.exceptions import *

View file

@@ -1,3 +1,21 @@
class DownloadIncompleteError(EOFError):
def __init__(self, chunknum):
def __init__(self, message, chunknum):
super().__init__(message)
self.chunknum = chunknum
class AlreadyConnectedError(ConnectionError):
def __init__(self, address):
self.address = address
class TrackerNotConnectedError(ConnectionError):
pass
class InProgressError(Exception):
pass
class ServerRunningError(Exception):
pass

View file

@@ -8,7 +8,7 @@ import asyncio
import pybase64
from p2pfs.core.message import MessageType, read_message, write_message
from p2pfs.core.server import MessageServer
from p2pfs.core.exceptions import DownloadIncompleteError
from p2pfs.core.exceptions import *
logger = logging.getLogger(__name__)
@@ -36,7 +36,7 @@ class DownloadManager:
self._is_connected = True
async def _update_peer_rtt(self, addresses):
""" Test multiple peer's rtt, must have registered in _peers"""
""" Test multiple peer's rtt, must have registered in _peers, doesn't raise exceptions"""
# read_coro -> address
read_tasks = set()
for address in addresses:
@@ -51,7 +51,7 @@
read_tasks.add(asyncio.ensure_future(read_message(reader)))
# set current time
self._peers[address][2] = time.time()
except ConnectionError:
except (ConnectionError, RuntimeError):
# if cannot send ping pong packet to peer, the rtt remains math.inf
# won't cause trouble
pass
@@ -66,10 +66,11 @@
# however, since time.time() is relatively large enough to be similar to math.inf
# it won't cause trouble
pass
# we will hide exceptions here since the exceptions will re-arise when we do read task in download main body
# we will handle the exceptions there
# Note: we will hide exceptions here since the exceptions will re-arise
# when we do read task in download main body and we will handle the exceptions there
async def _request_chunkinfo(self):
""" send request chunkinfo to tracker, will raise exceptions """
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_FILE_LOCATION,
'filename': self._filename
@@ -90,7 +91,7 @@
return
try:
self._fileinfo, chunkinfo = await self._request_chunkinfo()
except (asyncio.IncompleteReadError, ConnectionError):
except (asyncio.IncompleteReadError, ConnectionError, RuntimeError):
# if tracker is down
self._is_connected = False
return
@@ -132,7 +133,7 @@
async def _send_request_chunk(self, chunknum):
if len(self._file_chunk_info[chunknum]) == 0:
raise DownloadIncompleteError(chunknum=chunknum)
raise DownloadIncompleteError(message='Download cannot proceed.', chunknum=chunknum)
fastest_peer = min(self._file_chunk_info[chunknum], key=lambda address: self._peers[address][2])
try:
await write_message(self._peers[fastest_peer][1], {
@@ -289,35 +290,32 @@ class Peer(MessageServer):
async def connect(self, tracker_address, loop=None):
if await self.is_connected():
return False, 'Already connected!'
raise AlreadyConnectedError(address=self._tracker_writer.get_extra_info('peername'))
# connect to server
self._tracker_reader, self._tracker_writer = \
await asyncio.open_connection(*tracker_address, loop=loop)
try:
self._tracker_reader, self._tracker_writer = \
await asyncio.open_connection(*tracker_address, loop=loop)
except ConnectionRefusedError:
logger.error('Server connection refused!')
return False, 'Server connection refused!'
# send out register message
logger.info('Requesting to register')
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_REGISTER,
'address': self._server_address
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_REGISTER
# send out register message
logger.info('Requesting to register')
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_REGISTER,
'address': self._server_address
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_REGISTER
except (ConnectionError, RuntimeError, asyncio.IncompleteReadError):
logger.warning('Error occurred during communications with tracker.')
if not self._tracker_writer.is_closing():
self._tracker_writer.close()
await self._tracker_writer.wait_closed()
raise
logger.info('Successfully registered.')
return True, 'Connected!'
async def disconnect(self):
if not self._tracker_writer or self._tracker_writer.is_closing():
return False, 'Already disconnected'
self._tracker_writer.close()
await self._tracker_writer.wait_closed()
self._reset()
return True, 'Disconnected!'
def _reset(self):
if not self._tracker_writer.is_closing():
self._tracker_writer.close()
await self._tracker_writer.wait_closed()
# reset variables
self._pending_publish = set()
self._delay = 0
self._file_map = {}
@@ -334,56 +332,66 @@ class Peer(MessageServer):
async def publish(self, local_file, remote_name=None):
if not os.path.exists(local_file):
return False, 'File {} doesn\'t exist'.format(local_file)
raise FileNotFoundError()
_, remote_name = os.path.split(local_file) if remote_name is None else remote_name
if remote_name in self._pending_publish:
return False, 'Publish file {} already in progress.'.format(local_file)
raise InProgressError()
if not await self.is_connected():
return False, 'Not connected, try \'connect <tracker_ip> <tracker_port>\''
raise TrackerNotConnectedError()
self._pending_publish.add(remote_name)
try:
# send out the request packet
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_PUBLISH,
'filename': remote_name,
'fileinfo': {
'size': os.stat(local_file).st_size,
'total_chunknum': math.ceil(os.stat(local_file).st_size / Peer._CHUNK_SIZE)
},
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_PUBLISH
is_success = message['result']
# send out the request packet
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_PUBLISH,
'filename': remote_name,
'fileinfo': {
'size': os.stat(local_file).st_size,
'total_chunknum': math.ceil(os.stat(local_file).st_size / Peer._CHUNK_SIZE)
},
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_PUBLISH
is_success, message = message['result'], message['message']
if is_success:
self._file_map[remote_name] = local_file
logger.info('File {} published on server with name {}'.format(local_file, remote_name))
else:
logger.info('File {} failed to publish, {}'.format(local_file, message))
self._pending_publish.remove(remote_name)
return is_success, message
if is_success:
self._file_map[remote_name] = local_file
logger.info('File {} published on server with name {}'.format(local_file, remote_name))
else:
logger.info('File {} failed to publish, {}'.format(local_file, message))
raise FileExistsError()
except (ConnectionError, RuntimeError, asyncio.IncompleteReadError):
logger.warning('Error occured during communications with tracker.')
raise
finally:
self._pending_publish.remove(remote_name)
async def list_file(self):
if not await self.is_connected():
return None, 'Not connected, try \'connect <tracker_ip> <tracker_port>\''
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_FILE_LIST,
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_FILE_LIST
return message['file_list'], 'Success'
raise TrackerNotConnectedError()
try:
await write_message(self._tracker_writer, {
'type': MessageType.REQUEST_FILE_LIST,
})
message = await read_message(self._tracker_reader)
assert MessageType(message['type']) == MessageType.REPLY_FILE_LIST
return message['file_list']
except (asyncio.IncompleteReadError, ConnectionError, RuntimeError):
logger.warning('Error occured during communications with tracker.')
if not self._tracker_writer.is_closing():
self._tracker_writer.close()
await self._tracker_writer.wait_closed()
raise
async def download(self, file, destination, reporthook=None):
# request for file list
file_list, _ = await self.list_file()
file_list = await self.list_file()
if not file_list or file not in file_list:
return False, 'Requested file {} does not exist.'.format(file)
raise FileNotFoundError()
download_manager = DownloadManager(self._tracker_reader, self._tracker_writer, file,
server_address=self._server_address, window_size=30)
@@ -398,7 +406,6 @@ class Peer(MessageServer):
dest_file.seek(chunknum * Peer._CHUNK_SIZE, 0)
dest_file.write(data)
dest_file.flush()
if reporthook:
if reporthook:
finished_chunknum, file_size = download_manager.get_progress()
reporthook(finished_chunknum, Peer._CHUNK_SIZE, file_size)
@@ -408,7 +415,7 @@
# change the temp file into the actual file
os.rename(destination + '.temp', destination)
return True, 'File {} dowloaded to {}'.format(file, destination)
return
async def _process_connection(self, reader, writer):
assert isinstance(reader, asyncio.StreamReader) and isinstance(writer, asyncio.StreamWriter)

View file

@@ -1,6 +1,7 @@
from abc import abstractmethod
import logging
import asyncio
from p2pfs.core.exceptions import ServerRunningError
logger = logging.getLogger(__name__)
@@ -22,6 +23,8 @@ class MessageServer:
return self._is_running
async def start(self, local_address, loop=None):
if self._is_running:
raise ServerRunningError()
logger.info('Start listening on {}'.format(local_address))
# start server
self._server = await asyncio.start_server(self.__new_connection, *local_address, loop=loop)
@@ -29,7 +32,6 @@
# see https://docs.python.org/3.7/library/socket.html#socket-families
self._server_address = self._server.sockets[0].getsockname()[:2]
self._is_running = True
return True
async def stop(self):
if self._is_running:

View file

@@ -25,6 +25,9 @@ class Tracker(MessageServer):
def peers(self):
return tuple(self._peers.values())
def address(self):
return self._server_address
def _reset(self):
self._peers = {}
self._file_list = {}
@@ -58,8 +61,7 @@
await write_message(writer, {
'type': MessageType.REPLY_PUBLISH,
'filename': message['filename'],
'result': False,
'message': 'Filename already existed on server!'
'result': False
})
else:
self._file_list[message['filename']] = message['fileinfo']
@@ -72,7 +74,6 @@
'type': MessageType.REPLY_PUBLISH,
'filename': message['filename'],
'result': True,
'message': 'Success'
})
logger.info('{} published file {} of {} chunks'
.format(self._peers[writer], message['filename'], message['fileinfo']['total_chunknum']))

View file

@@ -1,6 +1,9 @@
import os
from asyncio import IncompleteReadError
from beautifultable import BeautifulTable
from p2pfs.core.tracker import Tracker
from p2pfs.core.peer import Peer
from p2pfs.core.exceptions import *
import p2pfs.ui.aiocmd as aiocmd
import logging
@@ -19,7 +22,16 @@ class TrackerTerminal(aiocmd.Cmd):
if len(arg) < 2:
print('Not enough argument, start <host> <port>')
else:
await self._tracker.start((arg[0], int(arg[1])))
try:
await self._tracker.start((arg[0], int(arg[1])))
except ServerRunningError:
print('Tracker is already running.')
except OSError as e:
if e.errno == 48:
print('Cannot bind on address {}:{}.'.format(arg[0], arg[1]))
else:
raise
print('Tracker started listening on {}'.format(self._tracker.address()))
async def do_list_files(self, arg):
file_list_dict = self._tracker.file_list()
@@ -60,33 +72,61 @@ class PeerTerminal(aiocmd.Cmd):
async def do_publish(self, arg):
arg = arg.split(' ')[0]
_, message = await self._peer.publish(arg)
print(message)
try:
await self._peer.publish(arg)
except FileNotFoundError:
print('File {} doesn\'t exist.'.format(arg))
except FileExistsError:
print('File {} already registered on tracker, use \'list_files\' to see.'.format(arg))
except TrackerNotConnectedError:
print('Tracker is not connected. Use \'connect <tracker_ip> <tracker_port> to connect.\' ')
except (ConnectionError, RuntimeError, IncompleteReadError):
print('Error occurred during communications with tracker, try to re-connect.')
except InProgressError:
print('Publish file {} already in progress.'.format(arg))
else:
print('File {} successfully published on tracker.'.format(arg))
async def do_set_delay(self, arg):
arg = arg.split(' ')[0]
if arg == '':
print('delay is required.')
print('Usage: set_delay <delay>, <delay> is required.')
else:
self._peer.set_delay(float(arg))
print('Delay {} successfully set.'.format(arg))
async def do_connect(self, arg):
arg = arg.split(' ')
if len(arg) < 2:
print('More arguments required! Usage: connect <address> <port>')
_, message = await self._peer.connect((arg[0], int(arg[1])))
print(message)
try:
await self._peer.connect((arg[0], int(arg[1])))
except AlreadyConnectedError as e:
print('Peer already connected to {}.'.format(e.address))
except ConnectionRefusedError:
print('Cannot connect to tracker.')
except (ConnectionError, RuntimeError, IncompleteReadError, AssertionError):
print('Error occurred during communications with tracker.')
else:
print('Successfully connected!')
async def do_list_files(self, arg):
file_list_dict, _ = await self._peer.list_file()
table = BeautifulTable()
table.row_separator_char = ''
try:
file_list_dict = await self._peer.list_file()
except TrackerNotConnectedError:
print('Tracker is not connected, try \'connect <tracker_ip> <tracker_port>\' to connect.')
except (ConnectionError, RuntimeError, IncompleteReadError):
print('Error occured during communications with tracker, '
'try \'connect <tracker_ip> <tracker_port>\' to re-connect.')
else:
table = BeautifulTable()
table.row_separator_char = ''
for filename, fileinfo in file_list_dict.items():
if table.column_count == 0:
table.column_headers = ['Filename'] + list(map(lambda x: x.capitalize(), tuple(fileinfo.keys())))
table.append_row((filename,) + tuple(fileinfo.values()))
print(table)
for filename, fileinfo in file_list_dict.items():
if table.column_count == 0:
table.column_headers = ['Filename'] + list(map(lambda x: x.capitalize(), tuple(fileinfo.keys())))
table.append_row((filename,) + tuple(fileinfo.values()))
print(table)
async def do_download(self, arg):
filename, destination, *_ = arg.split(' ')
@@ -102,14 +142,27 @@ class PeerTerminal(aiocmd.Cmd):
last_chunk[0] = chunknum
return update_to
try:
with tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc='Downloading ...') as t:
# no report hook if we need debug logging (too many logs will cause trouble to tqdm)
hook = tqdm_hook_wrapper(t) if logging.getLogger().getEffectiveLevel() != logging.DEBUG else None
with tqdm(unit='B', unit_scale=True, unit_divisor=1024, miniters=1, desc='Downloading ...') as t:
# no report hook if we need debug logging (too many logs will cause trouble to tqdm)
hook = tqdm_hook_wrapper(t) if logging.getLogger().getEffectiveLevel() != logging.DEBUG else None
_, message = await self._peer.download(filename, destination, reporthook=hook)
print(message)
await self._peer.download(filename, destination, reporthook=hook)
except TrackerNotConnectedError:
print('Tracker not connected, cannot pull initial chunk information.')
except FileNotFoundError:
print('File {} doesn\'t exist, please check filename and try again.'.format(filename))
except (IncompleteReadError, ConnectionError, RuntimeError):
print('Error occurred during transmission.')
except DownloadIncompleteError as e:
print('File chunk # {} doesn\'t exist on any peers, download isn\'t completed.'.format(e.chunknum))
# try to remove incomplete file
try:
os.remove(destination)
except FileNotFoundError:
pass
else:
print('File {} successfully downloaded to {}.'.format(filename, destination))
async def do_exit(self, arg):
await self._peer.stop()

View file

@@ -27,12 +27,10 @@ def fmd5(fname):
async def setup_tracker_and_peers(peer_num, tracker_port):
tracker = Tracker()
peers = tuple(Peer() for _ in range(peer_num))
tracker_started = await tracker.start(('localhost', tracker_port))
await tracker.start(('localhost', tracker_port))
# spawn peers concurrently
peers_started = await asyncio.gather(*[peer.start(('localhost', 0)) for peer in peers])
assert tracker_started and all(peers_started)
peers_connected = await asyncio.gather(*[peer.connect(('localhost', tracker_port)) for peer in peers])
assert all(peers_connected)
await asyncio.gather(*[peer.start(('localhost', 0)) for peer in peers])
await asyncio.gather(*[peer.connect(('localhost', tracker_port)) for peer in peers])
return tracker, peers

View file

@@ -1,7 +1,7 @@
import os
import asyncio
import pytest
from p2pfs import Peer, Tracker
from p2pfs import *
import time
from tests.conftest import fmd5, setup_tracker_and_peers, TEST_SMALL_FILE, TEST_LARGE_FILE, \
TEST_SMALL_FILE_SIZE, TEST_LARGE_FILE_SIZE, TEST_SMALL_FILE_1
@@ -16,12 +16,11 @@ def cleanup_files(files):
pass
async def test_server_refused(unused_tcp_port):
async def test_connect_refused(unused_tcp_port):
peer = Peer()
started = await peer.start(('localhost', 0))
assert started
is_success, _ = await peer.connect(('localhost', unused_tcp_port))
assert not is_success
await peer.start(('localhost', 0))
with pytest.raises(ConnectionRefusedError):
await peer.connect(('localhost', unused_tcp_port))
async def test_start_stop(unused_tcp_port):
@@ -36,10 +35,11 @@ async def test_publish_refuse(unused_tcp_port):
try:
with open('test_publish_refuse', 'wb') as fout:
fout.write(os.urandom(100))
is_success, _ = await peers[0].publish('test_publish_refuse')
assert is_success
is_success, _ = await peers[0].publish('test_publish_refuse')
assert not is_success
await peers[0].publish('test_publish_refuse')
with pytest.raises(FileNotFoundError):
await peers[0].publish('__not_existed_file')
with pytest.raises(FileExistsError):
await peers[0].publish('test_publish_refuse')
os.remove('test_publish_refuse')
finally:
await tracker.stop()
@@ -50,20 +50,18 @@ async def test_publish(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(2, unused_tcp_port)
try:
# peer0 publishes a small_file and peer1 publishes a large file
is_success, _ = await peers[0].publish(TEST_SMALL_FILE)
assert is_success
await peers[0].publish(TEST_SMALL_FILE)
file_list = tracker.file_list()
assert TEST_SMALL_FILE in file_list
assert file_list[TEST_SMALL_FILE]['size'] == TEST_SMALL_FILE_SIZE
file_list, _ = await peers[1].list_file()
file_list= await peers[1].list_file()
assert TEST_SMALL_FILE in file_list
is_success, _ = await peers[1].publish(TEST_LARGE_FILE)
assert is_success
await peers[1].publish(TEST_LARGE_FILE)
file_list = tracker.file_list()
assert TEST_LARGE_FILE in file_list and TEST_SMALL_FILE in file_list
assert file_list[TEST_LARGE_FILE]['size'] == TEST_LARGE_FILE_SIZE
file_list, _ = await peers[0].list_file()
file_list = await peers[0].list_file()
assert TEST_LARGE_FILE in file_list and TEST_SMALL_FILE in file_list
finally:
await tracker.stop()
@@ -75,20 +73,18 @@ async def test_download(unused_tcp_port):
to_cleanup = set()
try:
# peer0 publishes a small_file and peer1 publishes a large file
is_success, _ = await peers[0].publish(TEST_SMALL_FILE)
assert is_success
await peers[0].publish(TEST_SMALL_FILE)
file_list = tracker.file_list()
assert TEST_SMALL_FILE in file_list
assert file_list[TEST_SMALL_FILE]['size'] == TEST_SMALL_FILE_SIZE
file_list, _ = await peers[1].list_file()
file_list = await peers[1].list_file()
assert TEST_SMALL_FILE in file_list
is_success, _ = await peers[1].publish(TEST_LARGE_FILE)
assert is_success
await peers[1].publish(TEST_LARGE_FILE)
file_list = tracker.file_list()
assert TEST_LARGE_FILE in file_list and TEST_SMALL_FILE in file_list
assert file_list[TEST_LARGE_FILE]['size'] == TEST_LARGE_FILE_SIZE
file_list, _ = await peers[0].list_file()
file_list = await peers[0].list_file()
assert TEST_LARGE_FILE in file_list and TEST_SMALL_FILE in file_list
def reporthook(chunk_num, chunk_size, total_size):
@@ -96,24 +92,21 @@ async def test_download(unused_tcp_port):
# download small file
to_cleanup.add('downloaded_' + TEST_SMALL_FILE)
is_success, _ = await peers[1].download(TEST_SMALL_FILE, 'downloaded_' + TEST_SMALL_FILE, reporthook=reporthook)
await peers[1].download(TEST_SMALL_FILE, 'downloaded_' + TEST_SMALL_FILE, reporthook=reporthook)
assert os.path.exists('downloaded_' + TEST_SMALL_FILE)
assert is_success
assert fmd5(TEST_SMALL_FILE) == fmd5('downloaded_' + TEST_SMALL_FILE)
assert reporthook.value == (1, 1000)
# download large file from single source
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_0')
is_success, _ = await peers[0].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_0')
await peers[0].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_0')
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_0')
assert is_success
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_0')
# download large file from multiple sources
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_2')
is_success, _ = await peers[2].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2')
await peers[2].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2')
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_2')
assert is_success
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_2')
# download large file concurrently
@@ -121,12 +114,10 @@
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_4')
download_task_1 = peers[3].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_3')
download_task_2 = peers[4].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_4')
(is_success_1, _), (is_success_2, _) = await asyncio.gather(download_task_1, download_task_2)
await asyncio.gather(download_task_1, download_task_2)
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_3')
assert is_success_1
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_3')
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_4')
assert is_success_2
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_4')
finally:
cleanup_files(to_cleanup)
@@ -138,27 +129,24 @@ async def test_delay(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(2, unused_tcp_port)
# peer0 publishes a small_file and peer1 publishes a large file
is_success, _ = await peers[0].publish(TEST_SMALL_FILE)
assert is_success
await peers[0].publish(TEST_SMALL_FILE)
file_list = tracker.file_list()
assert TEST_SMALL_FILE in file_list
assert file_list[TEST_SMALL_FILE]['size'] == TEST_SMALL_FILE_SIZE
file_list, _ = await peers[1].list_file()
file_list = await peers[1].list_file()
assert TEST_SMALL_FILE in file_list
is_success, _ = await peers[0].publish(TEST_SMALL_FILE_1)
assert is_success
await peers[0].publish(TEST_SMALL_FILE_1)
file_list = tracker.file_list()
assert TEST_SMALL_FILE in file_list
assert file_list[TEST_SMALL_FILE_1]['size'] == TEST_SMALL_FILE_SIZE
file_list, _ = await peers[1].list_file()
file_list = await peers[1].list_file()
assert TEST_SMALL_FILE_1 in file_list
to_cleanup = set()
try:
# download small file
start = time.time()
to_cleanup.add('downloaded_' + TEST_SMALL_FILE)
result, msg = await peers[1].download(TEST_SMALL_FILE, 'downloaded_' + TEST_SMALL_FILE)
assert result is True
await peers[1].download(TEST_SMALL_FILE, 'downloaded_' + TEST_SMALL_FILE)
assert os.path.exists('downloaded_' + TEST_SMALL_FILE)
assert fmd5(TEST_SMALL_FILE) == fmd5('downloaded_' + TEST_SMALL_FILE)
@@ -166,8 +154,7 @@ async def test_delay(unused_tcp_port):
start = time.time()
peers[0].set_delay(1)
to_cleanup.add('downloaded_' + TEST_SMALL_FILE_1)
result, msg = await peers[1].download(TEST_SMALL_FILE_1, 'downloaded_' + TEST_SMALL_FILE_1)
assert result is True
await peers[1].download(TEST_SMALL_FILE_1, 'downloaded_' + TEST_SMALL_FILE_1)
download_time_with_delay = time.time() - start
assert download_time_with_delay > download_time
peers[0].set_delay(0)
@@ -180,8 +167,7 @@
async def test_peer_disconnect(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(1, unused_tcp_port)
try:
is_suceess, _ = await peers[0].publish(TEST_SMALL_FILE)
assert is_suceess
await peers[0].publish(TEST_SMALL_FILE)
assert TEST_SMALL_FILE in tracker.file_list()
# stop peer and check the file has been removed
@@ -197,15 +183,13 @@ async def test_peer_download_disconnect(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(3, unused_tcp_port)
to_cleanup = set()
try:
is_suceess, _ = await peers[0].publish(TEST_LARGE_FILE)
assert is_suceess
await peers[0].publish(TEST_LARGE_FILE)
assert TEST_LARGE_FILE in tracker.file_list()
# download large file from single source
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_1')
is_success, _ = await peers[1].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_1')
await peers[1].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_1')
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_1')
assert is_success
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_1')
peers[1].set_delay(0.1)
@@ -217,10 +201,9 @@
await peer.stop()
# run download and stop peer task concurrently
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_2')
(is_success, _), _ = await asyncio.gather(peers[2].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2'),
await asyncio.gather(peers[2].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2'),
stop_peer_after(peers[0], 1))
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_2')
assert is_success
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_2')
finally:
cleanup_files(to_cleanup)
@@ -232,8 +215,7 @@ async def test_tracker_download_disconnect(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(2, unused_tcp_port)
to_cleanup = set()
try:
is_suceess, _ = await peers[0].publish(TEST_LARGE_FILE)
assert is_suceess
await peers[0].publish(TEST_LARGE_FILE)
assert TEST_LARGE_FILE in tracker.file_list()
# stop tracker in the download process, should still successfully download since peer is still alive
@@ -242,10 +224,9 @@ async def test_tracker_download_disconnect(unused_tcp_port):
await obj.stop()
# run download and stop task concurrently
to_cleanup.add('downloaded_' + TEST_LARGE_FILE + '_2')
(is_success, _), _ = await asyncio.gather(peers[1].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2'),
await asyncio.gather(peers[1].download(TEST_LARGE_FILE, 'downloaded_' + TEST_LARGE_FILE + '_2'),
stop_after(tracker, 1))
assert os.path.exists('downloaded_' + TEST_LARGE_FILE + '_2')
assert is_success
assert fmd5(TEST_LARGE_FILE) == fmd5('downloaded_' + TEST_LARGE_FILE + '_2')
finally:
cleanup_files(to_cleanup)
@@ -254,18 +235,14 @@ async def test_tracker_download_disconnect(unused_tcp_port):
async def test_peer_restart(unused_tcp_port):
tracker, peers = await setup_tracker_and_peers(1, unused_tcp_port)
is_success, _ = await peers[0].publish(TEST_SMALL_FILE)
assert is_success
await peers[0].publish(TEST_SMALL_FILE)
assert TEST_SMALL_FILE in tracker.file_list()
is_success, _ = await peers[0].disconnect()
await peers[0].disconnect()
await asyncio.sleep(0.5)
assert TEST_SMALL_FILE not in tracker.file_list()
assert is_success
is_success, _ = await peers[0].connect(('localhost', unused_tcp_port))
assert is_success
is_success, _ = await peers[0].publish(TEST_SMALL_FILE)
await peers[0].connect(('localhost', unused_tcp_port))
await peers[0].publish(TEST_SMALL_FILE)
assert TEST_SMALL_FILE in tracker.file_list()
assert is_success
async def test_tracker_restart(unused_tcp_port):
@@ -275,5 +252,9 @@ async def test_tracker_restart(unused_tcp_port):
await asyncio.sleep(0.5)
is_all_connected = await asyncio.gather(*[peer.is_connected() for peer in peers])
assert not any(is_all_connected)
with pytest.raises(TrackerNotConnectedError):
await peers[0].publish(TEST_SMALL_FILE)
with pytest.raises(TrackerNotConnectedError):
await peers[1].list_file()
await tracker.start(('localhost', unused_tcp_port))
assert tracker.is_running()

View file

@@ -11,26 +11,30 @@ async def test_terminals(unused_tcp_port, capsys):
tracker, peers = await setup_tracker_and_peers(2, unused_tcp_port)
peer_terminals = tuple(PeerTerminal(peer) for peer in peers)
tracker_terminal = TrackerTerminal(tracker)
await peer_terminals[1].do_connect('localhost {}'.format(unused_tcp_port))
out, _ = capsys.readouterr()
assert 'Already' in out
await peer_terminals[0].do_help('')
capsys.readouterr()
await peer_terminals[0].do_publish(TEST_SMALL_FILE)
out, _ = capsys.readouterr()
assert out == 'Success\n'
await peer_terminals[1].do_list_files('')
out, _ = capsys.readouterr()
assert TEST_SMALL_FILE in out
await tracker_terminal.do_list_files('')
out, _ = capsys.readouterr()
assert TEST_SMALL_FILE in out
await peer_terminals[1].do_set_delay('0')
await peer_terminals[1].do_download(TEST_SMALL_FILE + ' ' + 'downloaded_' + TEST_SMALL_FILE)
assert os.path.exists('downloaded_' + TEST_SMALL_FILE)
assert fmd5(TEST_SMALL_FILE) == fmd5('downloaded_' + TEST_SMALL_FILE)
out, _ = capsys.readouterr()
os.remove('downloaded_' + TEST_SMALL_FILE)
await tracker_terminal.do_exit('')
await asyncio.gather(*[terminal.do_exit('') for terminal in peer_terminals])
try:
await peer_terminals[1].do_connect('localhost {}'.format(unused_tcp_port))
out, _ = capsys.readouterr()
assert 'already' in out
await peer_terminals[0].do_help('')
capsys.readouterr()
await peer_terminals[0].do_publish(TEST_SMALL_FILE)
out, _ = capsys.readouterr()
assert 'success' in out
await peer_terminals[1].do_list_files('')
out, _ = capsys.readouterr()
assert TEST_SMALL_FILE in out
await tracker_terminal.do_list_files('')
out, _ = capsys.readouterr()
assert TEST_SMALL_FILE in out
await peer_terminals[1].do_set_delay('0')
await peer_terminals[1].do_download(TEST_SMALL_FILE + ' ' + 'downloaded_' + TEST_SMALL_FILE)
assert os.path.exists('downloaded_' + TEST_SMALL_FILE)
assert fmd5(TEST_SMALL_FILE) == fmd5('downloaded_' + TEST_SMALL_FILE)
out, _ = capsys.readouterr()
finally:
try:
os.remove('downloaded_' + TEST_SMALL_FILE)
except FileNotFoundError:
pass
await tracker_terminal.do_exit('')
await asyncio.gather(*[terminal.do_exit('') for terminal in peer_terminals])