BaseServer handles the close of all connections.
This commit is contained in:
parent
b3eaf0c33c
commit
78d4bbb147
3 changed files with 13 additions and 9 deletions
|
@ -148,10 +148,9 @@ class Peer(MessageServer):
|
||||||
|
|
||||||
return True, 'File {} dowloaded to {}'.format(file, destination)
|
return True, 'File {} dowloaded to {}'.format(file, destination)
|
||||||
|
|
||||||
def exit(self):
|
def stop(self):
|
||||||
self._server_sock.close()
|
self._server_sock.close()
|
||||||
for client, _ in self._peers.items():
|
super().stop()
|
||||||
client.close()
|
|
||||||
|
|
||||||
def _server_started(self):
|
def _server_started(self):
|
||||||
logger.info('Requesting to register')
|
logger.info('Requesting to register')
|
||||||
|
|
|
@ -17,6 +17,8 @@ class MessageServer:
|
||||||
self._decompressor = zstd.ZstdDecompressor()
|
self._decompressor = zstd.ZstdDecompressor()
|
||||||
|
|
||||||
self._is_running = True
|
self._is_running = True
|
||||||
|
self._connections_lock = threading.Lock()
|
||||||
|
self._connections = []
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
self._sock.listen(5)
|
self._sock.listen(5)
|
||||||
|
@ -28,13 +30,17 @@ class MessageServer:
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self._is_running = False
|
self._is_running = False
|
||||||
self._sock.close()
|
self._sock.close()
|
||||||
|
for client in self._connections:
|
||||||
|
client.close()
|
||||||
|
|
||||||
def _listen(self):
|
def _listen(self):
|
||||||
try:
|
try:
|
||||||
while self._is_running:
|
while self._is_running:
|
||||||
client, address = self._sock.accept()
|
client, address = self._sock.accept()
|
||||||
logger.info('New connection from {}'.format(address))
|
logger.info('New connection from {}'.format(address))
|
||||||
|
with self._connections_lock:
|
||||||
self._client_connected(client)
|
self._client_connected(client)
|
||||||
|
self._connections.append(client)
|
||||||
threading.Thread(target=self._read_message, args=(client,)).start()
|
threading.Thread(target=self._read_message, args=(client,)).start()
|
||||||
except ConnectionAbortedError or OSError as e:
|
except ConnectionAbortedError or OSError as e:
|
||||||
if self._is_running:
|
if self._is_running:
|
||||||
|
@ -82,6 +88,9 @@ class MessageServer:
|
||||||
self._process_lock.release()
|
self._process_lock.release()
|
||||||
except EOFError:
|
except EOFError:
|
||||||
logger.warning('{} closed unexpectedly'.format(client.getpeername()))
|
logger.warning('{} closed unexpectedly'.format(client.getpeername()))
|
||||||
|
with self._connections_lock:
|
||||||
|
assert client in self._connections
|
||||||
|
self._connections.remove(client)
|
||||||
self._client_closed(client)
|
self._client_closed(client)
|
||||||
|
|
||||||
def _write_message(self, client, message):
|
def _write_message(self, client, message):
|
||||||
|
|
|
@ -24,10 +24,6 @@ class Tracker(MessageServer):
|
||||||
def peers(self):
|
def peers(self):
|
||||||
return tuple(self._peers.values())
|
return tuple(self._peers.values())
|
||||||
|
|
||||||
def exit(self):
|
|
||||||
for client, _ in self._peers.items():
|
|
||||||
client.close()
|
|
||||||
|
|
||||||
def _client_connected(self, client):
|
def _client_connected(self, client):
|
||||||
assert isinstance(client, socket.socket)
|
assert isinstance(client, socket.socket)
|
||||||
self._peers[client] = None
|
self._peers[client] = None
|
||||||
|
|
Loading…
Reference in a new issue