Module rpps.mod.constellation
IQ/Constellation implementation
Classes
class Constellation (points: Points,
mapping: Mapping = (None, Normal),
log=<pyboiler.logger._Logger object>)-
Expand source code
class Constellation:
    """Constellation implementation.

    Pairs a set of complex IQ points with a codeword mapping and converts
    between bits, mapping indexes and IQ symbols.
    """

    __slots__ = ("log", "_points", "_mapping", "_bps")

    def __init__(self, points: Points, mapping: Mapping = None, log=None):
        """Create a constellation.

        Args:
            points: A ``Points`` instance, or any value ``Points`` accepts.
            mapping: Codeword mapping. A new ``Mapping()`` is built when
                omitted.
            log: Parent logger. ``Logger().Child("Modulation")`` is used
                when omitted.
        """
        # Defaults are created per call: a default evaluated in the signature
        # is a single shared Mapping that invert() would mutate for everyone.
        if mapping is None:
            mapping = Mapping()
        if log is None:
            log = Logger().Child("Modulation")
        self.log = log.Child("Constellation", Level.WARN)
        if not isinstance(points, Points):
            points = Points(points)
        self._points = points
        self._mapping = mapping
        self._bps = self._count_bits(points)  # Bits per symbol

    @staticmethod
    def _count_bits(points) -> int:
        """Return bits per symbol for a point set: ``int(log2(len(points)))``."""
        return int(math.log2(len(points)))

    def __str__(self) -> str:
        return f"Points: {self._points}, Map: {self._mapping}, {self._mapping.comment}"

    def __repr__(self) -> str:
        return f"<Constellation: {self._bps}>"

    def __len__(self) -> int:
        return len(self._points)

    @property
    def points(self):
        """Get constellation points"""
        return self._points

    @points.setter
    def points(self, points):
        self._points = Points(np.array(points))
        # Same formula as __init__; the two must agree for any point count.
        self._bps = self._count_bits(self._points)

    @property
    def inverted(self):
        """Returns if constellation is spectral inverted"""
        return self._mapping.inverted

    def invert(self):
        """Spectral invert the constellation.

        Toggles the mapping's ``inverted`` flag, finds the positions whose
        point is unchanged when its real and imaginary parts are exchanged,
        and swaps the mapping entries of the first half of those positions
        with the second half, pairwise.
        """
        self._mapping.inverted = not self._mapping.inverted

        rotpoints = self._points.imag() + self._points.real() * 1j
        swaps = np.where(self.points.arr == rotpoints)[0]
        half = len(swaps) // 2
        for m1, m2 in zip(swaps[:half], swaps[half:]):
            self._mapping.arr[[m1, m2]] = self._mapping.arr[[m2, m1]]

    @property
    def mapping(self):
        """Get constellation mapping"""
        return self._mapping

    @mapping.setter
    def mapping(self, map: Mapping):
        self._mapping = map

    @property
    def bits_per_symbol(self):
        """Get bits per symbol"""
        return self._bps

    def modulate(self, dobj: dobject.BitObject, noise: bool = True):
        """Modulate BitObject to IQ symbols"""
        indexes = self.index(dobj)
        points = self.map(indexes)
        return self.to_symbols(points, noise=noise)

    def demodulate(self, syms: dobject.IQObject):
        """Demodulate IQ symbols to ModData.

        ``syms.data`` is normalised in place by its largest magnitude before
        the soft decisions are computed.
        """
        # np.max of complex data is itself complex; dividing by it would
        # rotate the symbols. Scale by the largest magnitude instead, and
        # leave the data untouched when there is nothing to scale by.
        if np.size(syms.data):
            peak = np.max(np.abs(syms.data))
            if peak > 0:
                syms.data /= peak

        distances = self.from_symbols(syms)
        codewords = self.codewords()

        mod = dobject.ModData()
        mod.soft = base.SoftDecision(codewords, distances)
        return mod

    ##############################
    # Modulate
    ##############################
    def index(self, dobj: dobject.BitObject):
        """Convert bits to indexes.

        ``dobj`` is zero-padded in place (via ``dobj.append(0)``) up to a
        multiple of bits-per-symbol, split into equal groups, and each group
        is read as a binary number, first bit most significant.
        """
        self.log.trace(f"Data is {dobj.data}")
        self.log.trace(f"Bits per symbol: {self._bps} / {len(dobj)}")

        padding = len(dobj) % self._bps
        if padding != 0:
            for _ in range(self._bps - padding):
                dobj.append(0)
            self.log.trace(f"Padded by {self._bps - padding}: {len(dobj)}")

        num_symbols = len(dobj) // self._bps
        self.log.trace(f"Data requires {num_symbols} indexes to encode")
        self.log.trace(f"Data is: {dobj}")

        if num_symbols == 0:
            # np.split cannot split into zero sections; no bits, no indexes.
            indexes = []
        else:
            indexes = [
                int("".join(group.astype(int).astype(str)), 2)
                for group in np.split(dobj.data, num_symbols)
            ]

        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def map(self, indexes):
        """Convert indexes to self.mapping values.

        Each index is replaced by the first position in ``self.mapping.arr``
        holding that value.
        """
        self.log.trace(f"Using mapping: {self.mapping}")
        points = [int(np.where(self.mapping.arr == idx)[0][0]) for idx in indexes]
        self.log.trace(f"Points are {points}")
        return points

    def to_symbols(self, points, noise: bool = False):
        """Convert mapping values to symbols, optionally adding noise."""
        points = np.array(points)
        symbols = self.points[points]
        self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}")

        # Add noise
        if noise:
            # AWGN with unity power
            n = (np.random.randn(len(symbols)) + 1j * np.random.randn(len(symbols))) / np.sqrt(2)
            symbols = symbols + n * np.sqrt(0.01)  # noise power of 0.01

        symbols = symbols.astype(np.complex64)
        return dobject.IQData(symbols)

    ##############################
    # Demodulate
    ##############################
    def from_symbols(self, symbols: dobject.IQObject):
        """Convert symbols to soft decisions.

        Returns a float16 array with one row per received symbol and one
        column per constellation point; larger values mean the symbol was
        closer to that point.
        """
        distances = np.zeros((len(symbols), len(self.points)), dtype=np.float16)
        distances[:] = np.abs(self.points.arr - symbols.data.reshape(-1, 1))
        # NOTE(review): max(axis=0) scales each column by its largest distance
        # over all symbols, not each symbol's row - confirm this is intended.
        distances[:] = 1 - np.round(distances / distances.max(axis=0), decimals=2)

        self.log.trace(f"Points are {distances} / Demodulated {len(distances)} symbols")
        return distances

    def unmap(self, points, meta):
        """Convert points to map indexes (``meta`` is not used here)."""
        self.log.trace(f"Using mapping: {self.mapping}")
        indexes = [int(self.mapping[pnt]) for pnt in points]
        self.log.trace(f"Indexes are {indexes}")
        return indexes

    def unindex(self, indexes, meta):
        """Convert indexes to bits (``meta`` is not used here).

        Trailing bits are dropped so the bit count is a multiple of 8.
        """
        self.log.trace(f"Bits per symbol: {self._bps}")
        bits = "".join(bin(ind)[2:].zfill(self._bps) for ind in indexes)

        padding = len(bits) % 8
        if padding != 0:
            self.log.trace(f"Unpadding by {padding}")
            bits = bits[:-padding]

        data = dobject.ModData(np.array([int(bit) for bit in bits], dtype=bool))
        self.log.trace(f"Data bits are {data} / {len(data)}")
        return data

    def codewords(self):
        """Return an int array holding the bit pattern of each mapping value.

        Row ``i`` is ``self.mapping.arr[i]`` written as ``bits_per_symbol``
        binary digits, first digit most significant.
        """
        bits = [bin(n)[2:].zfill(self._bps) for n in self.mapping.arr]
        codewords = np.zeros((len(self.points), self._bps), dtype=int)
        for i, pattern in enumerate(bits):
            for j, digit in enumerate(pattern):
                codewords[i, j] = digit == "1"
        return codewords
