Module rpps.mod.constellation

IQ/Constellation implementation

Classes

class Constellation (points: Points,
mapping: Mapping = (None, Normal),
log=<pyboiler.logger._Logger object>)
Expand source code
class Constellation:
    """Constellation of complex IQ points together with a symbol mapping.

    Modulation runs bits -> symbol indexes (``index``) -> point positions
    (``map``) -> IQ symbols (``to_symbols``). Demodulation turns received
    symbols into per-point soft values (``from_symbols``) that are paired
    with the codeword table (``codewords``).
    """
    __slots__ = ("log", "_points", "_mapping", "_bps")

    def __init__(self, points: Points, mapping: Mapping = None, log=Logger().Child("Modulation")):
        """Create a constellation.

        Args:
            points: Constellation points; anything that is not already a
                ``Points`` instance is wrapped in one.
            mapping: Symbol mapping. When omitted, a new ``Mapping()`` is
                created for this instance.
            log: Parent logger; a "Constellation" child is derived from it.
        """
        self.log = log.Child("Constellation", Level.WARN)
        if not isinstance(points, Points):
            points = Points(points)
        self._points = points

        # invert() mutates the mapping in place, so a default object created
        # once in the signature would be shared (and corrupted) between
        # every constellation built without an explicit mapping.
        self._mapping = Mapping() if mapping is None else mapping

        self._bps = int(math.log2(len(self.points)))  # Bits per symbol

    def __str__(self) -> str:
        return f"Points: {self._points}, Map: {self._mapping}, {self._mapping.comment}"

    def __repr__(self) -> str:
        return f"<Constellation: {self._bps}>"

    def __len__(self) -> int:
        """Return the number of constellation points."""
        return len(self._points)

    @property
    def points(self):
        """Get constellation points"""
        return self._points

    @points.setter
    def points(self, points):
        """Replace the constellation points and recompute bits per symbol."""
        self._points = Points(np.array(points))
        # Same formula as __init__. The previous ``len(points) // 2`` only
        # agreed with log2 for 2- and 4-point constellations.
        self._bps = int(math.log2(len(self.points)))

    @property
    def inverted(self):
        """Return whether the constellation is spectrally inverted."""
        return self._mapping.inverted

    # { "real": 0.7, "imag": -0.7 },
    # { "real": -0.7, "imag": -0.7 },
    # { "real": 0.7, "imag": 0.7 },
    # { "real": -0.7, "imag": 0.7 }
    def invert(self):
        """Spectrally invert the constellation.

        Toggles the mapping's ``inverted`` flag and swaps mapping entries in
        place. The positions considered are those whose point is unchanged
        when its real and imaginary parts are exchanged; the first half of
        those positions is swapped pairwise with the second half.
        """
        self._mapping.inverted = not self._mapping.inverted
        swapped_iq = self._points.imag() + self._points.real() * 1j
        matches = np.where(self.points.arr == swapped_iq)[0]
        half = len(matches) // 2
        for first, second in zip(matches[:half], matches[half:]):
            # Fancy-index assignment swaps the two entries in one step.
            self._mapping.arr[[first, second]] = self._mapping.arr[[second, first]]

    @property
    def mapping(self):
        """Get constellation mapping"""
        return self._mapping

    @mapping.setter
    def mapping(self, map: Mapping):
        """Replace the constellation mapping."""
        self._mapping = map

    @property
    def bits_per_symbol(self):
        """Get bits per symbol"""
        return self._bps

    def modulate(self, dobj: dobject.BitObject, noise: bool = True):
        """Modulate BitObject to IQ symbols.

        Args:
            dobj: Bits to modulate; padded in place by ``index`` when its
                length is not a multiple of the bits per symbol.
            noise: Forwarded to ``to_symbols``; adds noise when true.

        Returns:
            The value returned by ``to_symbols``.
        """
        indexes = self.index(dobj)
        points = self.map(indexes)
        return self.to_symbols(points, noise=noise)

    def demodulate(self, syms: dobject.IQObject):
        """Demodulate IQ symbols to ModData.

        ``syms.data`` is scaled in place so that its largest magnitude is 1.

        Returns:
            A ``dobject.ModData`` whose ``soft`` attribute is a
            ``base.SoftDecision`` built from the codeword table and the
            per-point soft values.
        """
        # Normalise by the peak magnitude. np.max on the raw complex samples
        # orders them lexicographically and yields a complex value; dividing
        # by that rotates every symbol instead of merely scaling it.
        peak = np.max(np.abs(syms.data))
        if peak > 0:  # an all-zero capture cannot be scaled; leave it as is
            syms.data /= peak
        distances = self.from_symbols(syms)
        codewords = self.codewords()
        mod = dobject.ModData()
        mod.soft = base.SoftDecision(codewords, distances)

        return mod

    ##############################
    #  Modulate
    ##############################

    def index(self, dobj: dobject.BitObject):
        """Convert bits to indexes.

        ``dobj`` is zero-padded in place (via ``dobj.append(0)``) up to a
        multiple of the bits per symbol, then each group of bits is read as
        a big-endian binary number.

        Returns:
            A list of ``int`` symbol indexes, one per group of bits.
        """
        self.log.trace(f"Data is {dobj.data}")
        self.log.trace(f"Bits per symbol: {self._bps} / {len(dobj)}")

        padding = len(dobj) % self._bps
        if padding:
            fill = self._bps - padding
            for _ in range(fill):
                dobj.append(0)
            self.log.trace(f"Padded by {fill}: {len(dobj)}")
        num_symbols = len(dobj) // self._bps

        self.log.trace(f"Data requires {num_symbols} indexes to encode")
        self.log.trace(f"Data is: {dobj}")

        groups = np.split(dobj.data, num_symbols)
        indexes = [int("".join(group.astype(int).astype(str)), 2) for group in groups]

        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def map(self, indexes):
        """Convert symbol indexes to positions within ``self.mapping``.

        For every index, the position of its first occurrence in
        ``self.mapping.arr`` is returned.

        Raises:
            IndexError: If an index does not occur in the mapping.
        """
        self.log.trace(f"Using mapping: {self.mapping}")
        points = [int(np.where(self.mapping.arr == idx)[0][0]) for idx in indexes]
        self.log.trace(f"Points are {points}")
        return points

    def to_symbols(self, points, noise: bool = False):
        """Convert point positions to complex64 IQ symbols.

        Args:
            points: Positions into ``self.points``.
            noise: When true, complex Gaussian noise with power 0.01 is added.

        Returns:
            A ``dobject.IQData`` wrapping the complex64 symbols.
        """
        points = np.array(points)
        symbols = self.points[points]
        self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}")
        # Add noise
        if noise:
            n = (np.random.randn(len(symbols)) + 1j*np.random.randn(len(symbols)))/np.sqrt(2) # AWGN with unity power
            symbols = symbols + n * np.sqrt(0.01) # noise power of 0.01

        symbols = symbols.astype(np.complex64)
        return dobject.IQData(symbols)

    ##############################
    #  Demodulate
    ##############################

    def from_symbols(self, symbols: dobject.IQObject):
        """Convert symbols to soft decisions.

        Returns:
            A float16 array of shape ``(len(symbols), len(self.points))``.
            Entry ``[i, j]`` is ``1 - d / max_d`` where ``d`` is the distance
            between symbol ``i`` and point ``j`` and ``max_d`` is the largest
            distance to point ``j`` over all symbols, so larger means closer.
        """
        distances = np.zeros((len(symbols), len(self.points)), dtype=np.float16)

        # Broadcast: one row per received symbol, one column per point.
        distances[:] = np.abs(self.points.arr - symbols.data.reshape(-1, 1))
        distances[:] = 1 - np.round(distances / distances.max(axis=0), decimals=2)
        self.log.trace(f"Points are {distances} / Demodulated {len(distances)} symbols")
        return distances

    def unmap(self, points, meta):
        """Convert point positions to mapping values.

        ``meta`` is accepted but not used by this method.
        """
        self.log.trace(f"Using mapping: {self.mapping}")
        indexes = [int(self.mapping[pnt]) for pnt in points]
        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def unindex(self, indexes, meta):
        """Convert indexes to bits.

        Each index is written as ``bits_per_symbol`` binary digits; trailing
        bits beyond a multiple of 8 are dropped. ``meta`` is accepted but not
        used by this method.

        Returns:
            A ``dobject.ModData`` built from a boolean array of the bits.
        """
        self.log.trace(f"Bits per symbol: {self._bps}")
        bits = "".join(bin(ind)[2:].zfill(self._bps) for ind in indexes)

        padding = len(bits) % 8
        if padding:
            self.log.trace(f"Unpadding by {padding}")
            bits = bits[:-padding]

        data = dobject.ModData(np.array([int(bit) for bit in bits], dtype=bool))

        self.log.trace(f"Data bits are {data} / {len(data)}")

        return data

    def codewords(self):
        """Build the codeword table for the current mapping.

        Returns:
            An int array of shape ``(len(self.points), bits_per_symbol)``;
            row ``i`` holds the binary digits (most significant first) of
            ``self.mapping.arr[i]``.
        """
        patterns = [bin(n)[2:].zfill(self._bps) for n in self.mapping.arr]
        codewords = np.zeros((len(self.points), self._bps), dtype=int)

        for i, pattern in enumerate(patterns):
            for j, digit in enumerate(pattern):
                codewords[i, j] = digit == '1'
        return codewords

