Module pysdrlib.file.file

Wrapper for processing files

Classes

class File
Expand source code
class File:
    def __init__(self):
        self.log = logger.new("device.File")
        self.fmt: Formats = None # type: ignore
        self.path: Path = None # type: ignore
        self.count = 8192
        self.cur_samp = 0
        self.max_samp = 0
        self._fsize = 0
        self.f = None

    def __str__(self):
        return f"File ({self.cur_samp}/{self.max_samp},{self.fmt.name}): {self.path}"

    def open(self):
        self.log.trace("open()")
        if self.f is not None:
            self.log.warning("File is already open!")
        else:
            self.f = open(self.path, "rb")
            self.f.seek(self.cur_samp // self.fmt.bytes)
    def close(self):
        self.log.trace("close()")
        if self.f is None:
            self.log.warning("File is already closed!")
        else:
            self.f.close()

    def get_samples(self):
        return self.next(self.count)

    # File specific interface
    def set_sample_count(self, count: int):
        self.log.trace("set_sample_count(%s)", count)
        self.count = count
    def set_fmt(self, fmt: str):
        self.log.trace("set_fmt(%s)", fmt)
        self.fmt = Formats[fmt]
        if not self._fsize == 0:
            self.max_samp = self._fsize // self.fmt.bytes
    def get_fmt(self):
        return self.fmt
    def set_path(self, path):
        self.log.trace("set_path(%s)", path)
        if not isinstance(path, Path):
            path = Path(path)
        self.path = path
        self._fsize = path.stat().st_size
        self.max_samp = self._fsize // self.fmt.bytes
    def get_path(self):
        return self.path


    def reset(self):
        self.log.trace("reset()")
        self.cur_samp = 0
        if self.f is not None:
            self.f.seek(0)
    def next(self, count: int):
        self.log.trace("next()")
        if self.cur_samp + count > self.max_samp:
            raise err.Overflow(f"{self.cur_samp}+{count} > {self.max_samp}")
        samps = self._read(count)
        if len(samps) < count:
            raise err.Underflow(f"self._read({count}) at idx {self.cur_samp}, got {len(samps)}")
        self.cur_samp += count
        return samps
    def prev(self, count: int):
        self.log.trace("prev()")
        if self.cur_samp - count < 0:
            raise err.Overflow(f"{self.cur_samp}-{count} < 0")
        self.cur_samp -= count
        samps = self._read(count)
        return samps
    def _read(self, count: int):
        samps = self.fmt.read(self.f, count)
        self.cur_samp += count
        return samps

    def forward(self, count: int):
        """Return <count> samples until EOF"""
        while self.cur_samp + count <= self.max_samp:
            # return self.next(count)
            yield self.next(count)
    def reverse(self, count: int):
        while self.cur_samp - count >= 0:
            # return self.prev(count)
            yield self.prev(count)

    def percent(self):
        """Return percent of file read"""
        return float(self.cur_samp/self.max_samp)*100

    def __call__(self, count):
        return self.forward(count)

Methods

def close(self)
Expand source code
def close(self):
    self.log.trace("close()")
    if self.f is None:
        self.log.warning("File is already closed!")
    else:
        self.f.close()
def forward(self, count: int)
Expand source code
def forward(self, count: int):
    """Return <count> samples until EOF"""
    while self.cur_samp + count <= self.max_samp:
        # return self.next(count)
        yield self.next(count)

Return samples until EOF

def get_fmt(self)
Expand source code
def get_fmt(self):
    return self.fmt
def get_path(self)
Expand source code
def get_path(self):
    return self.path
def get_samples(self)
Expand source code
def get_samples(self):
    return self.next(self.count)
def next(self, count: int)
Expand source code
def next(self, count: int):
    self.log.trace("next()")
    if self.cur_samp + count > self.max_samp:
        raise err.Overflow(f"{self.cur_samp}+{count} > {self.max_samp}")
    samps = self._read(count)
    if len(samps) < count:
        raise err.Underflow(f"self._read({count}) at idx {self.cur_samp}, got {len(samps)}")
    self.cur_samp += count
    return samps
def open(self)
Expand source code
def open(self):
    self.log.trace("open()")
    if self.f is not None:
        self.log.warning("File is already open!")
    else:
        self.f = open(self.path, "rb")
        self.f.seek(self.cur_samp // self.fmt.bytes)
def percent(self)
Expand source code
def percent(self):
    """Return percent of file read"""
    return float(self.cur_samp/self.max_samp)*100

Return percent of file read

def prev(self, count: int)
Expand source code
def prev(self, count: int):
    self.log.trace("prev()")
    if self.cur_samp - count < 0:
        raise err.Overflow(f"{self.cur_samp}-{count} < 0")
    self.cur_samp -= count
    samps = self._read(count)
    return samps
def reset(self)
Expand source code
def reset(self):
    self.log.trace("reset()")
    self.cur_samp = 0
    if self.f is not None:
        self.f.seek(0)
def reverse(self, count: int)
Expand source code
def reverse(self, count: int):
    while self.cur_samp - count >= 0:
        # return self.prev(count)
        yield self.prev(count)
def set_fmt(self, fmt: str)
Expand source code
def set_fmt(self, fmt: str):
    self.log.trace("set_fmt(%s)", fmt)
    self.fmt = Formats[fmt]
    if not self._fsize == 0:
        self.max_samp = self._fsize // self.fmt.bytes
def set_path(self, path)
Expand source code
def set_path(self, path):
    self.log.trace("set_path(%s)", path)
    if not isinstance(path, Path):
        path = Path(path)
    self.path = path
    self._fsize = path.stat().st_size
    self.max_samp = self._fsize // self.fmt.bytes
def set_sample_count(self, count: int)
Expand source code
def set_sample_count(self, count: int):
    self.log.trace("set_sample_count(%s)", count)
    self.count = count