Instance variables
prop bits_per_symbol-
Expand source code
@property
def bits_per_symbol(self):
    """Number of bits carried by one symbol (the cached ``_bps`` value)."""
    bps = self._bps
    return bps
prop inverted-
Expand source code
@property
def inverted(self):
    """Whether the constellation is spectrally inverted, as recorded on its mapping."""
    current_map = self._mapping
    return current_map.inverted
var log-
Expand source code
class Constellation: """Constellation implementation""" __slots__ = ("log", "_points", "_mapping", "_bps") def __init__(self, points: Points, mapping: Mapping = Mapping(), log=Logger().Child("Modulation")): self.log = log.Child("Constellation", Level.WARN) if not isinstance(points, Points): points = Points(points) self._points = points self._mapping = mapping self._bps = int(math.log2(len(self.points))) # Bits per symbol def __str__(self) -> str: return f"Points: {self._points}, Map: {self._mapping}, {self._mapping.comment}" def __repr__(self) -> str: return f"<Constellation: {self._bps}>" def __len__(self) -> int: return len(self._points) @property def points(self): """Get constellation points""" return self._points @property def inverted(self): """ Returns if constellation is spectral inverted""" return self._mapping.inverted # { "real": 0.7, "imag": -0.7 }, # { "real": -0.7, "imag": -0.7 }, # { "real": 0.7, "imag": 0.7 }, # { "real": -0.7, "imag": 0.7 } def invert(self): """Spectral invert the constellation""" self._mapping.inverted = not self._mapping.inverted rotpoints = self._points.imag() + self._points.real() * 1j swaps = np.where(self.points.arr == rotpoints)[0] map1 = swaps[0:len(swaps)//2] map2 = swaps[len(swaps)//2:] for m1, m2 in zip(map1, map2): self._mapping.arr[[m1,m2]] = self._mapping.arr[[m2,m1]] @points.setter def points(self, points): self._points = Points(np.array(points)) self._bps = len(self.points) // 2 @property def mapping(self): """Get constellation mapping""" return self._mapping @mapping.setter def mapping(self, map: Mapping): self._mapping = map @property def bits_per_symbol(self): """Get bits per symbol""" return self._bps def modulate(self, dobj: dobject.BitObject, noise: bool = True): """Modulate BitObject to IQ symbols""" indexes = self.index(dobj) points = self.map(indexes) symbols = self.to_symbols(points, noise=noise) return symbols def demodulate(self, syms: dobject.IQObject): """Demodulate IQ symbols to ModData""" syms.data /= 
np.max(syms.data) # normalize distances = self.from_symbols(syms) codewords = self.codewords() mod = dobject.ModData() mod.soft = base.SoftDecision(codewords, distances) return mod ############################## # Modulate ############################## def index(self, dobj: dobject.BitObject): """Convert bits to indexes""" self.log.trace(f"Data is {dobj.data}") self.log.trace(f"Bits per symbol: {self._bps} / {len(dobj)}") padding = len(dobj) % self._bps if not padding == 0: for _ in range(0, (self._bps - padding)): dobj.append(0) self.log.trace(f"Padded by {self._bps - padding}: {len(dobj)}") num_symbols = len(dobj) // self._bps self.log.trace(f"Data requires {num_symbols} indexes to encode") self.log.trace(f"Data is: {dobj}") indexes = np.split(dobj.data, num_symbols) indexes = [int("".join(data.astype(int).astype(str)), 2) for data in indexes] # indexes = [] # for i in range(0, len(data.bin), self._bps): # bit_int = int(data.bin[i:i+self._bps], 2) # indexes.append(int(bit_int)) self.log.trace(f"Indexes are {indexes}") return indexes def map(self, indexes): """Convert indexes to self.mapping values""" self.log.trace(f"Using mapping: {self.mapping}") points = [] for idx in indexes: points.append( int( np.where(self.mapping.arr == idx)[0][0] ) ) self.log.trace(f"Points are {points}") return points def to_symbols(self, points, noise: bool = False): """Convert mapping values to symbols""" points = np.array(points) symbols = self.points[points] self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}") # Add noise if noise: n = (np.random.randn(len(symbols)) + 1j*np.random.randn(len(symbols)))/np.sqrt(2) # AWGN with unity power symbols = symbols + n * np.sqrt(0.01) # noise power of 0.01 symbols = symbols.astype(np.complex64) # self.log.trace(f"Symbols are: {symbols}") return dobject.IQData(symbols) ############################## # Demodulate ############################## def from_symbols(self, symbols: dobject.IQObject): """Convert symbols to soft decisions""" # 
self.log.trace(f"Symbols are:\n{symbols}") # codewords = np.zeros((len(self.points), self._bps), dtype=bool) # Distances[i] are values 0-1 of how far away sym[i] was from each constellation point distances = np.zeros((len(symbols), len(self.points)), dtype=np.float16) distances[:] = np.abs(self.points.arr - symbols.data.reshape(-1, 1)) distances[:] = 1 - np.round(distances / distances.max(axis=0), decimals=2) self.log.trace(f"Points are {distances} / Demodulated {len(distances)} symbols") return distances def unmap(self, points, meta): """Convert points to map indexes""" self.log.trace(f"Using mapping: {self.mapping}") indexes = [] for pnt in points: indexes.append(int(self.mapping[pnt])) self.log.trace(f"Indexes are {indexes}") return indexes def unindex(self, indexes, meta): """Convert indexes to bits""" self.log.trace(f"Bits per symbol: {self._bps}") bits = "" for ind in indexes: bits += bin(ind)[2:].zfill(self._bps) padding = len(bits) % 8 if not padding == 0: self.log.trace(f"Unpadding by {padding}") bits = bits[:-padding] data = dobject.ModData(np.array([int(bit) for bit in bits], dtype=bool)) # data = Stream.from_bin(bits, len(bits)) self.log.trace(f"Data bits are {data} / {len(data)}") return data def codewords(self): bits = np.array([bin(n)[2:].zfill(self._bps) for n in self.mapping.arr]) codewords = np.zeros((len(self.points), self._bps), dtype=int) for i, b in enumerate(bits): for j, c in enumerate(b): codewords[i, j] = True if c == '1' else False return codewords prop mapping-
Expand source code
@property
def mapping(self):
    """The mapping object currently attached to this constellation."""
    current = self._mapping
    return current
