Matrix method (NumPy)
Note: the transition and emission matrices store one probability distribution per row, i.e., the entries along axis=1 sum to 1. With the hidden-state distribution written as a row vector, multiplying that vector by the transition matrix yields the hidden-state distribution at the next step; likewise, multiplying it by the emission matrix yields the distribution over observed states.
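A minimal sketch of that vector-matrix view (toy values, assumed only for illustration):

import numpy as np

T = np.array([[0.9, 0.1],
              [0.2, 0.8]])   # each row sums to 1
p = np.array([0.5, 0.5])     # hidden-state distribution as a row vector
p_next = p @ T               # distribution after one transition
print(p_next, p_next.sum())  # [0.55 0.45] 1.0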
import numpy as np
from typing import List, Tuple
def step(score_prev: np.ndarray,
         emission_probs: np.ndarray,
         transition_probs: np.ndarray,
         observed_state: int) -> Tuple[np.ndarray, np.ndarray]:
    """Runs one time step of the Viterbi algorithm.
    Args:
        score_prev: probability distribution with shape (num_hidden,),
            the score from the previous step
        emission_probs: the emission probability matrix (num_hidden,
            num_observed)
        transition_probs: the transition probability matrix, with
            shape (num_hidden, num_hidden)
        observed_state: the observed state at the current step
    Returns:
        - the score for the next step
        - for each current state, the score-maximizing previous state,
          as an int array with shape (num_hidden,)
    """
    # Broadcasting, not a matrix product: pre_max[i, j] = score_prev[j] * transition_probs[j, i],
    # the score of reaching current state i from previous state j.
    pre_max = score_prev * transition_probs.T
    max_prev_states = np.argmax(pre_max, axis=1)  # best previous hidden state for each current state
    max_vals = pre_max[np.arange(len(max_prev_states)), max_prev_states]  # the corresponding maxima (equivalently pre_max.max(axis=1))
    score_new = max_vals * emission_probs[:, observed_state]  # multiply in the emission column for the observed state
    return score_new, max_prev_states
def viterbi(emission_probs: np.ndarray,
            transition_probs: np.ndarray,
            start_probs: np.ndarray,
            observed_states: List[int]) -> Tuple[List[int], float]:
    """Runs the Viterbi algorithm to find the most likely hidden state sequence.
    Args:
        emission_probs: the emission probability matrix (num_hidden, num_observed)
        transition_probs: the transition probability matrix (num_hidden, num_hidden)
        start_probs: the initial hidden-state distribution, shape (num_hidden,)
        observed_states: the observed state index at each time step
    Returns:
        - the most likely sequence of hidden states
        - the joint probability of that state sequence and the observations
    """
    # Forward pass, storing the most likely previous state at each step.
    score = start_probs * emission_probs[:, observed_states[0]]  # best scores at the first time step (one per hidden state)
    all_pre_states = []
    for observed_state in observed_states[1:]:
        score, prevs = step(score, emission_probs, transition_probs, observed_state)
        all_pre_states.append(prevs)
    # Backtrace
    state = np.argmax(score)          # best hidden state at the final time step
    sequence_score = score[state]     # accumulated score of the best path
    sequence_state = [state]          # start the result sequence from the final state
    for pre_states in all_pre_states[::-1]:
        state = pre_states[state]     # follow the stored pointer to the best previous state
        sequence_state.append(state)  # append it to the result sequence
    return sequence_state[::-1], sequence_score
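Products of probabilities underflow for long sequences; here is a minimal log-space variant of step (a sketch reusing the imports above, not part of the original code):

def step_log(log_score_prev: np.ndarray,
             log_emission_probs: np.ndarray,
             log_transition_probs: np.ndarray,
             observed_state: int) -> Tuple[np.ndarray, np.ndarray]:
    # Same recurrence as step(), with log-probabilities added instead of probabilities multiplied.
    pre_max = log_score_prev + log_transition_probs.T
    max_prev_states = np.argmax(pre_max, axis=1)
    log_score_new = pre_max.max(axis=1) + log_emission_probs[:, observed_state]
    return log_score_new, max_prev_states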
num_hidden_states = 3    # size of the hidden-state vocabulary
num_observed_states = 2  # size of the observation vocabulary
num_time_steps = 4       # sequence length
# Initialize the transition probability matrix (each row is a distribution).
transition_probs = np.array([
[0.1, 0.2, 0.7],
[0.1, 0.1, 0.8],
[0.5, 0.4, 0.1],
])
assert transition_probs.shape == (num_hidden_states, num_hidden_states)
assert np.allclose(transition_probs.sum(axis=1), 1.0)
# Initialize the emission probability matrix (rows -> hidden states 0,1,2; columns -> observed states 0,1).
emission_probs = np.array([
[0.1, 0.9],
[0.3, 0.7],
[0.5, 0.5],
])
assert emission_probs.shape == (num_hidden_states, num_observed_states)
assert np.allclose(emission_probs.sum(axis=1), 1.0)
# Initial hidden-state probabilities.
init_hidden_state_probs = np.array([0.1, 0.3, 0.6])
assert init_hidden_state_probs.shape == (num_hidden_states,)
# The observed sequence.
observed_states = [1, 1, 0, 1]
assert len(observed_states) == num_time_steps
max_seq, seq_prob = viterbi(
emission_probs,
transition_probs,
init_hidden_state_probs,
observed_states,
)
max_seq, seq_prob
([2, 0, 2, 0], 0.0212625)
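A brute-force check of this result: enumerate all 3^4 hidden sequences, score each jointly with the observations, and confirm the DP answer (a sketch using the matrices defined above):

from itertools import product

candidates = []
for seq in product(range(num_hidden_states), repeat=num_time_steps):
    prob = init_hidden_state_probs[seq[0]] * emission_probs[seq[0], observed_states[0]]
    for t in range(1, num_time_steps):
        prob *= transition_probs[seq[t - 1], seq[t]] * emission_probs[seq[t], observed_states[t]]
    candidates.append((prob, seq))
best_prob, best_seq = max(candidates)
print(best_seq, best_prob)  # (2, 0, 2, 0) 0.0212625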
Step-by-step walkthrough of a single step (note: it starts from observed state 0 rather than 1, so the numbers differ from the run above):
init_hidden_state_probs
array([0.1, 0.3, 0.6])
emission_probs
array([[0.1, 0.9],
[0.3, 0.7],
[0.5, 0.5]])
score = init_hidden_state_probs * emission_probs[:, 0]
score
array([0.01, 0.09, 0.3 ])
transition_probs.T
array([[0.1, 0.1, 0.5],
[0.2, 0.1, 0.4],
[0.7, 0.8, 0.1]])
pre_max = score * transition_probs.T
pre_max
array([[0.001, 0.009, 0.15 ],
[0.002, 0.009, 0.12 ],
[0.007, 0.072, 0.03 ]])
max_prev_states = np.argmax(pre_max, axis=1)
max_prev_states
array([2, 2, 1])
np.arange(len(max_prev_states))
array([0, 1, 2])
pre_max[np.arange(len(max_prev_states)), max_prev_states]
array([0.15 , 0.12 , 0.072])
step(score, emission_probs, transition_probs, 1)
(array([0.135, 0.084, 0.036]), array([2, 2, 1]))
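Chaining step() over the full observation sequence reproduces the forward pass inside viterbi (a small consistency check with the matrices above):

score = init_hidden_state_probs * emission_probs[:, observed_states[0]]
for obs in observed_states[1:]:
    score, _ = step(score, emission_probs, transition_probs, obs)
print(score)        # [0.0212625 0.01323   0.004032 ]
print(score.max())  # 0.0212625, matching seq_prob above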
Matrix method (PyTorch)
import torch
# shape: (1, 5, 3)
logits = torch.rand((1,5,3))
mask = torch.tensor([[1,1,1,0,0]])
transition_probs = torch.rand((3,3))
batch_size, max_len, n_tags = logits.size()
seq_len = mask.long().sum(dim=1)
seq_len
tensor([3])
logits = logits.transpose(0,1).data  # time-major: (max_len, batch_size, n_tags)
print(logits.shape)
mask = mask.transpose(0,1).data.eq(True)  # (max_len, batch_size)
print(mask.shape)
flip_mask = mask.eq(False)  # True at padded positions
torch.Size([5, 1, 3])
torch.Size([5, 1])
vpath = torch.zeros_like(logits, dtype=torch.int)  # backpointers for each (step, batch, tag)
print(vpath.shape)
vscore = logits[0]  # initialization: scores at the first time step
print(vscore)
torch.Size([5, 1, 3])
tensor([[0.3556, 0.4721, 0.6929]])
end_transition_prob = torch.zeros(3).view(1, 3).repeat(batch_size,1,1)  # (batch_size, 1, n_tags)
end_transition_prob
tensor([[[0., 0., 0.]]])
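With these toy tensors, one step of the batched DP recurrence looks as follows (a sketch mirroring the loop body of viterbi_decode below, with masking and end transitions omitted):

prev_score = vscore.view(batch_size, n_tags, 1)  # (B, n_tags, 1): score of each previous tag
cur_score = logits[1].view(batch_size, 1, n_tags) + transition_probs  # (B, n_tags, n_tags)
score = prev_score + cur_score                   # score[b, i, j]: path score ending in tag j coming from tag i
best_score, best_prev = score.max(dim=1)         # (B, n_tags): best score and backpointer per current tag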
A batched, masked implementation (it appears to be fastNLP's ConditionalRandomField.viterbi_decode):
def viterbi_decode(self, logits, mask, unpad=False):
    r"""Given a feature matrix and a transition score matrix, compute the best path and its score.

    :param torch.FloatTensor logits: batch_size x max_len x num_tags, the feature matrix.
    :param torch.ByteTensor mask: batch_size x max_len; positions where it is 0 are treated
        as padding. If None, the input is assumed to have no padding.
    :param bool unpad: whether to strip padding from the result. If False, returns a
        batch_size x max_len tensor; if True, returns List[List[int]], where each inner
        List[int] holds the labels of one sequence with padding removed, i.e. its length
        is that sample's true length.
    :return: (paths, scores).
        paths: the decoded paths; their form depends on unpad.
        scores: torch.FloatTensor of size (batch_size,), the score of each best path.
    """
    batch_size, max_len, n_tags = logits.size()
    seq_len = mask.long().sum(1)
    logits = logits.transpose(0, 1).data  # L, B, H
    mask = mask.transpose(0, 1).data.eq(True)  # L, B
    flip_mask = mask.eq(False)
    # dp
    vpath = logits.new_zeros((max_len, batch_size, n_tags), dtype=torch.long)
    vscore = logits[0]  # bsz x n_tags, initialization
    transitions = self._constrain.data.clone()
    transitions[:n_tags, :n_tags] += self.trans_m.data
    if self.include_start_end_trans:
        transitions[n_tags, :n_tags] += self.start_scores.data
        transitions[:n_tags, n_tags + 1] += self.end_scores.data
    vscore += transitions[n_tags, :n_tags]  # adds start-transition scores (all zeros unless include_start_end_trans)
    trans_score = transitions[:n_tags, :n_tags].view(1, n_tags, n_tags).data  # transition score matrix
    end_trans_score = transitions[:n_tags, n_tags + 1].view(1, 1, n_tags).repeat(batch_size, 1, 1)  # bsz, 1, n_tags
    # Handles length-1 sentences; for longer sentences this only sets the initial value.
    vscore += transitions[:n_tags, n_tags + 1].view(1, n_tags).repeat(batch_size, 1).masked_fill(seq_len.ne(1).view(-1, 1), 0)
    for i in range(1, max_len):
        prev_score = vscore.view(batch_size, n_tags, 1)
        cur_score = logits[i].view(batch_size, 1, n_tags) + trans_score  # emission scores plus transition scores
        score = prev_score + cur_score.masked_fill(flip_mask[i].view(batch_size, 1, 1), 0)  # bsz x n_tags x n_tags
        # Account for the current position being the last one of its sequence.
        score += end_trans_score.masked_fill(seq_len.ne(i + 1).view(-1, 1, 1), 0)
        best_score, best_dst = score.max(1)
        vpath[i] = best_dst
        # The backtrace below starts from last_tags, so vscore must stay frozen once a sequence ends.
        vscore = best_score.masked_fill(flip_mask[i].view(batch_size, 1), 0) + vscore.masked_fill(mask[i].view(batch_size, 1), 0)
        # The masked_fill calls above all handle positions where mask is 0.
    # backtrace
    batch_idx = torch.arange(batch_size, dtype=torch.long, device=logits.device)
    seq_idx = torch.arange(max_len, dtype=torch.long, device=logits.device)
    lens = (seq_len - 1)
    # idxes [L, B], batched indices running from seq_len-1 down to 0
    idxes = (lens.view(1, -1) - seq_idx.view(-1, 1)) % max_len
    ans = logits.new_empty((max_len, batch_size), dtype=torch.long)
    ans_score, last_tags = vscore.max(1)  # best path score and the best tag at the last time step
    ans[idxes[0], batch_idx] = last_tags
    for i in range(max_len - 1):
        last_tags = vpath[idxes[i], batch_idx, last_tags]
        ans[idxes[i + 1], batch_idx] = last_tags
    ans = ans.transpose(0, 1)
    if unpad:
        paths = []
        for idx, length in enumerate(lens):
            paths.append(ans[idx, :length + 1].tolist())
    else:
        paths = ans
    return paths, ans_score
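The modular indexing line is what lets the backtrace start from each sequence's true end in one batched operation; a quick check with the toy tensors above (seq_len = 3, max_len = 5):

lens = seq_len - 1  # tensor([2])
idxes = (lens.view(1, -1) - torch.arange(5).view(-1, 1)) % 5
print(idxes.squeeze(1).tolist())  # [2, 1, 0, 4, 3]
# Row i holds position (last - i) mod max_len: row 0 is each sequence's last valid
# position, and subsequent rows walk backwards, with no per-sample Python loop.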
Iterative method
A short aside first, on NumPy reshape versus resize semantics for the flattened pre_max:
pre_max  # the walkthrough's 3x3 pre_max, flattened column by column into shape (1, 9)
array([[0.001, 0.002, 0.007, 0.009, 0.009, 0.072, 0.15 , 0.12 , 0.03 ]])
a = pre_max.reshape(3,3)
a
array([[0.001, 0.002, 0.007],
[0.009, 0.009, 0.072],
[0.15 , 0.12 , 0.03 ]])
a[0,0] = 99  # modify the reshaped array
b = pre_max.reshape(3,3)
b  # unchanged: reshape evidently returned copies here (pre_max is non-contiguous), so the write to a never reached pre_max
array([[0.001, 0.002, 0.007],
       [0.009, 0.009, 0.072],
       [0.15 , 0.12 , 0.03 ]])
a = b.resize(1,9)  # ndarray.resize reshapes in place and returns None, so a is now None
a                  # prints nothing: a is None
b                  # b itself now has shape (1, 9)
array([[0.001, 0.002, 0.007, 0.009, 0.009, 0.072, 0.15 , 0.12 , 0.03 ]])
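A quick check of the view-versus-copy rule (a minimal sketch; np.shares_memory distinguishes the two cases):

import numpy as np

base = np.arange(9.0)
v = base.reshape(3, 3)            # contiguous input: reshape returns a view
print(np.shares_memory(base, v))  # True
nc = v.T                          # transpose: a non-contiguous view
c = nc.reshape(1, 9)              # non-contiguous input: reshape must copy
print(np.shares_memory(nc, c))    # False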