...

Multithreaded functions for UART

Custom pyserial-based implementation for communication with the SIM7000E module.


A good project needs a solid foundation. Below I present the methods I use for reliable UART communication.

Repositories


 

RX buffer working description


def serial_listener(self):
    """
    Main serial listener function
    """

    while not self._stop_event.is_set():
        # readline() returns empty bytes when the port's read timeout expires
        read = self._serial.readline()
        if read:
            try:
                read = read.strip().decode()
                self._rx_buf.append(read)
            except UnicodeDecodeError as e:
                # Drop lines that cannot be decoded, but keep listening
                Logger.log_error(e)

            Logger.log_debug(f'Serial output: {self._rx_buf}')

The function listens continuously for incoming messages. The thread can be stopped by setting the stop event from another function. This may be useful later (I also use it in the tests).

Received messages are appended to the rx_buf list. To keep unwanted data out of the buffer, each line is first decoded to a string; lines that cannot be decoded are logged and dropped, because I only want decodable messages in the buffer.
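The decoding filter can be looked at in isolation. The sketch below is only an illustration: decode_line is a hypothetical helper that is not part of the original class, and the byte strings are made-up sample input rather than captured modem output.

```python
from typing import List, Optional


def decode_line(raw: bytes) -> Optional[str]:
    """Return the stripped text of a raw line, or None if it is empty or not decodable.

    Hypothetical helper, mirroring the filter inside serial_listener.
    """
    if not raw:
        return None
    try:
        return raw.strip().decode()
    except UnicodeDecodeError:
        return None


# Made-up sample input: two valid lines, one empty read, one non-UTF-8 line
rx_buf: List[str] = []
for raw in (b'AT\r\n', b'', b'\xff\xfe\r\n', b'OK\r\n'):
    line = decode_line(raw)
    if line is not None:
        rx_buf.append(line)

print(rx_buf)
```

Only the two decodable lines end up in the buffer; the empty read and the invalid bytes are skipped.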

def start_serial_listen_thread(self):
    """
    Run serial listener in another thread
    """
    
    if self._stop_event.is_set():
        self._stop_event.clear()

    if not self._serial.is_open:
        self._serial.open()

    self._serial_thread = threading.Thread(target=self.serial_listener, args=())
    self._serial_thread.start()

When the main program starts, this method is called to launch the RX listener thread.
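Stopping the listener is the mirror image of starting it. The following is a minimal sketch, assuming a stop method named stop_serial_listen_thread (a hypothetical name, not taken from the original code) and using a FakeSerial stand-in so that it runs without hardware.

```python
import threading
import time


class FakeSerial:
    """Stand-in for a serial port: readline() blocks briefly, then returns no data."""

    def readline(self) -> bytes:
        time.sleep(0.05)  # emulates a port opened with a short read timeout
        return b''


class Listener:
    def __init__(self, port):
        self._serial = port
        self._stop_event = threading.Event()
        self._serial_thread = None

    def serial_listener(self):
        # Same loop shape as the original listener, without buffering
        while not self._stop_event.is_set():
            self._serial.readline()

    def start_serial_listen_thread(self):
        self._stop_event.clear()
        self._serial_thread = threading.Thread(target=self.serial_listener)
        self._serial_thread.start()

    def stop_serial_listen_thread(self, join_timeout: float = 1.0) -> bool:
        """Hypothetical counterpart: signal the loop to exit and wait for the thread."""
        self._stop_event.set()
        self._serial_thread.join(join_timeout)
        return not self._serial_thread.is_alive()


listener = Listener(FakeSerial())
listener.start_serial_listen_thread()
stopped = listener.stop_serial_listen_thread()
print(stopped)
```

With a real pyserial port, this only works if the port was opened with a read timeout. Otherwise readline() can block indefinitely and the loop never gets the chance to check the stop event.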

 

TX messages and retrieving data from the RX buffer


def send_cmd(self, command: str = None, response: str = None, timeout: float = 2.0) -> bool:
    """
    Send command and wait for the expected answer if defined, else wait for timeout

    Parameters
    ----------
    command: str
        Command sent via UART
    response: str
        Expected response to find in serial output
    timeout: float
        Timeout for expecting answer or only wait time, default is 2.0

    Returns
    -------
    bool
        True if expected answer occurred else False
    """

    try:
        # Exit function with warning if no command to send defined
        if not command:
            Logger.log_warning('No command to send defined')
            return False

        Logger.log_info(f'Sent command: {command}; '
                        f'Expecting response in answer: {response}; '
                        f'Timeout: {timeout};')

        self._serial.write(f'{command}\r\n'.encode('ascii'))
        time.sleep(config.MESSAGE_PROPAGATION_TIME)

        # Monotonic deadline, unaffected by system clock adjustments
        deadline = time.monotonic() + timeout

        # Wait for occurrence of expected string, if not return False
        if response:
            while time.monotonic() < deadline:
                if any(response in s for s in self.get_rx_buf()):
                    return True
                time.sleep(0.01)  # short pause instead of a busy-wait loop

            Logger.log_warning(f'Timeout occurred while sending: {command} and waiting for: {response}')
            return False

        # If only timeout is defined, wait max time to go further
        time.sleep(max(0.0, deadline - time.monotonic()))
        return True

    # Before exit, always clear rx_buf
    finally:
        self.set_rx_buf([])

send_cmd is a wrapper that sends a message and waits for the expected response (read from the RX buffer) within the given timeout. If the expected response is found in the RX buffer, the function returns True immediately; if the timeout expires first, it returns False.
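The wait-with-early-return behaviour can be sketched separately from the serial port. This is an illustration under stated assumptions, not the original implementation: wait_for_response, its poll_interval parameter and the timer that simulates a late answer are all hypothetical.

```python
import threading
import time
from typing import Callable, List


def wait_for_response(get_buf: Callable[[], List[str]], response: str,
                      timeout: float = 2.0, poll_interval: float = 0.01) -> bool:
    """Poll the buffer until `response` appears in any line or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if any(response in line for line in get_buf()):
            return True  # found early, no need to wait for the full timeout
        time.sleep(poll_interval)  # yield the CPU between checks
    return False


rx_buf: List[str] = []
# Simulate the listener thread delivering 'OK' after 100 ms
threading.Timer(0.1, lambda: rx_buf.append('OK')).start()

found = wait_for_response(lambda: rx_buf, 'OK', timeout=1.0)
missing = wait_for_response(lambda: rx_buf, 'ERROR', timeout=0.2)
print(found, missing)
```

The first call returns as soon as the simulated answer arrives, while the second one runs into its timeout because 'ERROR' never shows up in the buffer.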

The function can also send a message without waiting for a response, but then it blocks until the declared timeout expires (the default is 2 seconds) and returns True.

Before returning, the RX buffer is always cleared.
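Because the listener thread appends to the buffer while the command methods read and clear it, it may be worth guarding the buffer with a lock. The implementations of get_rx_buf and set_rx_buf are not shown here, so the following is only a sketch of one possible approach; the RxBuffer class and its method names are assumptions.

```python
import threading
from typing import List


class RxBuffer:
    """Hypothetical lock-protected buffer shared by the listener and the command methods."""

    def __init__(self):
        self._lock = threading.Lock()
        self._lines: List[str] = []

    def append(self, line: str) -> None:
        with self._lock:
            self._lines.append(line)

    def get(self) -> List[str]:
        with self._lock:
            return list(self._lines)  # copy, so callers never see later changes

    def set(self, lines: List[str]) -> None:
        with self._lock:
            self._lines = list(lines)


buf = RxBuffer()


def writer():
    # Plays the role of the listener thread
    for _ in range(1000):
        buf.append('OK')


threads = [threading.Thread(target=writer) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

snapshot = buf.get()
buf.set([])  # same effect as clearing rx_buf before returning
print(len(snapshot), buf.get())
```

Returning a copy from get() means that a list handed to the caller stays intact even when the buffer is cleared right afterwards.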

def query_cmd(self, command: str = None, final_response: str = None, timeout: float = 2.0) -> list:
    """
    Send command and wait for final_response if defined, else wait for timeout

    Parameters
    ----------
    command: str
        Command sent via UART
    final_response: str
        Expected final response to find in serial output
    timeout: float
        Timeout for expecting answer or only wait time, default is 2.0

    Returns
    -------
    list
        List of returned values if final response occurred else empty list
    """

    try:
        # Exit function with warning if no command to send defined
        if not command:
            Logger.log_warning('No command to send defined')
            return []

        Logger.log_info(f'Sent command: {command}; '
                        f'Expecting final response in answer: {final_response}; '
                        f'Timeout: {timeout};')

        self._serial.write(f'{command}\r\n'.encode('ascii'))
        time.sleep(config.MESSAGE_PROPAGATION_TIME)

        # Monotonic deadline, unaffected by system clock adjustments
        deadline = time.monotonic() + timeout

        # Wait for occurrence of expected string, if not return empty list
        if final_response:
            while time.monotonic() < deadline:
                rx_buf = self.get_rx_buf()
                if any(final_response in s for s in rx_buf):
                    # Return a copy, because rx_buf is cleared in the finally block
                    return list(rx_buf)
                time.sleep(0.01)  # short pause instead of a busy-wait loop

            Logger.log_warning(f'Timeout occurred while sending: {command} and waiting for: {final_response}')
            return []

        # If only timeout is defined, wait max time to go further
        time.sleep(max(0.0, deadline - time.monotonic()))
        return list(self.get_rx_buf())

    # Before exit, always clear rx_buf
    finally:
        self.set_rx_buf([])

query_cmd is very similar to send_cmd, with one main difference: it returns the contents of the RX buffer instead of True or False. If the expected final response does not appear in the answer before the timeout expires, it returns an empty list.

When no final response is given, the function sends the message, waits until the declared timeout expires (the default is 2 seconds), and then returns everything that is in the RX buffer.

Before returning, the RX buffer is always cleared.
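The list returned by query_cmd usually needs a bit of parsing. As an example, the sketch below extracts the signal quality values from an answer to the AT+CSQ command. parse_csq is a hypothetical helper, and the answer list is hand-written sample data shaped like buffered lines, not output captured from a device.

```python
import re
from typing import List, Optional, Tuple


def parse_csq(lines: List[str]) -> Optional[Tuple[int, int]]:
    """Return (rssi, ber) from the first '+CSQ: <rssi>,<ber>' line, or None if absent."""
    for line in lines:
        match = re.match(r'\+CSQ:\s*(\d+),(\d+)', line)
        if match:
            return int(match.group(1)), int(match.group(2))
    return None


# Hand-written sample, in the shape query_cmd('AT+CSQ', 'OK') might return
answer = ['AT+CSQ', '+CSQ: 18,99', '', 'OK']

signal = parse_csq(answer)
nothing = parse_csq([])  # an empty list is what query_cmd returns on timeout
print(signal, nothing)
```

Returning None for an empty list keeps the timeout case of query_cmd easy to handle on the caller's side.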

 

Testing


The tests are based on the module I am currently working with, the SIM7000E, and cover most scenarios of intended and unintended use of the two methods send_cmd and query_cmd. The pytest package must be installed to run them.

The tests were performed on a Raspberry Pi 3 with a CP2102 USB-UART adapter.

 

Additional Information


Description of how to configure UART on the Raspberry Pi and how to run the tests