prop points-
Expand source code
@property
def points(self):
    """The points object currently attached to this constellation."""
    current = self._points
    return current
Methods
def codewords(self)-
Expand source code
def codewords(self): bits = np.array([bin(n)[2:].zfill(self._bps) for n in self.mapping.arr]) codewords = np.zeros((len(self.points), self._bps), dtype=int) for i, b in enumerate(bits): for j, c in enumerate(b): codewords[i, j] = True if c == '1' else False return codewords def demodulate(self,
syms: IQObject)-
Expand source code
def demodulate(self, syms: dobject.IQObject):
    """Demodulate IQ symbols to ModData.

    ``syms.data`` is normalised in place by its largest magnitude, turned
    into soft decisions with ``from_symbols`` and paired with this
    constellation's codewords in a ``base.SoftDecision`` stored on the
    returned ``dobject.ModData``.
    """
    # np.max of complex data is itself complex; dividing by it would rotate
    # the symbols. Scale by the largest magnitude instead, and leave the data
    # untouched when there is nothing to scale by (empty or all-zero input).
    if np.size(syms.data):
        peak = np.max(np.abs(syms.data))
        if peak > 0:
            syms.data /= peak

    distances = self.from_symbols(syms)
    codewords = self.codewords()

    mod = dobject.ModData()
    mod.soft = base.SoftDecision(codewords, distances)
    return mod
