Module rpps.coding.types.viterbi

Classes

class viterbi (num, den, constraint, generator)
Expand source code
class viterbi(_code):
    """Hard-decision Viterbi decoder for a convolutional code.

    The constructor precomputes three lookup tables:

    * ``states``     -- bool array ``(n_states, con - 1)``; row ``s`` holds
      the bits of state index ``s``, most significant bit first.
    * ``all_codes``  -- bool array ``(n_states, 2, len(generator))``;
      ``all_codes[s, b]`` is the code word emitted when input bit ``b``
      arrives while the encoder is in state ``s``.
    * ``next_state`` -- int array ``(n_states, 2)``; ``next_state[s, b]`` is
      the index of the state reached from ``s`` on input bit ``b``.
    """

    def __init__(self, num, den, constraint, generator):
        """Build the trellis tables.

        Args:
            num: Forwarded unchanged to the base class.
            den: Forwarded unchanged to the base class; ``decode`` later
                reads it back as ``self.den`` (presumably stored by the
                base class -- confirm there).
            constraint: Constraint length; the shift register has
                ``constraint`` bits (one input bit + ``constraint - 1``
                state bits).
            generator: Iterable of selectors, each used to index the
                register (``register[g]``); the selected bits are XOR-ed
                into one output bit per generator.

        Raises:
            ValueError: If ``constraint`` is outside ``2..9``. State bits
                are produced by unpacking ``uint8`` values, so at most 8
                state bits are representable, and at least one is needed.
        """
        super().__init__(num, den)
        self.con = constraint
        self.gen = generator

        if not 2 <= self.con <= 9:
            raise ValueError(
                f"constraint length must be between 2 and 9, got {self.con}"
            )

        memory = self.con - 1
        trellis_states = 2**memory
        states = np.arange(trellis_states, dtype=np.uint8)
        # Unpack every state index into 8 bits and keep the low `memory` ones.
        states = np.unpackbits(states).astype(bool).reshape(-1, 8)[:, -memory:]

        # Register contents per (state, input bit): [input bit, state bits...].
        all_states = np.zeros((len(states), 2, self.con), dtype=bool)
        all_states[:, 1, 0] = True
        all_states[:, :, 1:] = states[:, np.newaxis, :]

        # Output code word per (state, input bit): one bit per generator.
        # The last axis must be sized by the number of generators; sizing it
        # from the constraint length only happens to work for con == 3.
        codes = np.zeros((len(states), 2, len(self.gen)), dtype=bool)
        for i, state in enumerate(all_states):
            for j, register in enumerate(state):
                for k, g in enumerate(self.gen):
                    codes[i, j, k] = np.bitwise_xor.reduce(register[g])

        self.states = states
        self.all_codes = codes

        # Shifting the register drops its last bit; what remains is the next
        # state, which is then mapped back to its row index in `states`.
        self.next_state = np.empty((len(states), 2), dtype=int)
        shifted = all_states[:, :, :-1]
        for i, state in enumerate(shifted):
            for j, bits_after in enumerate(state):
                self.next_state[i, j] = np.where(
                    (self.states == bits_after).all(axis=1)
                )[0][0]

    def decode(self, bits: np.ndarray):
        """Decode received code bits along the minimum-distance trellis path.

        The trellis is assumed to start in state index 0.

        Args:
            bits: Received code bits. They are split by ``block(bits,
                self.den)`` -- presumably into consecutive code words of
                ``self.den`` bits each; confirm against ``block``.

        Returns:
            1-D bool array with one decoded input bit per block (empty when
            there are no blocks).
        """
        blocks = block(bits, self.den)
        trellis_length = len(blocks)
        if trellis_length == 0:
            return np.empty(0, dtype=bool)

        n_states = len(self.states)
        start = 0

        # metric[j, s]: smallest accumulated distance of any path that is in
        # state s after block j (inf = unreachable).
        # prev[j, s]: the state that path was in before block j.
        metric = np.full((trellis_length, n_states), np.inf)
        prev = np.full((trellis_length, n_states), -1, dtype=int)

        for j, blk in enumerate(blocks):
            if j == 0:
                sources = [start]
            else:
                sources = np.where(np.isfinite(metric[j - 1]))[0]

            for src in sources:
                base = 0.0 if j == 0 else metric[j - 1, src]
                for bit in (0, 1):
                    # hamming_dist is presumably the number of differing bits
                    # between the expected and the received code word.
                    cost = hamming_dist(self.all_codes[src][bit], blk) + base
                    dst = self.next_state[src][bit]
                    # Sources are visited in ascending order and only a
                    # strictly better cost replaces the stored one, so ties
                    # keep the lowest-numbered predecessor.
                    if cost < metric[j, dst]:
                        metric[j, dst] = cost
                        prev[j, dst] = src

        # Trace back from the best final state (lowest index on ties).
        survivor = np.empty(trellis_length, dtype=int)
        cur_idx = int(np.argmin(metric[-1]))
        for j in range(trellis_length - 1, -1, -1):
            survivor[j] = cur_idx
            cur_idx = prev[j, cur_idx]

        # Each step start -> survivor[0] -> survivor[1] -> ... was caused by
        # exactly one input bit; recover it from the transition table.
        decoded = np.empty(trellis_length, dtype=bool)
        origin = start
        for j, target in enumerate(survivor):
            decoded[j] = np.where(self.next_state[origin] == target)[0][0]
            origin = target

        return decoded

