# Skip to content
# Snippets Groups Projects
# __init__.py 4.13 KiB
import argparse
import serial
import logging

class SA868(object):
    """AT-command driver for an SA868 module attached to a serial port.

    Every public method sends one command line over the serial port, reads
    one reply line and validates the reply's prefix.  Instances can be used
    as context managers so the port is closed deterministically::

        with SA868("/dev/ttyUSB0") as trx:
            trx.ping()
    """

    class AtException(Exception):
        """The reply did not start with the prefix expected for the command."""

    class CommandException(Exception):
        """The reply had the expected prefix but its payload was not ``"0"``."""

    def __init__(self, port, baudrate=9600):
        """Open the serial port.

        Args:
            port: Serial device name passed to ``serial.Serial``.
            baudrate: Line speed; defaults to 9600 (the previous hard-coded
                value 9200 is not a standard rate and was a typo).
        """
        self._serial = serial.Serial(port, baudrate, timeout=1)
        # NOTE(review): pySerial exposes the RTS line as the lowercase
        # property ``rts``; this uppercase assignment probably only creates
        # a plain attribute. Left unchanged because switching to ``rts``
        # would alter line levels on existing hardware -- confirm on device.
        self._serial.RTS = False
        self._logger = logging.getLogger("SA868")

    def close(self):
        """Close the underlying serial port."""
        self._serial.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Returns None, so exceptions raised inside the ``with`` body propagate.
        self.close()

    def _check_freq_range(self, freq):
        """Return True if *freq* lies in 134..174 or 400..480 (presumably MHz).

        Raises:
            ValueError: if *freq* is outside both bands.
        """
        if 400 <= freq <= 480 or 134 <= freq <= 174:
            return True
        raise ValueError("Frequency out of bounds")

    def _check_squelsh_range(self, squelsh):
        """Return True if *squelsh* lies in 0..8.

        Raises:
            ValueError: if *squelsh* is out of range.
        """
        if 0 <= squelsh <= 8:
            return True
        raise ValueError("Squelsh out of range")

    def _check_volume_range(self, volume):
        """Return True if *volume* lies in 0..8.

        Raises:
            ValueError: if *volume* is out of range.
        """
        if 0 <= volume <= 8:
            return True
        raise ValueError("Volume out of range")

    @staticmethod
    def _clean_response(raw):
        """Turn a raw reply line (bytes) into a stripped ``str``.

        NUL bytes are dropped before decoding.  Undecodable bytes are
        replaced rather than raising, so line noise later fails the prefix
        check as an ``AtException`` instead of a ``UnicodeDecodeError``.
        """
        return raw.replace(b"\x00", b"").decode(errors="replace").strip()

    def _command_return(self, command, expected_return):
        """Send *command* and return the reply text after *expected_return*.

        Args:
            command: Command text without line terminator; CR LF is appended.
            expected_return: Prefix the reply line must start with.

        Returns:
            The reply with the prefix removed and surrounding whitespace
            stripped.

        Raises:
            SA868.AtException: if the reply does not start with the prefix.
                A read timeout yields an empty reply and therefore also
                ends up here.
        """
        self._logger.info("TRX <-- ")
        command = "{}\r\n".format(command)
        self._logger.info(command)
        self._serial.write(command.encode())

        ret = self._clean_response(self._serial.readline())
        self._logger.info("TRX --> ")
        self._logger.info(ret)

        if not ret.startswith(expected_return):
            raise SA868.AtException("Device returned wrong answer: '{}'".format(ret))
        # Slice the prefix off instead of splitting on it, so a payload that
        # happens to contain the prefix text again is returned intact.
        return ret[len(expected_return):].strip()

    def _command(self, command, expected_return):
        """Send *command* and require the reply payload to be ``"0"``.

        Returns:
            True on success.

        Raises:
            SA868.AtException: if the reply prefix is wrong.
            SA868.CommandException: if the payload is anything but ``"0"``.
        """
        ret = self._command_return(command, expected_return)
        if ret != "0":
            raise SA868.CommandException("Command returned non-zero: {}".format(ret))
        return True

    def ping(self):
        """Send ``AT+DMOCONNECT`` and raise unless the device answers with 0."""
        self._command("AT+DMOCONNECT", "+DMOCONNECT:")

    def sweep(self, frequency):
        """Send the ``S+<frequency>`` command and return the reply as an int.

        Raises:
            ValueError: if *frequency* is outside the supported bands, or the
                reply payload is not an integer.
        """
        self._check_freq_range(frequency)

        return int(self._command_return(
            "S+{:8.4f}".format(frequency),
            "S="))

    def set_group(self, rx_freq, tx_freq, squelsh):
        """Send ``AT+DMOSETGROUP`` with the given frequencies and squelch.

        The command carries the TX frequency before the RX frequency; the
        remaining fields are sent as the fixed values ``0``, ``0000`` and
        ``0000``.  *squelsh* is truncated to an int.

        Raises:
            ValueError: if any argument is out of range.
        """
        self._check_freq_range(rx_freq)
        self._check_freq_range(tx_freq)
        self._check_squelsh_range(squelsh)

        self._command("AT+DMOSETGROUP=0,{:8.4f},{:8.4f},0000,{},0000".format(tx_freq, rx_freq, int(squelsh)),
                      "+DMOSETGROUP:")

    def set_volume(self, volume):
        """Send ``AT+DMOSETVOLUME`` with *volume* (0..8).

        Raises:
            ValueError: if *volume* is out of range.
        """
        self._check_volume_range(volume)

        self._command("AT+DMOSETVOLUME={}".format(volume), "+DMOSETVOLUME:")

    def get_rssi(self):
        """Send ``AT+RSSI?`` and return the value after ``RSSI=`` as an int."""
        return int(self._command_return("AT+RSSI?", "RSSI="))

    def set_filter(self, pre_de_emphasis, lowpass, highpass):
        """Send ``AT+SETFILTER`` with one flag per filter.

        Each truthy argument is sent as ``0`` and each falsy one as ``1``
        (presumably the device treats 1 as "bypass" -- confirm in datasheet).
        """
        # NOTE(review): fields go out as pre/de-emphasis, lowpass, highpass.
        # Vendor documentation may list highpass before lowpass -- verify.
        self._command("AT+SETFILTER={},{},{}".format(
            0 if pre_de_emphasis else 1,
            0 if lowpass else 1,
            0 if highpass else 1),
            "+DMOSETFILTER:")