def from_symbols(self,
symbols: IQObject)-
Expand source code
def from_symbols(self, symbols: dobject.IQObject):
    """Score every received symbol against every constellation point.

    Returns a float16 array with one row per symbol and one column per
    constellation point. Each entry is ``1 - distance / column maximum``
    (the ratio rounded to two decimals), so nearer points score higher.
    """
    received = symbols.data.reshape(-1, 1)
    scores = np.zeros((len(symbols), len(self.points)), dtype=np.float16)
    scores[:] = np.abs(self.points.arr - received)

    # NOTE(review): the maximum is taken per column (over all symbols), not
    # per symbol row - confirm this is the intended normalisation.
    column_peak = scores.max(axis=0)
    scores[:] = 1 - np.round(scores / column_peak, decimals=2)

    self.log.trace(f"Points are {scores} / Demodulated {len(scores)} symbols")
    return scores
def index(self,
dobj: BitObject)-
Expand source code
def index(self, dobj: dobject.BitObject):
    """Convert bits to indexes.

    ``dobj`` is zero-padded in place (via ``dobj.append(0)``) up to a
    multiple of bits-per-symbol, split into equal groups, and each group is
    read as a binary number, first bit most significant.

    Returns:
        A list of ints, one per symbol; empty when ``dobj`` holds no bits.
    """
    self.log.trace(f"Data is {dobj.data}")
    self.log.trace(f"Bits per symbol: {self._bps} / {len(dobj)}")

    remainder = len(dobj) % self._bps
    if remainder != 0:
        for _ in range(self._bps - remainder):
            dobj.append(0)
        self.log.trace(f"Padded by {self._bps - remainder}: {len(dobj)}")

    num_symbols = len(dobj) // self._bps
    self.log.trace(f"Data requires {num_symbols} indexes to encode")
    self.log.trace(f"Data is: {dobj}")

    if num_symbols == 0:
        # np.split cannot split into zero sections; no bits, no indexes.
        indexes = []
    else:
        indexes = [
            int("".join(group.astype(int).astype(str)), 2)
            for group in np.split(dobj.data, num_symbols)
        ]

    self.log.trace(f"Indexes are {indexes}")
    return indexes