Viterbi decoder

Ancestors

  • rpps.coding.types._code._code

Methods

def decode(self, bits: numpy.ndarray)
Expand source code
def decode(self, bits: np.ndarray):
    """Decode received code bits along the minimum-distance trellis path.

    The trellis is assumed to start in state index 0.

    Args:
        bits: Received code bits. They are split by ``block(bits,
            self.den)`` -- presumably into consecutive code words of
            ``self.den`` bits each; confirm against ``block``.

    Returns:
        1-D bool array with one decoded input bit per block (empty when
        there are no blocks).
    """
    blocks = block(bits, self.den)
    trellis_length = len(blocks)
    if trellis_length == 0:
        return np.empty(0, dtype=bool)

    n_states = len(self.states)
    start = 0

    # metric[j, s]: smallest accumulated distance of any path that is in
    # state s after block j (inf = unreachable).
    # prev[j, s]: the state that path was in before block j.
    metric = np.full((trellis_length, n_states), np.inf)
    prev = np.full((trellis_length, n_states), -1, dtype=int)

    for j, blk in enumerate(blocks):
        if j == 0:
            sources = [start]
        else:
            sources = np.where(np.isfinite(metric[j - 1]))[0]

        for src in sources:
            base = 0.0 if j == 0 else metric[j - 1, src]
            for bit in (0, 1):
                # hamming_dist is presumably the number of differing bits
                # between the expected and the received code word.
                cost = hamming_dist(self.all_codes[src][bit], blk) + base
                dst = self.next_state[src][bit]
                # Sources are visited in ascending order and only a strictly
                # better cost replaces the stored one, so ties keep the
                # lowest-numbered predecessor.
                if cost < metric[j, dst]:
                    metric[j, dst] = cost
                    prev[j, dst] = src

    # Trace back from the best final state (lowest index on ties).
    survivor = np.empty(trellis_length, dtype=int)
    cur_idx = int(np.argmin(metric[-1]))
    for j in range(trellis_length - 1, -1, -1):
        survivor[j] = cur_idx
        cur_idx = prev[j, cur_idx]

    # Each step start -> survivor[0] -> survivor[1] -> ... was caused by
    # exactly one input bit; recover it from the transition table.
    decoded = np.empty(trellis_length, dtype=bool)
    origin = start
    for j, target in enumerate(survivor):
        decoded[j] = np.where(self.next_state[origin] == target)[0][0]
        origin = target

    return decoded

Decode bits