The main idea of having a good project is to have good basis. Below I am presenting you methods for reliable UART communication.
Repositories
RX buffer working description
def serial_listener(self):
"""
Main serial listener function
"""
while not self._stop_event.is_set():
read = self._serial.readline()
if read != bytes():
try:
read = read.strip().decode()
self._rx_buf.append(read)
except BaseException as e:
Logger.log_error(e)
Logger.log_debug(f'Serial output: {self._rx_buf}')
The function is constantly listening if any message is received. It is possible to stop the thread by seting the stop event, called by another function. Might be useful for future usage (I am using it as well in tests).
Received messages are appending into the rx_buf list. To avoid any unnecessary data in the buffer, it is verified if it is string object. In this case I want only to have decodable messages.
def start_serial_listen_thread(self):
"""
Run serial listener in another thread
"""
if self._stop_event.is_set():
self._stop_event.clear()
if not self._serial.isOpen():
self._serial.open()
self._serial_thread = threading.Thread(target=self.serial_listener, args=())
self._serial_thread.start()
After starting main program, thread for listening RX is called.
TX messages and retreiving data from the RX buffer
def send_cmd(self, command: str = None, response: str = None, timeout: float = 2.0) -> bool:
"""
Send command and await for expected answer if defined, else wait for timeout
Parameters
----------
command: str
Command send via UART
response: str
Expected response to find in serial output
timeout: float
Timeout for expecting answer or only wait time, default is 2.0
Returns
-------
bool
True if expected answer occurred else False
"""
try:
# Exit function with warning if no command to send defined
if not command:
Logger.log_warning(f'No command to send defined')
return False
Logger.log_info(f'Sent command: {command};'
f'Expecting response in answer: {response};'
f'Timeout: {timeout};')
self._serial.write(f'{command}\r\n'.encode('ascii'))
time.sleep(config.MESSAGE_PROPAGATION_TIME)
timeout = time.time() + timeout
# Wait for occurrence of expected string, if not return False
if response and timeout:
while not any(response in s for s in self.get_rx_buf()) and time.time() < timeout:
pass
if time.time() >= timeout:
Logger.log_warning(f'Timeout occurred while sending: {command} and waiting for: {response}')
return False
# If response argument is present, return True
if any(response in s for s in self.get_rx_buf()):
return True
return False
# If only timeout is defined, wait max time to go further
if not response and timeout:
while time.time() < timeout:
pass
return True
# Before exit, always clear rx_buf
finally:
self.set_rx_buf([])
send_cmd function is a wrapper for sending message and awaiting for expected response (readed out form RX buffer) in expected timeout. Of course if expected response will be found in RX buffer, function will return True without wasting more time, otherwise it will return False.
The function has the possibility to send only a message without awaiting expected response but then it has to wait till declared timeout will be achieved (default one is 2 seconds).
Before return anything, RX buffer is cleared.
def query_cmd(self, command: str = None, final_response: str = None, timeout: float = 2.0) -> list:
"""
Send command and await for final_response if defined, else wait for timeout
Parameters
----------
command: str
Command send via UART
final_response: str
Expected final response to find in serial output
timeout: float
Timeout for expecting answer or only wait time, default is 2.0
Returns
-------
list
List of returned values if final response occurred else empty list
"""
try:
# Exit function with warning if no command to send defined
if not command:
Logger.log_warning(f'No command to send defined')
return []
Logger.log_info(f'Sent command: {command};'
f'Expecting final response in answer: {final_response}; '
f'Timeout: {timeout};')
self._serial.write(f'{command}\r\n'.encode('ascii'))
time.sleep(config.MESSAGE_PROPAGATION_TIME)
timeout = time.time() + timeout
# Wait for occurrence of expected string, if not return empty list
if final_response and timeout:
while not any(final_response in s for s in self.get_rx_buf()) and time.time() < timeout:
pass
if time.time() >= timeout:
Logger.log_warning(f'Timeout occurred while sending: {command} and waiting for: {final_response}')
return []
# If final response argument is present, return True
if any(final_response in s for s in self.get_rx_buf()):
return self.get_rx_buf()
return []
# If only timeout is defined, wait max time to go further
if not final_response and timeout:
while time.time() < timeout:
pass
return self.get_rx_buf()
# Before exit, always clear rx_buf
finally:
self.set_rx_buf([])
query_cmd function is very similar to send_cmd with one main difference, it returns RX buffer instead of True or False. In this case, if any expected final return string will be not in answer or timeout will be acheived, it will return empty list.
The function will send a message without awaiting expected response but then it has to wait till declared timeout will be achieved (default one is 2 seconds). After that it returns everything what is in RX buffer.
Before return anything, RX buffer is cleared.
Testing
Tests are based on module which I am currently playing with - SIM7000E and are covering most of the scenarios of intentional and unintentional usage of two methods send_cmd and query_cmd. To run the tests pytest module has to be installed.
Tests were performed on Raspberry Pi 3 and CP2102 USB UART adapter.
Additional Information
Description how to configure UART on Raspberry and how to run tests