Constellation implementation

Instance variables

prop bits_per_symbol
Expand source code
@property
def bits_per_symbol(self):
    """Return the number of bits carried by one symbol."""
    bps = self._bps
    return bps

Get bits per symbol

prop inverted
Expand source code
@property
def inverted(self):
    """Return whether the constellation's mapping is flagged as inverted."""
    current_map = self._mapping
    return current_map.inverted

Returns whether the constellation is spectrally inverted

var log
Expand source code
class Constellation:
    """Constellation of complex IQ points together with a symbol mapping.

    Modulation runs bits -> symbol indexes (``index``) -> point positions
    (``map``) -> IQ symbols (``to_symbols``). Demodulation turns received
    symbols into per-point soft values (``from_symbols``) that are paired
    with the codeword table (``codewords``).
    """
    __slots__ = ("log", "_points", "_mapping", "_bps")

    def __init__(self, points: Points, mapping: Mapping = None, log=Logger().Child("Modulation")):
        """Create a constellation.

        Args:
            points: Constellation points; anything that is not already a
                ``Points`` instance is wrapped in one.
            mapping: Symbol mapping. When omitted, a new ``Mapping()`` is
                created for this instance.
            log: Parent logger; a "Constellation" child is derived from it.
        """
        self.log = log.Child("Constellation", Level.WARN)
        if not isinstance(points, Points):
            points = Points(points)
        self._points = points

        # invert() mutates the mapping in place, so a default object created
        # once in the signature would be shared (and corrupted) between
        # every constellation built without an explicit mapping.
        self._mapping = Mapping() if mapping is None else mapping

        self._bps = int(math.log2(len(self.points)))  # Bits per symbol

    def __str__(self) -> str:
        return f"Points: {self._points}, Map: {self._mapping}, {self._mapping.comment}"

    def __repr__(self) -> str:
        return f"<Constellation: {self._bps}>"

    def __len__(self) -> int:
        """Return the number of constellation points."""
        return len(self._points)

    @property
    def points(self):
        """Get constellation points"""
        return self._points

    @points.setter
    def points(self, points):
        """Replace the constellation points and recompute bits per symbol."""
        self._points = Points(np.array(points))
        # Same formula as __init__. The previous ``len(points) // 2`` only
        # agreed with log2 for 2- and 4-point constellations.
        self._bps = int(math.log2(len(self.points)))

    @property
    def inverted(self):
        """Return whether the constellation is spectrally inverted."""
        return self._mapping.inverted

    # { "real": 0.7, "imag": -0.7 },
    # { "real": -0.7, "imag": -0.7 },
    # { "real": 0.7, "imag": 0.7 },
    # { "real": -0.7, "imag": 0.7 }
    def invert(self):
        """Spectrally invert the constellation.

        Toggles the mapping's ``inverted`` flag and swaps mapping entries in
        place. The positions considered are those whose point is unchanged
        when its real and imaginary parts are exchanged; the first half of
        those positions is swapped pairwise with the second half.
        """
        self._mapping.inverted = not self._mapping.inverted
        swapped_iq = self._points.imag() + self._points.real() * 1j
        matches = np.where(self.points.arr == swapped_iq)[0]
        half = len(matches) // 2
        for first, second in zip(matches[:half], matches[half:]):
            # Fancy-index assignment swaps the two entries in one step.
            self._mapping.arr[[first, second]] = self._mapping.arr[[second, first]]

    @property
    def mapping(self):
        """Get constellation mapping"""
        return self._mapping

    @mapping.setter
    def mapping(self, map: Mapping):
        """Replace the constellation mapping."""
        self._mapping = map

    @property
    def bits_per_symbol(self):
        """Get bits per symbol"""
        return self._bps

    def modulate(self, dobj: dobject.BitObject, noise: bool = True):
        """Modulate BitObject to IQ symbols.

        Args:
            dobj: Bits to modulate; padded in place by ``index`` when its
                length is not a multiple of the bits per symbol.
            noise: Forwarded to ``to_symbols``; adds noise when true.

        Returns:
            The value returned by ``to_symbols``.
        """
        indexes = self.index(dobj)
        points = self.map(indexes)
        return self.to_symbols(points, noise=noise)

    def demodulate(self, syms: dobject.IQObject):
        """Demodulate IQ symbols to ModData.

        ``syms.data`` is scaled in place so that its largest magnitude is 1.

        Returns:
            A ``dobject.ModData`` whose ``soft`` attribute is a
            ``base.SoftDecision`` built from the codeword table and the
            per-point soft values.
        """
        # Normalise by the peak magnitude. np.max on the raw complex samples
        # orders them lexicographically and yields a complex value; dividing
        # by that rotates every symbol instead of merely scaling it.
        peak = np.max(np.abs(syms.data))
        if peak > 0:  # an all-zero capture cannot be scaled; leave it as is
            syms.data /= peak
        distances = self.from_symbols(syms)
        codewords = self.codewords()
        mod = dobject.ModData()
        mod.soft = base.SoftDecision(codewords, distances)

        return mod

    ##############################
    #  Modulate
    ##############################

    def index(self, dobj: dobject.BitObject):
        """Convert bits to indexes.

        ``dobj`` is zero-padded in place (via ``dobj.append(0)``) up to a
        multiple of the bits per symbol, then each group of bits is read as
        a big-endian binary number.

        Returns:
            A list of ``int`` symbol indexes, one per group of bits.
        """
        self.log.trace(f"Data is {dobj.data}")
        self.log.trace(f"Bits per symbol: {self._bps} / {len(dobj)}")

        padding = len(dobj) % self._bps
        if padding:
            fill = self._bps - padding
            for _ in range(fill):
                dobj.append(0)
            self.log.trace(f"Padded by {fill}: {len(dobj)}")
        num_symbols = len(dobj) // self._bps

        self.log.trace(f"Data requires {num_symbols} indexes to encode")
        self.log.trace(f"Data is: {dobj}")

        groups = np.split(dobj.data, num_symbols)
        indexes = [int("".join(group.astype(int).astype(str)), 2) for group in groups]

        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def map(self, indexes):
        """Convert symbol indexes to positions within ``self.mapping``.

        For every index, the position of its first occurrence in
        ``self.mapping.arr`` is returned.

        Raises:
            IndexError: If an index does not occur in the mapping.
        """
        self.log.trace(f"Using mapping: {self.mapping}")
        points = [int(np.where(self.mapping.arr == idx)[0][0]) for idx in indexes]
        self.log.trace(f"Points are {points}")
        return points

    def to_symbols(self, points, noise: bool = False):
        """Convert point positions to complex64 IQ symbols.

        Args:
            points: Positions into ``self.points``.
            noise: When true, complex Gaussian noise with power 0.01 is added.

        Returns:
            A ``dobject.IQData`` wrapping the complex64 symbols.
        """
        points = np.array(points)
        symbols = self.points[points]
        self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}")
        # Add noise
        if noise:
            n = (np.random.randn(len(symbols)) + 1j*np.random.randn(len(symbols)))/np.sqrt(2) # AWGN with unity power
            symbols = symbols + n * np.sqrt(0.01) # noise power of 0.01

        symbols = symbols.astype(np.complex64)
        return dobject.IQData(symbols)

    ##############################
    #  Demodulate
    ##############################

    def from_symbols(self, symbols: dobject.IQObject):
        """Convert symbols to soft decisions.

        Returns:
            A float16 array of shape ``(len(symbols), len(self.points))``.
            Entry ``[i, j]`` is ``1 - d / max_d`` where ``d`` is the distance
            between symbol ``i`` and point ``j`` and ``max_d`` is the largest
            distance to point ``j`` over all symbols, so larger means closer.
        """
        distances = np.zeros((len(symbols), len(self.points)), dtype=np.float16)

        # Broadcast: one row per received symbol, one column per point.
        distances[:] = np.abs(self.points.arr - symbols.data.reshape(-1, 1))
        distances[:] = 1 - np.round(distances / distances.max(axis=0), decimals=2)
        self.log.trace(f"Points are {distances} / Demodulated {len(distances)} symbols")
        return distances

    def unmap(self, points, meta):
        """Convert point positions to mapping values.

        ``meta`` is accepted but not used by this method.
        """
        self.log.trace(f"Using mapping: {self.mapping}")
        indexes = [int(self.mapping[pnt]) for pnt in points]
        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def unindex(self, indexes, meta):
        """Convert indexes to bits.

        Each index is written as ``bits_per_symbol`` binary digits; trailing
        bits beyond a multiple of 8 are dropped. ``meta`` is accepted but not
        used by this method.

        Returns:
            A ``dobject.ModData`` built from a boolean array of the bits.
        """
        self.log.trace(f"Bits per symbol: {self._bps}")
        bits = "".join(bin(ind)[2:].zfill(self._bps) for ind in indexes)

        padding = len(bits) % 8
        if padding:
            self.log.trace(f"Unpadding by {padding}")
            bits = bits[:-padding]

        data = dobject.ModData(np.array([int(bit) for bit in bits], dtype=bool))

        self.log.trace(f"Data bits are {data} / {len(data)}")

        return data

    def codewords(self):
        """Build the codeword table for the current mapping.

        Returns:
            An int array of shape ``(len(self.points), bits_per_symbol)``;
            row ``i`` holds the binary digits (most significant first) of
            ``self.mapping.arr[i]``.
        """
        patterns = [bin(n)[2:].zfill(self._bps) for n in self.mapping.arr]
        codewords = np.zeros((len(self.points), self._bps), dtype=int)

        for i, pattern in enumerate(patterns):
            for j, digit in enumerate(pattern):
                codewords[i, j] = digit == '1'
        return codewords
