Manage the threads.

This commit is contained in:
Yuxin Wang 2018-10-01 16:16:10 -04:00
parent 468df73f08
commit ca44603284

View file

@ -24,11 +24,16 @@ class MessageServer:
self._connections_lock = threading.Lock()
self._connections = set()
# manage the corresponding threads
self._threads = set()
def start(self):
self._sock.listen(5)
logger.info('Start listening on {}'.format(self._sock.getsockname()))
# put server listening into a thread
threading.Thread(target=self._listen).start()
thread = threading.Thread(target=self._listen)
thread.start()
self._threads.add(thread)
self._server_started()
def stop(self):
@ -39,6 +44,9 @@ class MessageServer:
for client in self._connections:
client.close()
for thread in self._threads:
thread.join()
def _listen(self):
try:
while self._is_running:
@ -50,7 +58,9 @@ class MessageServer:
with self._connections_lock:
self._client_connected(client)
self._connections.add(client)
threading.Thread(target=self._read_message, args=(client,)).start()
thread = threading.Thread(target=self._read_message, args=(client,))
thread.start()
self._threads.add(thread)
except socket.timeout:
# ignore timeout exception
pass
@ -77,12 +87,13 @@ class MessageServer:
def _connect(self, ip, port):
logger.info('Connecting to {}'.format((ip, port)))
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((ip, port))
client = socket.create_connection((ip, port))
client.settimeout(MessageServer._SOCKET_TIMEOUT)
with self._connections_lock:
self._connections.add(client)
threading.Thread(target=self._read_message, args=(client,)).start()
thread = threading.Thread(target=self._read_message, args=(client,))
thread.start()
self._threads.add(thread)
logger.info('Successfully connected to {} on {}'.format((ip, port), client.getsockname()))
return client
@ -111,6 +122,7 @@ class MessageServer:
assert client in self._connections
client.close()
self._connections.remove(client)
self._threads.remove(threading.current_thread())
self._client_closed(client)
def _write_message(self, client, message):