def invert(self)-
Expand source code
def invert(self):
    """Toggle spectral inversion and swap the affected mapping entries.

    The positions whose point is unchanged when its real and imaginary parts
    are exchanged are collected; the first half of them is paired with the
    second half and the mapping entries of each pair are swapped.
    """
    mapping = self._mapping
    mapping.inverted = not mapping.inverted

    exchanged = self._points.imag() + self._points.real() * 1j
    fixed = np.where(self.points.arr == exchanged)[0]
    half = len(fixed) // 2

    for first, second in zip(fixed[0:half], fixed[half:]):
        self._mapping.arr[[first, second]] = self._mapping.arr[[second, first]]
def map(self, indexes)-
Expand source code
def map(self, indexes):
    """Translate codeword indexes into positions within the mapping.

    Each index becomes the first position in ``self.mapping.arr`` whose
    value equals it.
    """
    self.log.trace(f"Using mapping: {self.mapping}")
    points = [
        int(np.where(self.mapping.arr == wanted)[0][0])
        for wanted in indexes
    ]
    self.log.trace(f"Points are {points}")
    return points
def modulate(self,
dobj: BitObject,
noise: bool = True)-
Expand source code
def modulate(self, dobj: dobject.BitObject, noise: bool = True):
    """Turn a bit object into IQ symbols.

    Chains ``index`` -> ``map`` -> ``to_symbols``, forwarding ``noise`` to
    the last step.
    """
    return self.to_symbols(self.map(self.index(dobj)), noise=noise)
def to_symbols(self, points, noise: bool = False)-
Expand source code
def to_symbols(self, points, noise: bool = False):
    """Look up constellation points by position and wrap them as IQ data.

    When ``noise`` is true, complex Gaussian noise scaled to a power of 0.01
    is added. The result is cast to complex64 and returned as a
    ``dobject.IQData``.
    """
    positions = np.array(points)
    symbols = self.points[positions]
    self.log.trace(f"Symbols are:\n{symbols} / {len(symbols)}")

    if noise:
        count = len(symbols)
        # Draw the real part first, then the imaginary part.
        in_phase = np.random.randn(count)
        quadrature = np.random.randn(count)
        n = (in_phase + 1j * quadrature) / np.sqrt(2)  # AWGN with unity power
        symbols = symbols + n * np.sqrt(0.01)  # noise power of 0.01

    return dobject.IQData(symbols.astype(np.complex64))