def main():
    """Command-line entry point.

    Parses the arguments, opens the given serial device, pings the module
    (unless ``--no-ping`` is given) and then performs the requested actions
    in a fixed order: set frequencies/squelch, set volume, set filters,
    print RSSI.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("tty", help="/dev/tty* to use")
    parser.add_argument("--set-rxtx", help="Set Rx and Tx frequencies and squelsh", nargs=3, metavar=("rx_freq", "tx_freq", "squelsh"), type=float)
    parser.add_argument("--set-volume", help="Sets the speaker volume to the given level", type=int)
    # The help text lists the filters in the same order as the metavar and
    # as they are passed on to SA868.set_filter (lowpass before highpass).
    parser.add_argument("--set-filter", help="Sets the pre-/de-emphasis, lowpass- and highpass-filter. Use {off, on}", nargs=3, metavar=("pre_de_emphasis", "lowpass", "highpass"))
    parser.add_argument("--get-rssi", help="Prints the current RSSI to STDOUT", action="store_true")
    parser.add_argument("--no-ping", help="Disables the initial ping of the device", action="store_true")
    parser.add_argument("-v", help="Be more verbose", action="store_true")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO if args.v else logging.WARN)

    s = SA868(args.tty)
    if not args.no_ping:
        s.ping()

    if args.set_rxtx:
        s.set_group(*args.set_rxtx)

    # Compare against None: a plain truthiness test would silently skip
    # "--set-volume 0", which is a value the driver accepts.
    if args.set_volume is not None:
        s.set_volume(args.set_volume)

    if args.set_filter:
        # Only "1" and "on" enable a filter; any other word counts as off.
        filter_flags = [word in ("1", "on") for word in args.set_filter]
        s.set_filter(*filter_flags)

    if args.get_rssi:
        print(s.get_rssi())

# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()