prop mapping
Expand source code
@property
def mapping(self):
    """Return the mapping object currently attached to the constellation."""
    current = self._mapping
    return current

Get constellation mapping

prop points
Expand source code
@property
def points(self):
    """Return the points object currently attached to the constellation."""
    current = self._points
    return current

Get constellation points

Methods

def codewords(self)
Expand source code
def codewords(self):
    """Build the codeword table for the current mapping.

    Returns an int array with one row per constellation point and one column
    per bit; row ``i`` holds the binary digits (most significant first) of
    ``self.mapping.arr[i]``, zero-padded to the bits-per-symbol width.
    """
    width = self._bps
    patterns = [bin(value)[2:].zfill(width) for value in self.mapping.arr]
    table = np.zeros((len(self.points), width), dtype=int)

    for row, pattern in enumerate(patterns):
        for col, digit in enumerate(pattern):
            # A bool assigned into an int array is stored as 0 or 1.
            table[row, col] = digit == '1'
    return table
def demodulate(self,
syms: IQObject)
Expand source code
def demodulate(self, syms: dobject.IQObject):
    """Demodulate IQ symbols to ModData.

    ``syms.data`` is scaled in place so that its largest magnitude is 1.

    Returns:
        A ``dobject.ModData`` whose ``soft`` attribute is a
        ``base.SoftDecision`` built from the codeword table and the
        per-point soft values.
    """
    # Normalise by the peak magnitude. np.max on the raw complex samples
    # orders them lexicographically and yields a complex value; dividing by
    # that rotates every symbol instead of merely scaling it.
    peak = np.max(np.abs(syms.data))
    if peak > 0:  # an all-zero capture cannot be scaled; leave it as is
        syms.data /= peak
    distances = self.from_symbols(syms)
    codewords = self.codewords()
    mod = dobject.ModData()
    mod.soft = base.SoftDecision(codewords, distances)

    return mod

