Add timeout to prevent forever waiting.
This commit is contained in:
parent
2450554ffb
commit
468df73f08
1 changed files with 25 additions and 12 deletions
|
@ -9,6 +9,8 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
|
||||
class MessageServer:
|
||||
_SOCKET_TIMEOUT = 5
|
||||
|
||||
def __init__(self, host, port):
|
||||
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self._sock.bind((host, port))
|
||||
|
@ -40,12 +42,18 @@ class MessageServer:
|
|||
def _listen(self):
|
||||
try:
|
||||
while self._is_running:
|
||||
try:
|
||||
client, address = self._sock.accept()
|
||||
# add timeout to prevent waiting forever
|
||||
client.settimeout(MessageServer._SOCKET_TIMEOUT)
|
||||
logger.info('New connection from {}'.format(address))
|
||||
with self._connections_lock:
|
||||
self._client_connected(client)
|
||||
self._connections.add(client)
|
||||
threading.Thread(target=self._read_message, args=(client,)).start()
|
||||
except socket.timeout:
|
||||
# ignore timeout exception
|
||||
pass
|
||||
except (ConnectionAbortedError, OSError) as e:
|
||||
if self._is_running:
|
||||
# if exception occurred during normal execution
|
||||
|
@ -55,19 +63,23 @@ class MessageServer:
|
|||
|
||||
@staticmethod
|
||||
def __recvall(sock, n):
|
||||
"""helper function to recv n bytes or return None if EOF is hit"""
|
||||
"""helper function to recv n bytes or raise exception if EOF is hit"""
|
||||
data = b''
|
||||
while len(data) < n:
|
||||
try:
|
||||
packet = sock.recv(n - len(data))
|
||||
if not packet:
|
||||
raise EOFError('peer socket closed')
|
||||
data += packet
|
||||
except socket.timeout:
|
||||
pass
|
||||
return data
|
||||
|
||||
def _connect(self, ip, port):
|
||||
logger.info('Connecting to {}'.format((ip, port)))
|
||||
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
client.connect((ip, port))
|
||||
client.settimeout(MessageServer._SOCKET_TIMEOUT)
|
||||
with self._connections_lock:
|
||||
self._connections.add(client)
|
||||
threading.Thread(target=self._read_message, args=(client,)).start()
|
||||
|
@ -94,9 +106,10 @@ class MessageServer:
|
|||
self._process_message(client, msg)
|
||||
except (EOFError, OSError):
|
||||
if self._is_running:
|
||||
logger.warning('{} closed unexpectedly'.format(client.getpeername()))
|
||||
logger.warning('{} closed'.format(client.getpeername()))
|
||||
with self._connections_lock:
|
||||
assert client in self._connections
|
||||
client.close()
|
||||
self._connections.remove(client)
|
||||
self._client_closed(client)
|
||||
|
||||
|
|
Loading…
Reference in a new issue