Module rpps.inter.interface

Defines interface bindings

Classes

class Interface (meta, pipeline, baud=None)

Parent interface class

Expand source code
class Interface:
    """Parent interface class"""
    def __init__(self, meta, pipeline, baud=None):
        self.meta = meta
        self.pipeline = pipeline

        self._r_thread = threading.Thread(target=self.read)
        self._w_thread = threading.Thread(target=self.baud)
        self._r_buffer = []

        if meta.inter.get("Baud", None) is None:
            meta.inter = InterMeta()
            meta.inter["Baud"] = baud

        if meta.inter.fields.get("Baud", None) is None:
            raise Exception("No baud rate specified!")

    def start(self):
        """Start read/write threads"""
        self._r_thread.start()
        self._w_thread.start()

    def read(self):
        """Check and read new data"""
        while True:
            data = self._read()
            if data is None:
                continue
            input(f"Got data {data}!")
            self.pipeline.dec(data)
            self._r_buffer.insert(0, data)
            print(f"Buffer is {len(self._r_buffer)}: {self._r_buffer[0]}")

    def baud(self):
        """Write data at baud rate"""
        while True:
            time.sleep(self.meta.inter["Baud"]/1000)
            # self._write(self.pipeline.enc(Stream.from_bytes(0x7E.to_bytes())))

    def write(self, data):
        """Write data to interface"""
        self._write(self.pipeline.enc(data))

    def _read(self):
        return None

    def _write(self, data):
        return None

Subclasses

Methods

def baud(self)

Write data at baud rate

def read(self)

Check and read new data

def start(self)

Start read/write threads

def write(self, data)

Write data to interface