Demodulate IQ symbols to ModData

def from_symbols(self,
symbols: IQObject)
Expand source code
def from_symbols(self, symbols: dobject.IQObject):
    """Turn received symbols into per-point soft values.

    The result is a float16 array with one row per symbol and one column per
    constellation point. Each entry is ``1 - d / max_d``, where ``d`` is the
    distance from the symbol to the point and ``max_d`` is the largest
    distance to that point over all symbols, so larger values mean closer.
    """
    shape = (len(symbols), len(self.points))
    closeness = np.zeros(shape, dtype=np.float16)

    # Broadcast a column of symbols against the row of points.
    column = symbols.data.reshape(-1, 1)
    closeness[:] = np.abs(self.points.arr - column)
    per_point_peak = closeness.max(axis=0)
    closeness[:] = 1 - np.round(closeness / per_point_peak, decimals=2)
    self.log.trace(f"Points are {closeness} / Demodulated {len(closeness)} symbols")
    return closeness

Convert symbols to soft decisions

def index(self,
dobj: BitObject)
Expand source code
def index(self, dobj: dobject.BitObject):
    """Group bits into symbol indexes.

    ``dobj`` is zero-padded in place (through ``dobj.append(0)``) until its
    length is a multiple of the bits per symbol. Every group of bits is then
    read as a binary number, most significant bit first.

    Returns a list of ``int`` indexes, one per group.
    """
    width = self._bps
    self.log.trace(f"Data is {dobj.data}")
    self.log.trace(f"Bits per symbol: {width} / {len(dobj)}")

    remainder = len(dobj) % width
    if remainder != 0:
        fill = width - remainder
        for _ in range(fill):
            dobj.append(0)
        self.log.trace(f"Padded by {fill}: {len(dobj)}")
    count = len(dobj) // width

    self.log.trace(f"Data requires {count} indexes to encode")
    self.log.trace(f"Data is: {dobj}")

    indexes = []
    for group in np.split(dobj.data, count):
        digits = "".join(group.astype(int).astype(str))
        indexes.append(int(digits, 2))

    self.log.trace(f"Indexes are {indexes}")
    return indexes

