Implement download method.
This commit is contained in:
parent
0d77334bff
commit
e087b1f7f4
1 changed files with 55 additions and 0 deletions
|
@ -5,6 +5,8 @@ import logging
|
||||||
import os.path
|
import os.path
|
||||||
import threading
|
import threading
|
||||||
from queue import Queue
|
from queue import Queue
|
||||||
|
import math
|
||||||
|
import pybase64
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -88,8 +90,42 @@ class Peer(MessageServer):
|
||||||
})
|
})
|
||||||
# wait until reply is ready
|
# wait until reply is ready
|
||||||
fileinfo, chunkinfo = self._download_results[file].get()
|
fileinfo, chunkinfo = self._download_results[file].get()
|
||||||
|
totalchunknum = math.ceil(fileinfo['size'] / (512 * 1024))
|
||||||
logger.debug('{}: {} ==> {}'.format(file, fileinfo, chunkinfo))
|
logger.debug('{}: {} ==> {}'.format(file, fileinfo, chunkinfo))
|
||||||
|
|
||||||
|
# TODO: refactor this block, make it prettier
|
||||||
|
for chunknum in range(totalchunknum):
|
||||||
|
for peer_id, possessed_chunks in chunkinfo.items():
|
||||||
|
# first one is a range
|
||||||
|
smallest, largest = possessed_chunks[0]
|
||||||
|
if smallest <= chunknum <= largest or chunknum in possessed_chunks:
|
||||||
|
# find the client based on peer id
|
||||||
|
peer = None
|
||||||
|
for client, client_id in self._peers.items():
|
||||||
|
if client_id == peer_id:
|
||||||
|
peer = client
|
||||||
|
# TODO: handle this later
|
||||||
|
assert peer is not None
|
||||||
|
# write the message to ask the chunk
|
||||||
|
self._write_message(peer, {
|
||||||
|
'type': MessageType.PEER_REQUEST_CHUNK,
|
||||||
|
'filename': file,
|
||||||
|
'chunknum': chunknum
|
||||||
|
})
|
||||||
|
break
|
||||||
|
|
||||||
|
with open(destination, 'wb') as dest_file:
|
||||||
|
dest_file.write(b'0' * fileinfo['size'])
|
||||||
|
dest_file.flush()
|
||||||
|
self._file_map[file] = destination
|
||||||
|
for i in range(totalchunknum):
|
||||||
|
number, raw_data = self._download_results[file].get()
|
||||||
|
dest_file.seek(number * (512 * 1024), 0)
|
||||||
|
dest_file.write(pybase64.b64decode(raw_data.encode('utf-8'), validate=True))
|
||||||
|
dest_file.flush()
|
||||||
|
progress(i + 1, totalchunknum)
|
||||||
|
logger.debug('Got {}\'s chunk # {}'.format(file, number))
|
||||||
|
|
||||||
with self._download_lock:
|
with self._download_lock:
|
||||||
del self._download_results[file]
|
del self._download_results[file]
|
||||||
|
|
||||||
|
@ -132,6 +168,25 @@ class Peer(MessageServer):
|
||||||
self._file_list_result.put(message['file_list'])
|
self._file_list_result.put(message['file_list'])
|
||||||
elif message['type'] == MessageType.REPLY_FILE_LOCATION:
|
elif message['type'] == MessageType.REPLY_FILE_LOCATION:
|
||||||
self._download_results[message['filename']].put((message['fileinfo'], message['chunkinfo']))
|
self._download_results[message['filename']].put((message['fileinfo'], message['chunkinfo']))
|
||||||
|
elif message['type'] == MessageType.PEER_REQUEST_CHUNK:
|
||||||
|
assert message['filename'] in self._file_map, 'File {} requested does not exist'.format(message['filename'])
|
||||||
|
local_file = self._file_map[message['filename']]
|
||||||
|
with open(local_file, 'rb') as f:
|
||||||
|
f.seek(message['chunknum'] * (512 * 1024), 0)
|
||||||
|
raw_data = f.read(512 * 1024)
|
||||||
|
self._write_message(client, {
|
||||||
|
'type': MessageType.PEER_REPLY_CHUNK,
|
||||||
|
'filename': message['filename'],
|
||||||
|
'chunknum': message['chunknum'],
|
||||||
|
'data': pybase64.b64encode(raw_data).decode('utf-8')
|
||||||
|
})
|
||||||
|
self._write_message(self._server_sock, {
|
||||||
|
'type': MessageType.REQUEST_CHUNK_REGISTER,
|
||||||
|
'filename': message['filename'],
|
||||||
|
'chunknum': message['chunknum']
|
||||||
|
})
|
||||||
|
elif message['type'] == MessageType.PEER_REPLY_CHUNK:
|
||||||
|
self._download_results[message['filename']].put((message['chunknum'], message['data']))
|
||||||
else:
|
else:
|
||||||
logger.error('Undefined message with type {}, full message: {}'.format(message['type'], message))
|
logger.error('Undefined message with type {}, full message: {}'.format(message['type'], message))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue