1.2.6 实战:日期转换
Transformer模型的完整架构如图1-27所示。从功能角度来看,编码器的核心作用是从输入序列中提取特征,解码器的核心作用则是处理生成任务。从结构上看,编码器=嵌入层+位置编码+N×[(多头自注意力+残差连接+标准化)+(前馈神经网络+残差连接+标准化)],解码器=嵌入层+位置编码+N×[(带掩码的多头自注意力+残差连接+标准化)+(多头注意力+残差连接+标准化)+(前馈神经网络+残差连接+标准化)]。本节我们将通过Transformer模型再实现一次日期转换的功能。
图1-27 Transformer模型的完整架构
1.组件定义
首先定义多头注意力层。
python
class MultiHeadAttention(nn.Module):
def __init__(self, n_head, model_dim, drop_rate):
super().__init__()
# 每个注意力头的维度
self.head_dim=model_dim//n_head
# 注意力头的数量
self.n_head=n_head
# 模型的维度
self.model_dim=model_dim
# 初始化线性变换层,用于生成query、key和value
self.wq=nn.Linear(model_dim, n_head * self.head_dim)
self.wk=nn.Linear(model_dim, n_head * self.head_dim)
self.wv=nn.Linear(model_dim, n_head * self.head_dim)
# 输出的全连接层
self.output_dense=nn.Linear(model_dim, model_dim)
# Dropout层,用于防止模型过拟合
self.output_drop=nn.Dropout(drop_rate)
# 层标准化,用于稳定神经网络的训练
self.layer_norm=nn.LayerNorm(model_dim)
self.attention=None
def forward(self, q, k, v, mask):
# 保存原始输入q,用于后续的残差连接
residual=q
# 分别对输入的q、k、v做线性变换,生成query、key和value
query=self.wq(q)
key=self.wk(k)
value=self.wv(v)
# 对生成的query、key和value进行头分割,以便进行多头注意力计算
query=self.split_heads(query)
key=self.split_heads(key)
value=self.split_heads(value)
# 计算上下文向量
context=self.scaled_dot_product_attention(query, key, value, mask)
# 对上下文向量进行线性变换
output=self.output_dense(context)
# 添加dropout
output=self.output_drop(output)
# 添加残差连接并进行层标准化
output=self.layer_norm(residual+output)
return output
def split_heads(self, x):
# 将输入x的形状(shape)变为(n, step, n_head, head_dim),然后重排,得到(n, n_head, step, head_dim)
x=th.reshape(x, (x.shape[0], x.shape[1], self.n_head, self.head_dim))
return x.permute(0, 2, 1, 3)
def scaled_dot_product_attention(self, q, k, v, mask=None):
# 计算缩放因子
dk=th.tensor(k.shape[-1]).type(th.float)
# 计算注意力分数
score=th.matmul(q, k.permute(0, 1, 3, 2))/(th.sqrt(dk)+1e-8)
if mask is not None:
# 如果提供了mask,则将mask位置的分数设置为负无穷,使得这些位置的softmax值接近0
score=score.masked_fill_(mask,-np.inf)
# 应用softmax函数计算得到注意力权重
self.attention=softmax(score,dim=-1)
# 计算上下文向量
context=th.matmul(self.attention,v)
# 重排上下文向量的维度并进行维度合并
context=context.permute(0, 2, 1, 3)
context=context.reshape((context.shape[0], context.shape[1],-1))
return context
重点介绍一下scaled_dot_product_attention函数,此函数实现了“缩放点积注意力机制”的注意力计算过程。这是Transformer模型中的核心部分。以下是函数执行的主要步骤。
1)计算缩放因子。函数计算了缩放因子dk,它等于k(对应key)的最后一个维度。这个缩放因子用于在计算注意力分数时,缓解可能因维度较高而导致的点积梯度消失或梯度爆炸问题。
2)计算注意力分数。函数计算了注意力分数score。注意力分数是通过对q(对应query)和k(对应key)进行点积运算并除以缩放因子dk的平方根来计算的。
3)应用mask。如果提供了mask,那么函数将在计算softmax值之前将mask位置的分数设置为负无穷。这将使得这些位置的softmax值接近0,也就是说,模型不会关注这些位置。
4)计算注意力权重。函数通过对score应用softmax函数,计算得到注意力权重self.attention。
5)计算上下文向量。使用注意力权重和v(对应value)进行矩阵乘法,计算出上下文向量context。
6)重排和合并维度。函数通过重排和合并维度,得到了最终的上下文向量。这个上下文向量将被用作多头注意力机制的输出。
注意:这个过程被多次应用于多头注意力机制中,每个头都会有自己的query、key和value,它们通过不同的线性变换得到,然后用于计算各自的注意力权重和上下文向量。
然后,我们定义注意力计算后的前馈神经网络,它在每个Transformer模型的编码器和解码器的层中使用,并且独立地应用于每个位置的输入。这个网络包含两个线性变换层,其中间隔一个ReLU激活函数,并且在输出之前使用了dropout和层标准化。
python
class PositionWiseFFN(nn.Module):
def __init__(self, model_dim, dropout=0.0):
super().__init__()
# 前馈神经网络的隐藏层维度,设为模型维度的4倍
ffn_dim=model_dim * 4
# 第一个线性变换层,其输出维度为前馈神经网络的隐藏层维度
self.linear1=nn.Linear(model_dim, ffn_dim)
# 第二个线性变换层,其输出维度为模型的维度
self.linear2=nn.Linear(ffn_dim, model_dim)
# Dropout层,用于防止模型过拟合
self.dropout=nn.Dropout(dropout)
# 层标准化,用于稳定神经网络的训练
self.layer_norm=nn.LayerNorm(model_dim)
def forward(self, x):
# 对输入x进行前馈神经网络的计算
# 首先,通过第一个线性变换层并使用ReLU作为激活函数
output=relu(self.linear1(x))
# 然后,通过第二个线性变换层
output=self.linear2(output)
# 接着,对上述输出进行dropout操作
output=self.dropout(output)
# 最后,对输入x和前馈神经网络的输出做残差连接,然后进行层标准化
output=self.layer_norm(x+output)
return output # 返回结果,其形状为[n, step, dim]
2.实现编码器与解码器
之后我们定义Transformer模型的编码器。
python
class EncoderLayer(nn.Module):
def __init__(self, n_head, emb_dim, drop_rate):
super().__init__()
# 多头注意力机制层
self.mha=MultiHeadAttention(n_head, emb_dim, drop_rate)
# 前馈神经网络层
self.ffn=PositionWiseFFN(emb_dim, drop_rate)
def forward(self, xz, mask):
# xz的形状为 [n, step, emb_dim]
# 通过多头注意力机制层处理xz,得到context,其形状也为 [n, step, emb_dim]
context=self.mha(xz, xz, xz, mask)
# 将context传入前馈神经网络层,得到输出
output=self.ffn(context)
return output
class Encoder(nn.Module):
def __init__(self, n_head, emb_dim, drop_rate, n_layer):
super().__init__()
# 定义n_layer个EncoderLayer,保存在ModuleList中
self.encoder_layers=nn.ModuleList(
[EncoderLayer(n_head, emb_dim, drop_rate) for _ in range(n_layer)]
)
def forward(self, xz, mask):
# 依次通过所有的EncoderLayer
for encoder in self.encoder_layers:
xz=encoder(xz, mask)
return xz # 返回的xz形状为 [n, step, emb_dim]
再定义Transformer模型的解码器。
python
class DecoderLayer(nn.Module):
def __init__(self, n_head, model_dim, drop_rate):
super().__init__()
# 定义两个多头注意力机制层
self.mha=nn.ModuleList([MultiHeadAttention(n_head, model_dim, drop_rate) for _ in range(2)])
# 定义一个前馈神经网络层
self.ffn=PositionWiseFFN(model_dim, drop_rate)
def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):
# 执行第一个注意力层的计算,3个输入均为yz,使用自注意力机制
dec_output=self.mha[0](yz, yz, yz, yz_look_ahead_mask) # [n, step, model_dim]
# 执行第二个注意力层的计算,其中Q来自前一个注意力层的输出,K和V来自编码器的输出
dec_output=self.mha[1](dec_output, xz, xz, xz_pad_mask) # [n, step, model_dim]
# 通过前馈神经网络层
dec_output=self.ffn(dec_output) # [n, step, model_dim]
return dec_output
class Decoder(nn.Module):
def __init__(self, n_head, model_dim, drop_rate, n_layer):
super().__init__()
# 定义n_layer个DecoderLayer,保存在ModuleList中
self.num_layers=n_layer
self.decoder_layers=nn.ModuleList(
[DecoderLayer(n_head, model_dim, drop_rate) for _ in range(n_layer)]
)
def forward(self, yz, xz, yz_look_ahead_mask, xz_pad_mask):
# 依次通过所有的DecoderLayer
for decoder in self.decoder_layers:
yz=decoder(yz, xz, yz_look_ahead_mask, xz_pad_mask)
return yz # 返回的yz形状为 [n, step, model_dim]
重点介绍一下解码器的两个注意力层。在解码器的前向传播过程中,输入的yz首先会传入第一个多头注意力机制层中。这个注意力层是一个自注意力机制层,也就是说其query、key和value都来自yz。并且,这个注意力层使用了一个look ahead mask方法,使得在计算注意力分数时,每个位置只能关注它之前的位置,而不能关注它之后的位置。这是因为在预测时,模型只能看到已经预测出的词,不能看到还未预测出的词。这个注意力层的输出dec_output也是一个序列,其形状为[n, step, model_dim]。
然后,dec_output和编码器的输出xz一起传入第二个多头注意力机制层中。这个注意力层的query来自dec_output,而key和value来自xz。并且,这个注意力层使用了一个padding mask方法,使得在计算注意力分数时,模型不会关注xz中的padding位置。这个注意力层的输出dec_output同样是一个序列,其形状为[n, step, model_dim]。
在解码器中,每个位置的输出不仅取决于当前位置的输入,还取决于前面位置的输入和编码器所有位置的输出。这意味着,解码器可以捕捉到输入和输出之间的复杂依赖关系。
3.组装Transformer
在处理输入时,还需要定义位置编码层,用于处理序列数据。这个层的作用是将序列中每个位置的词编码为一个固定大小的向量,这个向量包含了词的信息和它在序列中的位置信息。
python
class PositionEmbedding(nn.Module):
def __init__(self, max_len, emb_dim, n_vocab):
super().__init__()
# 生成位置编码矩阵
pos=np.expand_dims(np.arange(max_len), 1) # [max_len, 1]
# 使用正弦和余弦函数生成位置编码
pe=pos/np.power(1000, 2*np.expand_dims(np.arange(emb_dim)//2, 0)/emb_dim)
pe[:, 0::2]=np.sin(pe[:, 0::2])
pe[:, 1::2]=np.cos(pe[:, 1::2])
pe=np.expand_dims(pe, 0) # [1, max_len, emb_dim]
self.pe=th.from_numpy(pe).type(th.float32)
# 定义词嵌入层
self.embeddings=nn.Embedding(n_vocab, emb_dim)
# 初始化词嵌入层的权重
self.embeddings.weight.data.normal_(0, 0.1)
def forward(self, x):
# 确保位置编码在与词嵌入权重相同的设备上
device=self.embeddings.weight.device
self.pe=self.pe.to(device)
# 计算输入的词嵌入权重,并加上位置编码
x_embed=self.embeddings(x)+self.pe # [n, step, emb_dim]
return x_embed # [n, step, emb_dim]
最后,我们将其组装成Transformer。
python
class Transformer(nn.Module):
def __init__(self, n_vocab, max_len, n_layer=6, emb_dim=512, n_head=8, drop_rate=0.1, padding_idx=0):
super().__init__()
# 初始化最大长度、填充索引、词汇表大小
self.max_len=max_len
self.padding_idx=th.tensor(padding_idx)
self.dec_v_emb=n_vocab
# 初始化位置嵌入、编码器、解码器和输出层
self.embed=PositionEmbedding(max_len, emb_dim, n_vocab)
self.encoder=Encoder(n_head, emb_dim, drop_rate, n_layer)
self.decoder=Decoder(n_head, emb_dim, drop_rate, n_layer)
self.output=nn.Linear(emb_dim, n_vocab)
# 初始化优化器
self.opt=th.optim.Adam(self.parameters(), lr=0.002)
def forward(self, x, y):
# 对输入和目标进行嵌入
x_embed, y_embed=self.embed(x), self.embed(y)
# 创建填充掩码
pad_mask=self._pad_mask(x)
# 对输入进行编码
encoded_z=self.encoder(x_embed, pad_mask)
# 创建前瞻掩码
yz_look_ahead_mask=self._look_ahead_mask(y)
# 将编码后的输入和前瞻掩码传入解码器
decoded_z=self.decoder(
y_embed, encoded_z, yz_look_ahead_mask, pad_mask)
# 通过输出层得到最终输出
output=self.output(decoded_z)
return output
def step(self, x, y):
# 清空梯度
self.opt.zero_grad()
# 计算输出和损失
logits=self(x, y[:, :-1])
loss=cross_entropy(logits.reshape(-1, self.dec_v_emb), y[:, 1:].reshape(-1))
# 进行反向传播
loss.backward()
# 更新参数
self.opt.step()
return loss.cpu().data.numpy(), logits
def _pad_bool(self, seqs):
# 创建掩码,标记哪些位置是填充的
return th.eq(seqs, self.padding_idx)
def _pad_mask(self, seqs):
# 将填充掩码扩展到合适的维度
len_q=seqs.size(1)
mask=self._pad_bool(seqs).unsqueeze(1).expand(-1, len_q,-1)
return mask.unsqueeze(1)
def _look_ahead_mask(self, seqs):
# 创建前瞻掩码,防止在生成序列时看到未来位置的信息
device=next(self.parameters()).device
_, seq_len=seqs.shape
mask=th.triu(th.ones((seq_len, seq_len), dtype=th.long),
diagonal=1).to(device)
mask=th.where(self._pad_bool(seqs)[:, None, None, :], 1, mask[None, None, :, :]).to(device)
return mask>0
在处理序列数据(如文本或时间序列数据)时,通常会遇到一个问题,即序列的长度不一致。在大多数深度学习框架中,我们需要将一个批量的数据整理成相同的形状才能进行计算。因此,我们需要一种方法来处理长度不一的序列,这就是“填充”(padding)的用处。通过填充,我们可以将不同长度的序列转变为相同长度,具体来说,我们会找到批量中最长的序列,然后将其他较短的序列通过添加特殊的“填充值”(如0或特殊的标记)来扩展到相同的长度。
填充之后,我们就可以将序列数据整理成相同的形状,这样就可以用来训练模型了。然而,填充值是没有实际意义的,我们不希望它们对模型的训练造成影响。因此,我们通常会创建一个掩码(mask),用来告诉模型哪些位置是填充值,也就是Transformer模型定义中的_pad_mask和_look_ahead_mask函数。它们会返回一个布尔值矩阵,标记输入中哪些位置是填充值。
python
def pad_zero(seqs, max_len):
# 初始化一个全是填充标识符PAD_token的二维矩阵,大小为(len(seqs), max_len)
padded=np.full((len(seqs), max_len), fill_value=PAD_token, dtype=np.int32)
for i, seq in enumerate(seqs):
# 将seqs中的每个seq序列的元素填入padded对应的行中,未填满的部分仍为PAD_token
padded[i, :len(seq)]=seq
return padded
4.训练与评估
接下来就可以开始训练了。
python
# 初始化一个Transformer模型,设置词汇表大小、最大序列长度、层数、嵌入维度、多头注意力的头数、dropout比率和填充标记的索引
model=Transformer(n_vocab=dataset.num_word, max_len=MAX_LENGTH, n_layer=3, emb_dim=32, n_head=8, drop_rate=0.1, padding_idx=0)
# 检测是否有可用的GPU,如果有,则使用GPU进行计算;如果没有,则使用CPU
device=th.device("cuda" if th.cuda.is_available() else "cpu")
# 将模型移动到相应的设备(CPU或GPU)
model=model.to(device)
# 创建一个数据集,包含1000个样本
dataset=DateDataset(1000)
# 创建一个数据加载器,设定批量大小为32,每个批量的数据会被打乱
dataloader=DataLoader(dataset, batch_size=32, shuffle=True)
# 执行10个训练周期
for i in range(10):
# 对于数据加载器中的每批数据,对输入和目标张量进行零填充,使其长度达到最大,然后将其转换为PyTorch张量,并移动到相应的设备(CPU或GPU)
for input_tensor, target_tensor, _ in dataloader:
input_tensor=th.from_numpy(
pad_zero(input_tensor, max_len=MAX_LENGTH)).long().to(device)
target_tensor=th.from_numpy(
pad_zero(target_tensor, MAX_LENGTH+1)).long().to(device)
# 使用模型的step方法进行一步训练,并获取损失值
loss, _=model.step(input_tensor, target_tensor)
# 打印每个训练周期后的损失值
print(f"epoch: {i+1}, \tloss: {loss}")
类似于Seq2Seq结构模型的日期转换,我们可以定义一个评估方法,查看Transformer模型能否正确地进行日期转换。
python
def evaluate(model, x, y):
model.eval()
x=th.from_numpy(pad_zero([x], max_len=MAX_LENGTH)).long().to(device)
y=th.from_numpy(pad_zero([y], max_len=MAX_LENGTH)).long().to(device)
decoder_outputs=model(x, y)
_, topi=decoder_outputs.topk(1)
decoded_ids=topi.squeeze()
decoded_words=[]
for idx in decoded_ids:
decoded_words.append(dataset.index2word[idx.item()])
return ''.join(decoded_words)
最终模型的输出如图1-28所示。
图1-28 Transformer模型的日期转换输出示例