Convert bits to indexes

def invert(self)
Expand source code
def invert(self):
    """Toggle spectral inversion and rearrange the mapping in place.

    The ``inverted`` flag on the mapping is flipped. Positions whose point is
    unchanged when its real and imaginary parts are exchanged are collected,
    and the first half of them is swapped pairwise with the second half
    inside ``self._mapping.arr``.
    """
    flag_holder = self._mapping
    flag_holder.inverted = not flag_holder.inverted
    swapped_iq = self._points.imag() + self._points.real() * 1j
    matches = np.where(self.points.arr == swapped_iq)[0]
    half = len(matches) // 2
    for first, second in zip(matches[0:half], matches[half:]):
        # Fancy-index assignment swaps both entries in a single step.
        self._mapping.arr[[first, second]] = self._mapping.arr[[second, first]]

Spectrally invert the constellation

def map(self, indexes)
Expand source code
def map(self, indexes):
    """Look up where each symbol index sits inside ``self.mapping``.

    For every index, the position of its first occurrence in
    ``self.mapping.arr`` is returned as an ``int``. An index that is not
    present in the mapping raises ``IndexError``.
    """
    self.log.trace(f"Using mapping: {self.mapping}")
    points = [
        int(np.where(self.mapping.arr == idx)[0][0])
        for idx in indexes
    ]
    self.log.trace(f"Points are {points}")
    return points

Convert symbol indexes to their positions within self.mapping

def modulate(self,
dobj: BitObject,
noise: bool = True)
Expand source code
def modulate(self, dobj: dobject.BitObject, noise: bool = True):
    """Run the full modulation chain on ``dobj``.

    Bits are grouped into indexes (``index``), translated to point positions
    (``map``) and finally turned into symbols (``to_symbols``), whose result
    is returned. ``noise`` is forwarded to ``to_symbols``.
    """
    return self.to_symbols(self.map(self.index(dobj)), noise=noise)

Modulate BitObject to IQ symbols

def to_symbols(self, points, noise: bool = False)
Expand source code
def to_symbols(self, points, noise: bool = False):
    """Pick the constellation points at ``points`` and return them as symbols.

    When ``noise`` is true, complex Gaussian noise with power 0.01 is added.
    The symbols are cast to complex64 and wrapped in ``dobject.IQData``.
    """
    selection = np.array(points)
    symbols = self.points[selection]
    self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}")
    if noise:
        count = len(symbols)
        # Unit-power complex AWGN, scaled down to a noise power of 0.01.
        awgn = (np.random.randn(count) + 1j*np.random.randn(count))/np.sqrt(2)
        symbols = symbols + awgn * np.sqrt(0.01)

    return dobject.IQData(symbols.astype(np.complex64))

Convert mapping values to symbols

def unindex(self, indexes, meta)
Expand source code
def unindex(self, indexes, meta):
    """Expand symbol indexes back into bits.

    Every index is written as a fixed-width binary string (width = bits per
    symbol). Trailing bits beyond a multiple of 8 are dropped, and the rest is
    returned as a ``dobject.ModData`` built from a boolean array.
    ``meta`` is accepted but not used here.
    """
    width = self._bps
    self.log.trace(f"Bits per symbol: {width}")
    bits = "".join(bin(value)[2:].zfill(width) for value in indexes)

    excess = len(bits) % 8
    if excess != 0:
        self.log.trace(f"Unpadding by {excess}")
        bits = bits[:-excess]

    flags = np.array([int(ch) for ch in bits], dtype=bool)
    data = dobject.ModData(flags)

    self.log.trace(f"Data bits are {data} / {len(data)}")

    return data

