Use set to manage the connections.

This commit is contained in:
Yuxin Wang 2018-10-01 00:12:58 -04:00
parent 54bc2c901a
commit e6e996abf9

View file

@ -17,8 +17,10 @@ class MessageServer:
self._decompressor = zstd.ZstdDecompressor()
self._is_running = True
# manage the connections
self._connections_lock = threading.Lock()
self._connections = []
self._connections = set()
def start(self):
self._sock.listen(5)
@ -40,7 +42,7 @@ class MessageServer:
logger.info('New connection from {}'.format(address))
with self._connections_lock:
self._client_connected(client)
self._connections.append(client)
self._connections.add(client)
threading.Thread(target=self._read_message, args=(client,)).start()
except ConnectionAbortedError or OSError as e:
if self._is_running:
@ -86,12 +88,13 @@ class MessageServer:
self._process_lock.acquire()
self._process_message(client, msg)
self._process_lock.release()
except EOFError:
logger.warning('{} closed unexpectedly'.format(client.getpeername()))
with self._connections_lock:
assert client in self._connections
self._connections.remove(client)
self._client_closed(client)
except (EOFError, OSError):
if self._is_running:
logger.warning('{} closed unexpectedly'.format(client.getpeername()))
with self._connections_lock:
assert client in self._connections
self._connections.remove(client)
self._client_closed(client)
def _write_message(self, client, message):
assert isinstance(client, socket.socket)