def unindex(self, indexes, meta)-
Expand source code
def unindex(self, indexes, meta):
    """Expand indexes back into a bit array wrapped in ``dobject.ModData``.

    Every index is written as ``bits_per_symbol`` binary digits; trailing
    digits are then dropped so the total is a multiple of 8. ``meta`` is not
    used here.
    """
    self.log.trace(f"Bits per symbol: {self._bps}")
    bits = "".join(bin(ind)[2:].zfill(self._bps) for ind in indexes)

    padding = len(bits) % 8
    if padding != 0:
        self.log.trace(f"Unpadding by {padding}")
        bits = bits[:len(bits) - padding]

    flags = np.array([int(bit) for bit in bits], dtype=bool)
    data = dobject.ModData(flags)
    self.log.trace(f"Data bits are {data} / {len(data)}")
    return data
def unmap(self, points, meta)-
Expand source code
def unmap(self, points, meta):
    """Look up the mapping value stored at each point position.

    ``meta`` is not used here.
    """
    self.log.trace(f"Using mapping: {self.mapping}")
    indexes = [int(self.mapping[position]) for position in points]
    self.log.trace(f"Indexes are {indexes}")
    return indexes
class Mapping (map=None, comment='')-
Expand source code
class Mapping:
    """Constellation map.

    Wraps a numpy array of mapping values (``arr``) together with a comment
    and a spectral-inversion flag.
    """

    __slots__ = ("arr", "_comment", "_inv")

    def __init__(self, map=None, comment=""):
        """Create a mapping.

        Args:
            map: ``None`` for an empty map, an int for that many zero
                entries, a numpy array (stored as is), or any other value,
                which is passed to ``np.array``.
            comment: Label that prefixes the ``comment`` property.
        """
        self._comment = comment

        # None must be tested first: it is neither an int nor an ndarray, so
        # the generic np.array() branch would otherwise wrap it in a 0-d
        # object array that has no len().
        if map is None:
            map = np.array([])
        elif isinstance(map, int):
            map = np.array([0] * map)
        elif not isinstance(map, np.ndarray):
            map = np.array(map)

        self.arr = map
        self._inv = False

    @staticmethod
    def new(map):
        """Create a new mapping"""
        return Mapping(map)

    @staticmethod
    def empty(length: int):
        """Create a new empty mapping"""
        return Mapping(length)

    def str(self):
        """Return map values as str"""
        return "-".join(self.arr.astype(str))

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return str(self.arr)

    def __repr__(self):
        return f"({str(self)}, {self.comment})"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    @property
    def inverted(self):
        """Whether the map is flagged as spectrally inverted."""
        return self._inv

    @inverted.setter
    def inverted(self, val: bool):
        self._inv = val

    @property
    def comment(self):
        """The comment followed by ``Inverted`` or ``Normal``."""
        if self._inv:
            return f"{self._comment} Inverted"
        return f"{self._comment} Normal"
Static methods
def empty(length: int)-
Expand source code
@staticmethod
def empty(length: int):
    """Build a mapping from ``length`` alone, via ``Mapping(length)``."""
    blank = Mapping(length)
    return blank
def new(map)-
Expand source code
@staticmethod
def new(map):
    """Build a mapping from ``map``, via ``Mapping(map)``."""
    created = Mapping(map)
    return created