Convert indexes to bits

def unmap(self, points, meta)
Expand source code
def unmap(self, points, meta):
    """Translate point positions into the mapping values stored there.

    Each entry of ``points`` indexes ``self.mapping``; the values found are
    returned as a list of ``int``. ``meta`` is accepted but not used here.
    """
    self.log.trace(f"Using mapping: {self.mapping}")
    indexes = [int(self.mapping[position]) for position in points]
    self.log.trace(f"Indexes are {indexes}")
    return indexes

Convert points to map indexes

class Mapping (map=None, comment='')
Expand source code
class Mapping:
    """Constellation map backed by a NumPy array.

    Besides the array (``arr``), a mapping carries a free-text comment and an
    ``inverted`` flag that is reflected in the ``comment`` property.
    """
    __slots__ = ("arr", "_comment", "_inv")

    def __init__(self, map=None, comment=""):
        """Create a mapping.

        Args:
            map: ``None`` for an empty mapping, an ``int`` for a zero-filled
                mapping of that length, an ``ndarray`` (stored as-is), or any
                other array-like (converted with ``np.array``).
            comment: Text prefix returned by the ``comment`` property.
        """
        self._comment = comment
        # None has to be tested first: it is not an ndarray, so the generic
        # array-like branch would otherwise turn it into an unsized 0-d
        # object array and break len().
        if map is None:
            map = np.array([])
        elif isinstance(map, int):
            map = np.array([0] * map)
        elif not isinstance(map, np.ndarray):
            map = np.array(map)
        self.arr = map
        self._inv = False

    @staticmethod
    def new(map):
        """Create a new mapping"""
        return Mapping(map)

    @staticmethod
    def empty(length: int):
        """Create a new zero-filled mapping of the given length"""
        return Mapping(length)

    def str(self):
        """Return map values as str, joined with '-'"""
        return "-".join(self.arr.astype(str))

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return str(self.arr)

    def __repr__(self):
        # ``!s`` goes through __str__, not the ``str`` method defined above.
        return f"({self!s}, {self.comment})"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    @property
    def inverted(self):
        """Whether the mapping is flagged as inverted."""
        return self._inv

    @inverted.setter
    def inverted(self, val: bool):
        self._inv = val

    @property
    def comment(self):
        """Comment text followed by " Inverted" or " Normal"."""
        if self._inv:
            return f"{self._comment} Inverted"
        return f"{self._comment} Normal"

Constellation map

Static methods

def empty(length: int)
Expand source code
@staticmethod
def empty(length: int):
    """Build a mapping from an integer length (zero-filled by ``Mapping``)."""
    fresh = Mapping(length)
    return fresh

Create a new zero-filled mapping of the given length

def new(map)
Expand source code
@staticmethod
def new(map):
    """Build a mapping from ``map`` by delegating to the ``Mapping`` constructor."""
    fresh = Mapping(map)
    return fresh

Create a new mapping

Instance variables

var arr
Expand source code
class Mapping:
    """Constellation map backed by a NumPy array.

    Besides the array (``arr``), a mapping carries a free-text comment and an
    ``inverted`` flag that is reflected in the ``comment`` property.
    """
    __slots__ = ("arr", "_comment", "_inv")

    def __init__(self, map=None, comment=""):
        """Create a mapping.

        Args:
            map: ``None`` for an empty mapping, an ``int`` for a zero-filled
                mapping of that length, an ``ndarray`` (stored as-is), or any
                other array-like (converted with ``np.array``).
            comment: Text prefix returned by the ``comment`` property.
        """
        self._comment = comment
        # None has to be tested first: it is not an ndarray, so the generic
        # array-like branch would otherwise turn it into an unsized 0-d
        # object array and break len().
        if map is None:
            map = np.array([])
        elif isinstance(map, int):
            map = np.array([0] * map)
        elif not isinstance(map, np.ndarray):
            map = np.array(map)
        self.arr = map
        self._inv = False

    @staticmethod
    def new(map):
        """Create a new mapping"""
        return Mapping(map)

    @staticmethod
    def empty(length: int):
        """Create a new zero-filled mapping of the given length"""
        return Mapping(length)

    def str(self):
        """Return map values as str, joined with '-'"""
        return "-".join(self.arr.astype(str))

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return str(self.arr)

    def __repr__(self):
        # ``!s`` goes through __str__, not the ``str`` method defined above.
        return f"({self!s}, {self.comment})"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    @property
    def inverted(self):
        """Whether the mapping is flagged as inverted."""
        return self._inv

    @inverted.setter
    def inverted(self, val: bool):
        self._inv = val

    @property
    def comment(self):
        """Comment text followed by " Inverted" or " Normal"."""
        if self._inv:
            return f"{self._comment} Inverted"
        return f"{self._comment} Normal"
prop comment
Expand source code
@property
def comment(self):
    """Return the stored comment followed by " Inverted" or " Normal"."""
    suffix = "Inverted" if self._inv else "Normal"
    return f"{self._comment} {suffix}"
prop inverted
Expand source code
@property
def inverted(self):
    """Return the mapping's inverted flag."""
    flag = self._inv
    return flag

Methods

def str(self)
Expand source code
def str(self):
    """Join the map values, rendered as text, with '-' separators."""
    as_text = self.arr.astype(str)
    return "-".join(as_text)

Return map values as str

class Maps (maps)
Expand source code
class Maps:
    """Collection of constellation mappings.

    Thin wrapper that forwards ``str``, ``len`` and item access to the
    underlying ``maps`` container.
    """
    # A one-element tuple needs the trailing comma; ``("maps")`` is just the
    # string "maps" in parentheses.
    __slots__ = ("maps",)

    def __init__(self, maps):
        """Store ``maps`` (kept by reference, not copied)."""
        self.maps = maps

    def __str__(self):
        return str(self.maps)

    def __len__(self):
        return len(self.maps)

    def __getitem__(self, item):
        return self.maps[item]

    def __setitem__(self, item, val):
        self.maps[item] = val

Collection of constellation mappings

Instance variables

var maps
Expand source code
class Maps:
    """Collection of constellation mappings.

    Thin wrapper that forwards ``str``, ``len`` and item access to the
    underlying ``maps`` container.
    """
    # A one-element tuple needs the trailing comma; ``("maps")`` is just the
    # string "maps" in parentheses.
    __slots__ = ("maps",)

    def __init__(self, maps):
        """Store ``maps`` (kept by reference, not copied)."""
        self.maps = maps

    def __str__(self):
        return str(self.maps)

    def __len__(self):
        return len(self.maps)

    def __getitem__(self, item):
        return self.maps[item]

    def __setitem__(self, item, val):
        self.maps[item] = val
class Points (points)
Expand source code
class Points:
    """Complex points stored in a NumPy array (``arr``)."""
    # A one-element tuple needs the trailing comma; ``("arr")`` is just the
    # string "arr" in parentheses.
    __slots__ = ("arr",)

    def __init__(self, points):
        """Convert ``points`` to an array with ``np.array`` and store it."""
        self.arr = np.array(points)

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return f"{self.arr}"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    def real(self):
        """Get real from points"""
        return np.real(self.arr)

    def imag(self):
        """Get imag from points"""
        return np.imag(self.arr)

    def degrees(self):
        """Get point angles in degrees"""
        return np.angle(self.arr, deg=True)

Complex points

Instance variables

var arr
Expand source code
class Points:
    """Complex points stored in a NumPy array (``arr``)."""
    # A one-element tuple needs the trailing comma; ``("arr")`` is just the
    # string "arr" in parentheses.
    __slots__ = ("arr",)

    def __init__(self, points):
        """Convert ``points`` to an array with ``np.array`` and store it."""
        self.arr = np.array(points)

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return f"{self.arr}"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    def real(self):
        """Get real from points"""
        return np.real(self.arr)

    def imag(self):
        """Get imag from points"""
        return np.imag(self.arr)

    def degrees(self):
        """Get point angles in degrees"""
        return np.angle(self.arr, deg=True)

Methods

def degrees(self)
Expand source code
def degrees(self):
    """Return the angle of every point, in degrees."""
    values = self.arr
    return np.angle(values, deg=True)

Get point degrees

def imag(self)
Expand source code
def imag(self):
    """Return the imaginary part of every point."""
    values = self.arr
    return np.imag(values)

Get imag from points

def real(self)
Expand source code
def real(self):
    """Return the real part of every point."""
    values = self.arr
    return np.real(values)

Get real from points