Implement publish and list file.
This commit is contained in:
parent
df42d117d7
commit
21f7fdc8d7
2 changed files with 81 additions and 1 deletions
|
@ -3,6 +3,7 @@ from p2pfs.core.message import MessageType
|
|||
import socket
|
||||
import logging
|
||||
import os.path
|
||||
import threading
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
|
@ -10,6 +11,14 @@ class PeerServer(MessageServer):
|
|||
def __init__(self, host, port, server, server_port):
|
||||
super().__init__(host, port)
|
||||
self._peers = {}
|
||||
# (remote filename) <-> (local filename)
|
||||
self._file_map = {}
|
||||
|
||||
self._publish_locks = {}
|
||||
self._publish_results = {}
|
||||
|
||||
self._list_file_lock = threading.Lock()
|
||||
self._file_list = {}
|
||||
try:
|
||||
self._server_sock = self._connect(server, server_port)
|
||||
except ConnectionRefusedError:
|
||||
|
@ -17,8 +26,46 @@ class PeerServer(MessageServer):
|
|||
exit(1)
|
||||
|
||||
def publish(self, file):
|
||||
if file in self._publish_locks and self._publish_locks[file].locked():
|
||||
return False, 'Publish file {} already in progress.'.format(file)
|
||||
if not os.path.exists(file):
|
||||
return False
|
||||
return False, 'File {} doesn\'t exist'.format(file)
|
||||
path, filename = os.path.split(file)
|
||||
self._write_message(self._server_sock, {
|
||||
'type': MessageType.REQUEST_PUBLISH,
|
||||
'filename': filename,
|
||||
'size': os.stat(file).st_size,
|
||||
'chunkinfo': []
|
||||
})
|
||||
lock = threading.Lock()
|
||||
self._publish_locks[filename] = lock
|
||||
lock.acquire()
|
||||
lock.acquire()
|
||||
del self._publish_locks[filename]
|
||||
is_success, message = self._publish_results[filename]
|
||||
del self._publish_results[filename]
|
||||
if is_success:
|
||||
self._file_map[filename] = file
|
||||
logger.info('File {} published on server with name {}'.format(file, filename))
|
||||
else:
|
||||
logger.info('File {} failed to publish, {}'.format(file, message))
|
||||
return is_success, message
|
||||
|
||||
def list_file(self):
|
||||
# there's a request file list packet on the way
|
||||
if self._list_file_lock.locked():
|
||||
return self._file_list
|
||||
self._write_message(self._server_sock, {
|
||||
'type': MessageType.REQUEST_FILE_LIST,
|
||||
})
|
||||
self._list_file_lock.acquire()
|
||||
self._list_file_lock.acquire()
|
||||
return self._file_list
|
||||
|
||||
def download(self, file, destination, progress):
|
||||
if file not in self._file_list.keys():
|
||||
return False, 'Requested file {} does not exist'.format(file)
|
||||
return True, 'File {} dowloaded to {} completed'.format(file, destination)
|
||||
|
||||
def _server_started(self):
|
||||
logger.info('Requesting to register')
|
||||
|
@ -46,6 +93,12 @@ class PeerServer(MessageServer):
|
|||
logger.info('Greetings from peer with id {}'.format(message['id']))
|
||||
assert client in self._peers
|
||||
self._peers[client] = message['id']
|
||||
elif message['type'] == MessageType.REPLY_PUBLISH:
|
||||
self._publish_results[message['filename']] = (message['result'], message['message'])
|
||||
self._publish_locks[message['filename']].release()
|
||||
elif message['type'] == MessageType.REPLY_FILE_LIST:
|
||||
self._file_list = message['file_list']
|
||||
self._list_file_lock.release()
|
||||
|
||||
logger.debug(self._peers.values())
|
||||
|
||||
|
|
|
@ -11,6 +11,8 @@ class CentralServer(MessageServer):
|
|||
super().__init__(host, port)
|
||||
self._peers = {}
|
||||
|
||||
self._file_list = {}
|
||||
|
||||
def _client_connected(self, client):
|
||||
assert isinstance(client, socket.socket)
|
||||
self._peers[client] = None
|
||||
|
@ -28,6 +30,31 @@ class CentralServer(MessageServer):
|
|||
})
|
||||
self._peers[client] = (id, message['address'])
|
||||
logger.debug(self._peers.values())
|
||||
elif message['type'] == MessageType.REQUEST_PUBLISH:
|
||||
if message['filename'] in self._file_list:
|
||||
self._write_message(client, {
|
||||
'type': MessageType.REPLY_PUBLISH,
|
||||
'filename': message['filename'],
|
||||
'result': False,
|
||||
'message': 'Filename already existed on server!'
|
||||
})
|
||||
else:
|
||||
self._file_list[message['filename']] = {
|
||||
'size': message['size'],
|
||||
'chunkinfo': message['chunkinfo']
|
||||
}
|
||||
self._write_message(client, {
|
||||
'type': MessageType.REPLY_PUBLISH,
|
||||
'filename': message['filename'],
|
||||
'result': True,
|
||||
'message': 'Success'
|
||||
})
|
||||
logger.info('{} published file {}'.format(self._peers[client], message['filename']))
|
||||
elif message['type'] == MessageType.REQUEST_FILE_LIST:
|
||||
self._write_message(client, {
|
||||
'type': MessageType.REPLY_FILE_LIST,
|
||||
'file_list': self._file_list
|
||||
})
|
||||
|
||||
def _client_closed(self, client):
|
||||
assert isinstance(client, socket.socket)
|
||||
|
|
Loading…
Reference in a new issue