Instance variables
var arr-
Expand source code
class Mapping: """Constellation map""" __slots__ = ("arr", "_comment", "_inv") def __init__(self, map=None, comment=""): self._comment = comment if isinstance(map, int): map = np.array([0] * map) elif not isinstance(map, np.ndarray): map = np.array(map) elif map is None: map = np.array([]) self.arr = map self._inv = False @staticmethod def new(map): """Create a new mapping""" return Mapping(map) @staticmethod def empty(length: int): """Create a new empty mapping""" return Mapping(length) def str(self): """Return map values as str""" return "-".join(self.arr.astype(str)) def __len__(self): return len(self.arr) def __str__(self): return str(self.arr) def __repr__(self): return f"({str(self)}, {self.comment})" def __getitem__(self, item): return self.arr[item] def __setitem__(self, item, val): self.arr[item] = val @property def inverted(self): return self._inv @inverted.setter def inverted(self, val: bool): self._inv = val @property def comment(self): if self._inv: return f"{self._comment} Inverted" return f"{self._comment} Normal" prop comment-
Expand source code
@property def comment(self): if self._inv: return f"{self._comment} Inverted" return f"{self._comment} Normal" prop inverted-
Expand source code
@property
def inverted(self):
    """Current value of the inversion flag."""
    flag = self._inv
    return flag
Methods
def str(self)-
Expand source code
def str(self):
    """Join the map values into one dash-separated string."""
    # "U" is numpy's unicode-string dtype, the same conversion as astype(str).
    as_text = self.arr.astype("U")
    return "-".join(as_text)
class Maps (maps)-
Expand source code
class Maps:
    """Collection of constellation mappings.

    A thin wrapper that forwards ``len``, ``str`` and item access to the
    wrapped ``maps`` container.
    """

    # A parenthesised string is still a string; the trailing comma makes
    # this the one-element tuple it was written to look like.
    __slots__ = ("maps",)

    def __init__(self, maps):
        self.maps = maps

    def __str__(self):
        return str(self.maps)

    def __len__(self):
        return len(self.maps)

    def __getitem__(self, item):
        return self.maps[item]

    def __setitem__(self, item, val):
        self.maps[item] = val
Instance variables
var maps-
Expand source code
class Maps:
    """Collection of constellation mappings.

    A thin wrapper that forwards ``len``, ``str`` and item access to the
    wrapped ``maps`` container.
    """

    # A parenthesised string is still a string; the trailing comma makes
    # this the one-element tuple it was written to look like.
    __slots__ = ("maps",)

    def __init__(self, maps):
        self.maps = maps

    def __str__(self):
        return str(self.maps)

    def __len__(self):
        return len(self.maps)

    def __getitem__(self, item):
        return self.maps[item]

    def __setitem__(self, item, val):
        self.maps[item] = val
class Points (points)-
Expand source code
class Points:
    """Complex points.

    Stores the given points as a numpy array (``arr``) and exposes their
    real parts, imaginary parts and angles.
    """

    # A parenthesised string is still a string; the trailing comma makes
    # this the one-element tuple it was written to look like.
    __slots__ = ("arr",)

    def __init__(self, points):
        self.arr = np.array(points)

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return f"{self.arr}"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    def real(self):
        """Get real from points"""
        return np.real(self.arr)

    def imag(self):
        """Get imag from points"""
        return np.imag(self.arr)

    def degrees(self):
        """Get point degrees"""
        return np.angle(self.arr, deg=True)
Instance variables
var arr-
Expand source code
class Points:
    """Complex points.

    Stores the given points as a numpy array (``arr``) and exposes their
    real parts, imaginary parts and angles.
    """

    # A parenthesised string is still a string; the trailing comma makes
    # this the one-element tuple it was written to look like.
    __slots__ = ("arr",)

    def __init__(self, points):
        self.arr = np.array(points)

    def __len__(self):
        return len(self.arr)

    def __str__(self):
        return f"{self.arr}"

    def __getitem__(self, item):
        return self.arr[item]

    def __setitem__(self, item, val):
        self.arr[item] = val

    def real(self):
        """Get real from points"""
        return np.real(self.arr)

    def imag(self):
        """Get imag from points"""
        return np.imag(self.arr)

    def degrees(self):
        """Get point degrees"""
        return np.angle(self.arr, deg=True)
Methods
def degrees(self)-
Expand source code
def degrees(self):
    """Angle of every point, in degrees."""
    values = self.arr
    return np.angle(values, deg=True)
def imag(self)-
Expand source code
def imag(self):
    """Imaginary part of every point."""
    values = self.arr
    return np.imag(values)
def real(self)-
Expand source code
def real(self):
    """Real part of every point."""
    values = self.arr
    return np.real(values)