From e087b1f7f45bd0becb07e91968e695226707e2cb Mon Sep 17 00:00:00 2001 From: Yuxin Wang Date: Wed, 26 Sep 2018 22:20:22 -0400 Subject: [PATCH] Implement download method. --- p2pfs/core/peer.py | 55 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/p2pfs/core/peer.py b/p2pfs/core/peer.py index c0d29a8..429f16c 100644 --- a/p2pfs/core/peer.py +++ b/p2pfs/core/peer.py @@ -5,6 +5,8 @@ import logging import os.path import threading from queue import Queue +import math +import pybase64 logger = logging.getLogger(__name__) @@ -88,8 +90,42 @@ class Peer(MessageServer): }) # wait until reply is ready fileinfo, chunkinfo = self._download_results[file].get() + totalchunknum = math.ceil(fileinfo['size'] / (512 * 1024)) logger.debug('{}: {} ==> {}'.format(file, fileinfo, chunkinfo)) + # TODO: refactor this block, make it prettier + for chunknum in range(totalchunknum): + for peer_id, possessed_chunks in chunkinfo.items(): + # first one is a range + smallest, largest = possessed_chunks[0] + if smallest <= chunknum <= largest or chunknum in possessed_chunks: + # find the client based on peer id + peer = None + for client, client_id in self._peers.items(): + if client_id == peer_id: + peer = client + # TODO: handle this later + assert peer is not None + # write the message to ask the chunk + self._write_message(peer, { + 'type': MessageType.PEER_REQUEST_CHUNK, + 'filename': file, + 'chunknum': chunknum + }) + break + + with open(destination, 'wb') as dest_file: + dest_file.write(b'0' * fileinfo['size']) + dest_file.flush() + self._file_map[file] = destination + for i in range(totalchunknum): + number, raw_data = self._download_results[file].get() + dest_file.seek(number * (512 * 1024), 0) + dest_file.write(pybase64.b64decode(raw_data.encode('utf-8'), validate=True)) + dest_file.flush() + progress(i + 1, totalchunknum) + logger.debug('Got {}\'s chunk # {}'.format(file, number)) + with self._download_lock: del self._download_results[file] @@ -132,6 +168,25 @@ class Peer(MessageServer): self._file_list_result.put(message['file_list']) elif message['type'] == MessageType.REPLY_FILE_LOCATION: self._download_results[message['filename']].put((message['fileinfo'], message['chunkinfo'])) + elif message['type'] == MessageType.PEER_REQUEST_CHUNK: + assert message['filename'] in self._file_map, 'File {} requested does not exist'.format(message['filename']) + local_file = self._file_map[message['filename']] + with open(local_file, 'rb') as f: + f.seek(message['chunknum'] * (512 * 1024), 0) + raw_data = f.read(512 * 1024) + self._write_message(client, { + 'type': MessageType.PEER_REPLY_CHUNK, + 'filename': message['filename'], + 'chunknum': message['chunknum'], + 'data': pybase64.b64encode(raw_data).decode('utf-8') + }) + self._write_message(self._server_sock, { + 'type': MessageType.REQUEST_CHUNK_REGISTER, + 'filename': message['filename'], + 'chunknum': message['chunknum'] + }) + elif message['type'] == MessageType.PEER_REPLY_CHUNK: + self._download_results[message['filename']].put((message['chunknum'], message['data'])) else: logger.error('Undefined message with type {}, full message: {}'.format(message['type'], message))