本篇文章结束时,你将达成什么目标? 你将能够自己动手构建和训练一个大型语言模型(LLM),并跟着我一起写代码。虽然我们要建立的是一个能将任何给定文本从英语翻译成马来语的LLM,你可以轻松地将此LLM架构修改,以适用于其他语言的翻译任务。
大型语言模型(LLM)是这些最流行的AI聊天机器人的核心基础,如ChatGPT、Gemini、MetaAI、Mistral AI等。在每个这样的大型语言模型内部,都有一种名为Transformer的架构。所以,我们将根据这篇著名的论文“Attention Is All You Need”,请参阅https://arxiv.org/abs/1706.03762,首先构建Transformer架构。
《注意力就是你所需要的》论文中的Transformer架构
首先,我们将一块一块地构建变压器模型的所有组件。然后,我们将把这些块组装起来以构建我们的模型。之后,我们将使用从Hugging Face获取的数据集来训练并验证我们的模型。最后,我们将通过在新的文本数据上进行翻译来测试我们的模型。
重要提示:我会一步步编码 transformer 架构中的所有组件,并解释这些组件的定义、意义和实现方式。我还会对需要解释的每一行代码添加注释。这样我相信你能更好地理解编码的整体流程。
一起来敲代码吧!
第一步:导入数据集为了让LLM模型能够从英语翻译成马来语,我们需要使用一个包含源语言(英语)和目标语言(马来语)配对的数据集。因此,我们将使用来自Huggingface的“Helsinki-NLP/opus-100”数据集。这个数据集包含100万个英语-马来语训练数据对,足以保证良好的准确性,并且分别有2000个验证数据和测试数据。数据集已经预先分好,我们不再需要进行数据集划分。
# 步骤1:加载数据并将其分为训练集、验证集和测试集
import os
import math
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, random_split
from pathlib import Path
from datasets import load_dataset
from tqdm import tqdm
# 如果还没安装,请先安装datasets和tokenizers库(例如:!pip install datasets tokenizers)。
os.mkdir("./malaygpt")
os.mkdir("./tokenizer_en")
os.mkdir("./tokenizer_my")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='train')
validation_dataset = load_dataset("Helsinki-NLP/opus-100", "en-ms", split='validation')
# 为了加快训练速度,减少数据集中的数据量
raw_train_dataset, rt_to_skip = random_split(train_dataset, [1500, len(train_dataset)-1500])
raw_validation_dataset, vt_to_skip = random_split(validation_dataset, [50, len(validation_dataset)-50])
步骤 2:创建一个分词工具
Transformer模型不直接处理原始文本,它只处理数字。因此,我们需要做一些转换工作,将原始文本转换为数字。为此,我们将使用一种非常流行的次词级别分词器,称为BPE分词器,在像GPT3这样的模型中非常流行。我们将首先使用在步骤1中准备的训练数据集来训练BPE分词器。流程如下图。
分词过程
训练完成后,分词器会为英语和马来语生成词汇表。词汇表是从语料库数据中提取的一系列唯一词汇。由于我们正在执行翻译任务,因此需要为这两种语言都准备分词工具。BPE 分词器将原始文本映射到词汇表中的词或子词令牌,并为输入原始文本中的每个词返回一个令牌。令牌可以是一个完整的词或子词片段。这是子词分词器相对于其他分词器的优势之一,因为它可以克服 OOV(词汇外)问题。分词器会为每个令牌返回一个唯一的索引或位置ID,这些索引将用于创建词嵌入,如上文所示。
# 步骤2:创建分词器
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace
def get_ds_iterator(raw_train_dataset, lang):
for data in raw_train_dataset:
yield data['translation'][lang]
# 创建源分词器 - 英语
tokenizer_en = Tokenizer(BPE(unk_token="[UNK]"))
trainer_en = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])
# 我们还需要添加一个预分词器,将输入拆分成单词,因为如果没有预分词器,我们可能会得到跨越多个单词的分词:例如,我们可能会得到一个 "there is" 分词,因为这两个词经常出现在一起。
# 使用预分词器将确保没有一个分词大于由预分词器返回的单词。
tokenizer_en.pre_tokenizer = Whitespace()
tokenizer_en.train_from_iterator(get_ds_iterator(raw_train_dataset, "en"), trainer=trainer_en)
tokenizer_en.save("./tokenizer_en/tokenizer_en.json")
# 创建目标分词器 - 印尼语
tokenizer_my = Tokenizer(BPE(unk_token="[UNK]"))
trainer_my = BpeTrainer(min_frequency=2, special_tokens=["[PAD]","[UNK]","[CLS]", "[SEP]", "[MASK]"])
tokenizer_my.pre_tokenizer = Whitespace()
tokenizer_my.train_from_iterator(get_ds_iterator(raw_train_dataset, "ms"), trainer=trainer_my)
tokenizer_my.save("./tokenizer_my/tokenizer_my.json")
tokenizer_en = Tokenizer.from_file("./tokenizer_en/tokenizer_en.json")
tokenizer_my = Tokenizer.from_file("./tokenizer_my/tokenizer_my.json")
source_vocab_size = tokenizer_en.get_vocab_size()
target_vocab_size = tokenizer_my.get_vocab_size()
# 计算整个训练数据集的最大序列长度
max_seq_len_source = 0
max_seq_len_target = 0
for data in raw_train_dataset:
enc_ids = tokenizer_en.encode(data['translation']['en']).ids
dec_ids = tokenizer_my.encode(data['translation']['ms']).ids
max_seq_len_source = max(max_seq_len_source, len(enc_ids))
max_seq_len_target = max(max_seq_len_target, len(dec_ids))
print(f'max_seqlen_source: {max_seq_len_source}') # 99 - 可能不同
print(f'max_seqlen_target: {max_seq_len_target}') # 109 - 可能不同
# 为了使训练过程标准化,我们只需取 max_seq_len_source 并增加 20-50 以覆盖诸如 PAD、CLS、SEP 等特殊分词
max_seq_len = 155
步骤 3:准备数据集和数据加载器
在这一步中,我们将准备源语言和目标语言的数据集用于后续的模型训练和验证。我们将创建一个类,该类接受原始数据集,并定义一个函数,使用源(tokenizer_en)和目标(tokenizer_my)分词器分别对源文本和目标文本进行编码。最后,我们将为训练和验证数据集创建一个DataLoader,该DataLoader将以每次10个样本的批次形式迭代数据集(在我们的例子中,每批包含10个样本)。可以依据数据量和可用计算资源调整批次大小。
# 步骤3:准备数据集和数据加载器
# 将原始数据集转换为能够由模型处理的编码数据集
class EncodeDataset(Dataset):
def __init__(self, raw_dataset, max_seq_len):
super().__init__()
self.raw_dataset = raw_dataset
self.max_seq_len = max_seq_len
def __len__(self):
return len(self.raw_dataset)
def __getitem__(self, index):
# 通过给定的索引值获取包含英语和马来语的单条数据记录。
raw_text = self.raw_dataset[index]
# 将文本按源语言和目标语言分开,稍后用于编码。
source_text = raw_text['translation']['en']
target_text = raw_text['translation']['ms']
# 使用英语标记化器编码源文本,使用马来语标记化器编码目标文本
source_text_encoded = tokenizer_en.encode(source_text).ids
target_text_encoded = tokenizer_my.encode(target_text).ids
# 使用标记化器将CLS、SEP和PAD标记转换为词汇表中的相应索引ID(这两个标记化器的ID是一样的)
CLS_ID = torch.tensor([tokenizer_my.token_to_id("[CLS]")], dtype=torch.int64)
SEP_ID = torch.tensor([tokenizer_my.token_to_id("[SEP]")], dtype=torch.int64)
PAD_ID = torch.tensor([tokenizer_my.token_to_id("[PAD]")], dtype=torch.int64)
# 为了训练模型,每个输入序列的长度应等于最大序列长度。因此,如果长度不等于最大序列长度,则需要添加填充。
num_source_padding = self.max_seq_len - len(source_text_encoded) - 2
num_target_padding = self.max_seq_len - len(target_text_encoded) - 1
encoder_padding = torch.tensor([PAD_ID] * num_source_padding, dtype=torch.int64)
decoder_padding = torch.tensor([PAD_ID] * num_target_padding, dtype=torch.int64)
# 编码器输入的起始标记为CLS_ID,随后是源编码,接着是句子结束标记SEP。
# 为了达到所需的最大序列长度,在末端添加PAD标记。
encoder_input = torch.cat([CLS_ID, torch.tensor(source_text_encoded, dtype=torch.int64), SEP_ID, encoder_padding], dim=0)
# 解码器输入的起始标记为CLS_ID,随后是目标编码。
# 为了达到所需的最大序列长度,在末端添加PAD标记。解码器输入中没有句子结束标记SEP。
decoder_input = torch.cat([CLS_ID, torch.tensor(target_text_encoded, dtype=torch.int64), decoder_padding], dim=0)
# 目标标签在训练期间用于损失计算,比较预测标签和目标标签。
# 目标标签的起始标记为目标编码,随后是实际目标编码。目标标签中没有起始标记CLS。
# 为了达到所需的最大序列长度,在末端添加PAD标记。
target_label = torch.cat([torch.tensor(target_text_encoded, dtype=torch.int64), SEP_ID, decoder_padding], dim=0)
# 在生成自注意力输出之前,将编码器块中的填充值置零。
encoder_mask = (encoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int()
# 在掩码多头注意力中实施因果掩码来处理这种情况,确保任何位于当前标记之后的标记都会被屏蔽,这意味着这些值将被替换为-无穷大,经过softmax操作后将变为零或接近零。因此,模型将忽略这些值或无法从中学习任何内容。
decoder_mask = (decoder_input != PAD_ID).unsqueeze(0).unsqueeze(0).int() & causal_mask(decoder_input.size(0))
return {
'encoder_input': encoder_input,
'decoder_input': decoder_input,
'target_label': target_label,
'encoder_mask': encoder_mask,
'decoder_mask': decoder_mask,
'source_text': source_text,
'target_text': target_text
}
# 因果掩码确保任何位于当前标记之后的标记都会被屏蔽,这意味着这些值将被替换为-无穷大,经过softmax操作后将变为零或接近零。因此,模型将忽略这些值或无法从中学习任何内容。
def causal_mask(size):
# 创建一个维度为 'size x size' 的平方矩阵,填充为1
mask = torch.triu(torch.ones(1, size, size), diagonal=1).type(torch.int)
return mask == 0
# 创建数据加载器以供模型训练和验证使用
train_ds = EncodeDataset(raw_train_dataset, max_seq_len)
val_ds = EncodeDataset(raw_validation_dataset, max_seq_len)
train_dataloader = DataLoader(train_ds, batch_size=5, shuffle=True)
val_dataloader = DataLoader(val_ds, batch_size=1, shuffle=True)
步骤四:输入嵌入层和位置编码层
输入嵌入层:在步骤2中由分词器生成的token ID序列将被输入到嵌入层。嵌入层将token-id映射到词汇表,并为每个token生成一个512维的嵌入向量。【512维来自注意力机制的论文】。嵌入向量能根据训练数据集捕捉token的语义信息。嵌入向量中的每个维度代表与token相关的某种特征。例如,如果token是一个狗,有些维度可能代表眼睛、嘴巴、腿、高度等特征。如果我们把这种向量画在n维空间中,相似的物体如狗和猫会被定位在彼此附近,而不相似的物体,如学校和家的嵌入向量则会离得更远。
位置编码: Transformer架构的一个优势在于可以并行处理任意数量的输入序列,这大大节省了训练时间,同时也加快了预测速度。然而,一个缺点是在并行处理多个令牌序列时,句子中令牌的位置将不再保持有序。这可能导致由于令牌的位置不同,句子的意义或上下文发生变化。因此,该论文通过实现位置编码方法解决了这一问题。论文建议在每个令牌的512维索引级别上应用两个数学函数(一个是正弦函数,一个是余弦函数)。
sin函数应用于每个偶数维度,而Cosine函数应用于每个奇数维度。最后,得到的位置编码向量将被加到嵌入向量上。现在,我们得到了一个能捕捉词汇意义及位置的嵌入向量。请注意,位置编码在每个序列中都保持不变。
# 步骤4:输入嵌入和位置编码
import torch
import torch.nn as nn
import math
class EmbeddingLayer(nn.Module):
def __init__(self, d_model: int, vocab_size: int):
super().__init__()
self.d_model = d_model
# 使用pytorch的模型嵌入层将token id映射为具有形状(vocab_size, d_model)的嵌入向量
# vocab_size是步骤2中由分词器创建的训练数据的词汇表大小
self.embedding = nn.Embedding(vocab_size, d_model)
def forward(self, input):
# 除了将输入传递给嵌入层外,我们还会额外乘以d_model的平方根来对嵌入层的输出进行归一化处理
embedding_output = self.embedding(input) * math.sqrt(self.d_model)
return embedding_output
class PositionalEncoding(nn.Module):
def __init__(self, d_model: int, max_seq_len: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
pe = torch.zeros(max_seq_len, d_model)
pos = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(pos * div_term)
pe[:, 1::2] = torch.cos(pos * div_term)
# 由于输入句子将以批次形式出现,因此在位置编码张量的维度0处添加了一个额外的维度以便适应批次大小
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, input_embdding):
input_embdding = input_embdding + (self.pe[:, :input_embdding.shape[1], :]).requires_grad_(False) # 为了防止计算梯度,我们将位置编码张量设置为不需要梯度计算
return self.dropout(input_embdding)
第五步:多头注意力机制
就像Transformer是大语言模型的核心一样,自注意力是Transformer架构的核心。
那你为什么要用自注意力呢? 下面我们就通过一个简单的例子来解释一下。
在 句子 1 和 句子 2 中,“bank”这个词显然有不同的含义。然而,在这两个句子中,“bank”这个词的嵌入值是一样的。这不正确。我们希望“bank”这个词的嵌入值能根据句子的语境变化。因此,我们需要一种机制,让嵌入值能够根据句子的整体含义动态变化,以反映出具体的语境含义。自注意力机制可以动态更新嵌入值,以根据句子表示具体的语境。
如果自注意力已经这么厉害了,为什么我们还需要多头注意力机制?下面我们就通过一个例子来看看答案是什么。
在这个例子中,如果我们使用自注意力机制,它可能会只关注句子的一个方面,比如说只是 “什么” 方面,也就是说,它只能捕获 “约翰做了什么?”。然而,像 “何时” 或 “何地” 这样的其他方面,对于模型表现得更好来说也同样重要。因此,这就是多头自注意力机制发挥作用的时候。在多头注意力中,单头嵌入将被分为多个头,这样每个头就可以关注句子的不同方面并相应地学习。这正是我们所追求的。
现在,我们知道为什么我们需要多头注意力。我们来看看多头注意力是如何工作的。我们直接进入正题。
如果你对矩阵乘法很熟悉,那么理解这个机制对你来说会非常简单。我们先来看看整个流程图,然后我会通过逐个细节描述来解释从输入到多头注意力输出的整个流程。
图片来源:https://github.com/hkproj/transformer。从零开始的笔记。
1. 首先,让我们将编码器输入(输入嵌入和位置编码的组合体,我们在步骤4中已经完成了这个操作,)复制3份。我们将它们分别命名为Q、K和V。它们各自都是编码器输入的一个副本。编码器输入形状:(seq_len, d_model),seq_len:最大序列长度,d_model:在这种情况下,嵌入向量的维度是512。
2. 接下来,我们将执行 Q 与权重 W_q、K 与权重 W_k 以及 V 与权重 W_v 的矩阵乘法。每个权重矩阵的维度为 (d_model, d_model)。结果的新的 query、key 和 value 嵌入向量的形状为 (seq_len, d_model)。权重参数将由模型随机初始化,并在模型开始训练时更新。为什么要进行权重矩阵乘法?因为这些是可学习的参数,可以使得 query、key 和 value 嵌入向量具有更好的表示能力。
3. 根据那篇注意力机制的论文,头的数量为8。每个新的查询、键和值嵌入向量会被切分成8个更小的部分。新的嵌入向量形状会变成(seq_len, d_model/num_heads) 或 (seq_len, d_k)。d_k = d_model/num_heads。
4. 每个查询嵌入向量都会和自身以及序列中其他所有嵌入向量(先进行转置)执行点积操作。这个点积操作会得到注意力得分。注意力得分表示该标记与输入序列中其他所有标记的相似程度。得分越高,表示越相似。
- 注意得分将通过除以 (d_k) 的平方根来标准化,以在整个矩阵中保持分值的一致性。但实际上可以除以任何其他数。主要原因在于,随着嵌入向量维度的增加,注意力矩阵中的总体方差也会相应增大。这就是为什么除以 (d_k) 可以平衡这种方差的增加。如果不除以 (d_k),那么对于任何给定的较高注意力得分,softmax 函数会给出非常高的概率值;而较低的注意力得分,softmax 函数则会给出非常低的概率值。这最终会导致模型只关注学习那些高概率值相关的特征,而忽略低概率值相关的特征,从而导致梯度消失。因此,规范化注意力得分矩阵是非常必要的。
- 在执行 softmax 函数之前,如果编码器掩码不为 None,则注意力得分将会与编码器掩码进行矩阵乘法。如果掩码是一个因果掩码,则输入序列中当前嵌入标记之后的所有注意力得分值将会被替换为负无穷大。softmax 函数会将负无穷大转换为接近零的值。因此,模型将不会学习当前标记之后的任何特征。这就是我们如何防止未来的标记影响模型的学习。
5. 接着,应用softmax函数到注意力得分矩阵,输出一个形状为(seq_len, seq_len)的权重矩阵。
6. 这些权重矩阵将与对应的 value 嵌入向量进行矩阵乘法。这将得到 8 个注意力头,每个头的形状为 (seq_len, d_v)。[ d_v = d_model/num_heads ]。
7. 最后,所有的Head将被合并成一个新的Head,其形状为(seq_len, d_model)。这个新的单一Head然后与输出权重矩阵W_o (d_model, d_model) 进行矩阵乘法运算。多头注意力的最终输出不仅表示了单词的上下文意义,还体现了学习输入句子多个方面的能力。
这么说吧,我们就开始写多头注意力块的代码,这部分代码会简单很多,也更短。
# 步骤5:多头注意力
class MultiHeadAttention(nn.Module):
def __init__(self, d_model: int, num_heads: int, dropout_rate: float):
super().__init__()
# 定义dropout以防止过拟合
self.dropout = nn.Dropout(dropout_rate)
self.num_heads = num_heads
assert d_model % num_heads == 0, "d_model 必须能被头的数量整除,否则会抛出异常"
# 每个自注意力头的新维度
self.d_k = d_model // num_heads
# 定义权重矩阵,它们都是可学习参数
self.W_q = nn.Linear(d_model, d_model, bias=False)
self.W_k = nn.Linear(d_model, d_model, bias=False)
self.W_v = nn.Linear(d_model, d_model, bias=False)
self.W_o = nn.Linear(d_model, d_model, bias=False)
def forward(self, q, k, v, encoder_mask):
# 请注意,我们将处理的不仅仅是单个序列,而是批处理的序列
# query、key 和 value 是通过与输入嵌入的对应权重进行矩阵乘法计算得到的
# 形状变化:query(batch_size, seq_len, d_model) 转换为 query(batch_size, seq_len, num_heads, d_k) => query(batch_size, num_heads, seq_len, d_k) [key和value同样适用]
query = self.W_q(q)
key = self.W_k(k)
value = self.W_v(v)
# 将query、key和value划分为多个头,因此新的维度将是d_k
# 形状变化:query(batch_size, seq_len, d_model) => query(batch_size, seq_len, num_heads, d_k) -> query(batch_size, num_heads, seq_len, d_k) [同样适用于key和value]
query = query.view(query.shape[0], query.shape[1], self.num_heads, self.d_k).transpose(1, 2)
key = key.view(key.shape[0], key.shape[1], self.num_heads, self.d_k).transpose(1, 2)
value = value.view(value.shape[0], value.shape[1], self.num_heads, self.d_k).transpose(1, 2)
# :: 自注意力块开始 ::
# 计算注意力分数以确定query与自身及其他序列中嵌入之间的相似性和关系
# 形状变化:query(batch_size, num_heads, seq_len, d_k) @ key(batch_size, num_heads, seq_len, d_k) => attention_score(batch_size, num_heads, seq_len, seq_len)
attention_score = (query @ key.transpose(-2, -1)) / math.sqrt(self.d_k)
# 如果提供了掩码,则需要根据掩码值调整注意力分数
if encoder_mask is not None:
attention_score.masked_fill_(encoder_mask == 0, -1e9)
# Softmax操作用于计算注意力分数的概率分布
# 形状变化:与attention_score相同
attention_score = attention_score.softmax(dim=-1)
if self.dropout is not None:
attention_score = self.dropout(attention_score)
# 自注意力块的最后一步是将注意力权重与value嵌入进行矩阵乘法
# 形状变化:attention_score(batch_size, num_heads, seq_len, seq_len) @ value(batch_size, num_heads, seq_len, d_k) => attention_output(batch_size, num_heads, seq_len, d_k)
attention_output = attention_score @ value
# :: 自注意力块结束 ::
# 现在,所有头将重新连接回单个头
# 形状变化:attention_output(batch_size, num_heads, seq_len, d_k) => attention_output(batch_size, seq_len, num_heads, d_k) => attention_output(batch_size, seq_len, d_model)
attention_output = attention_output.transpose(1, 2).contiguous().view(attention_output.shape[0], -1, self.num_heads * self.d_k)
# 最后,将attention_output与输出权重矩阵进行矩阵乘法以生成最终的多头注意力输出
# 多头输出的形状与嵌入输入相同
# 形状变化:attention_output(batch_size, seq_len, d_model) @ W_o(d_model, d_model) => multihead_output(batch_size, seq_len, d_model)
multihead_output = self.W_o(attention_output)
return multihead_output
步骤 6:前馈网络,层规范化和AddNorm
前馈网络:前馈网络使用深度神经网络在两个线性层(其中 d_model
和 d_ff
的值根据注意力论文设定)来学习嵌入向量的特征,并在第一层线性层的输出上应用ReLU激活函数以引入非线性,并使用 dropout 来进一步防止过拟合。
LayerNorm:我们对嵌入值应用层归一化,以确保嵌入向量中的值分布在网络中保持一致。这确保了学习过程的平稳。我们还将使用额外的学习参数gamma和beta来根据网络需求调整嵌入值的缩放和平移。
AddAndNorm:这包括一个跳过连接和分层归一化(如前所述)。在前向传播过程中,跳过连接确保了早期层中的特征在后期阶段仍然可以被记住,以在计算输出时作出必要的贡献。同样,在反向传播过程中,跳过连接通过减少每个阶段的反向传播次数来防止梯度消失。在编码器中使用两次,在解码器块中使用三次。它从上一层获取输入,先进行归一化处理,然后将其加到上一层的输出上。
# 步骤 6:前馈网络、层归一化和加和归一化
class FeedForward(nn.Module):
def __init__(self, d_model: int, d_ff: int, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.layer_1 = nn.Linear(d_model, d_ff)
self.layer_2 = nn.Linear(d_ff, d_model)
def forward(self, input):
return self.layer_2(self.dropout(torch.relu(self.layer_1(input))))
class LayerNorm(nn.Module):
# def __init__(self, features:int=512, eps: float = 1e-5):
def __init__(self, eps: float = 1e-5):
super().__init__()
# epsilon 是一个非常小的值,用于防止除零问题
self.eps = eps
# 我们引入了额外的可学习参数 gamma 和 beta,用于按需缩放和调整嵌入值
self.gamma = nn.Parameter(torch.ones(512)) # 512(建议与 d_model 的值相同)
self.beta = nn.Parameter(torch.zeros(512))
def forward(self, input):
mean = input.mean(dim = -1, keepdim=True)
std = input.std(dim = -1, keepdim=True)
return self.gamma * (input - mean)/(std + self.eps) + self.beta
class AddAndNorm(nn.Module):
def __init__(self, dropout_rate: float):
super().__init__()
self.dropout = nn.Dropout(dropout_rate)
self.layer_norm = LayerNorm()
def forward(self, input, sub_layer):
return input + self.dropout(sub_layer(self.layer_norm(input)))
步骤 7:编码器模块和编码器
编码器块:编码器块内有两个主要组件:多头注意力和Feedforward。还有两个Add & Norm单元。我们将按照注意力机制论文中的流程,在EncoderBlock类中组装这些组件。根据论文所述,这个编码器块被重复使用了6次。
编码器:我们将再创建一个名为Encoder的类,它会将EncoderBlock堆叠起来,从而输出最终的编码器结果。
# 步骤 7:编码器块和编码器
class EncoderBlock(nn.Module):
def __init__(self, multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float) -> None:
super().__init__()
self.multihead_attention = multihead_attention
self.feed_forward = feed_forward
self.addnorm_1 = AddAndNorm(dropout_rate)
self.addnorm_2 = AddAndNorm(dropout_rate)
def forward(self, encoder_input, encoder_mask):
# 第一个AddAndNorm单元通过跳过连接获取编码器输入,并将其与多头注意力的输出相加
encoder_input = self.addnorm_1(encoder_input, lambda encoder_input: self.multihead_attention(encoder_input, encoder_input, encoder_input, encoder_mask))
# 第二个AddAndNorm单元通过跳过连接获取多头注意力的输出,并将其与前向网络层的输出相加
encoder_input = self.addnorm_2(encoder_input, self.feed_forward)
return encoder_input
class Encoder(nn.Module):
def __init__(self, encoderblocklist: nn.ModuleList) -> None:
super().__init__()
# 由编码器块列表初始化
self.encoderblocklist = encoderblocklist
self.layer_norm = LayerNorm()
def forward(self, encoder_input, encoder_mask):
# 遍历所有编码器块(共6次)
for encoderblock in self.encoderblocklist:
encoder_input = encoderblock(encoder_input, encoder_mask)
# 对最终编码器块的输出进行归一化,并返回。此输出将作为解码器块交叉注意力的键和值
encoder_output = self.layer_norm(encoder_input)
return encoder_output
步骤 8:解码器模块、解码器和投影层
解码器块: 解码器块中有三个主要组件:带掩码的多头注意力、多头注意力和前馈网络。解码器块还包括三个“残差连接和层归一化”单元。我们会按照注意力机制论文中的流程,在DecoderBlock类中组装这些组件。根据论文所述,这个解码器块重复了六次。
Decoder: 我们将创建一个名为Decoder的类,该类会接收一个DecoderBlock列表,将这些块堆叠起来,并输出最终的解码结果。
在解码器块中存在两种类型的多头注意力。第一种是掩码多头注意力。它将解码器输入用作查询、键和值,并使用一个解码器掩码(也称为因果掩码)。因果掩码可防止模型查看序列中后续位置的嵌入。有关其如何工作的详细解释将在步骤3和步骤5中给出。
投影层: 最终的解码器输出会送到投影层。在此层中,解码器输出会先经过一个线性层,嵌入的形状会按照代码部分的指示改变。随后,softmax函数将解码器输出转换为词汇的概率分布,从所有词元中选出概率最高的作为预测输出。
# 步骤 8:解码器块和解码器以及投影层
class DecoderBlock(nn.Module):
# def __init__(self, features: int, self_attention_block: MultiHeadAttention, cross_attention_block: MultiHeadAttention, feed_forward_block: FeedForward, dropout_rate: float) -> None:
def __init__(self, masked_multihead_attention: MultiHeadAttention, cross_multihead_attention: MultiHeadAttention, feed_forward: FeedForward, dropout_rate: float) -> None:
super().__init__()
self.masked_multihead_attention = masked_multihead_attention
self.cross_multihead_attention = cross_multihead_attention
self.feed_forward = feed_forward
self.addnorm_1 = AddNorm(dropout_rate)
self.addnorm_2 = AddNorm(dropout_rate)
self.addnorm_3 = AddNorm(dropout_rate)
def forward(self, decoder_input, encoder_output, encoder_mask, decoder_mask):
# 第一个 AddNorm 单元从跳过链接中获取解码器输入,并将其与 Masked Multi-Head attention 块的输出相加
decoder_input = self.addnorm_1(decoder_input, lambda decoder_input: self.masked_multihead_attention(decoder_input, decoder_input, decoder_input, decoder_mask))
# 第二个 AddNorm 单元从跳过链接中获取 Masked Multi-Head attention 块的输出,并将其与 cross attention 块的输出相加
decoder_input = self.addnorm_2(decoder_input, lambda decoder_input: self.cross_multihead_attention(decoder_input, encoder_output, encoder_output, encoder_mask))
# 第三个 AddNorm 单元从跳过链接中获取 cross attention 块的输出,并将其与前馈层的输出相加
decoder_input = self.addnorm_3(decoder_input, self.feed_forward)
return decoder_input
class Decoder(nn.Module):
# def __init__(self, features: int, layers: nn.ModuleList) -> None:
def __init__(self, decoderblocklist: nn.ModuleList) -> None:
super().__init__()
self.decoderblocklist = decoderblocklist
self.layer_norm = LayerNorm()
def forward(self, decoder_input, encoder_output, encoder_mask, decoder_mask):
for decoderblock in self.decoderblocklist:
decoder_input = decoderblock(decoder_input, encoder_output, encoder_mask, decoder_mask)
decoder_output = self.layer_norm(decoder_input)
return decoder_output
class ProjectionLayer(nn.Module):
def __init__(self, d_model, vocab_size) -> None:
super().__init__()
self.projection_layer = nn.Linear(d_model, vocab_size)
def forward(self, decoder_output) -> None:
# 投影层首先接受解码器输出并将其传递给形状 (d_model, vocab_size) 的线性层
# 形状变化:decoder_output (batch_size, seq_len, d_model) @ 线性层 (d_model, vocab_size) => 输出 (batch_size, seq_len, vocab_size)
output = self.projection_layer(decoder_output)
return output
步骤 9 : 创建并搭建一个Transformer
最后,我们已经完成了变压器架构中所有组件的构建。最后一步是把它们组装起来。
首先,我们创建一个Transformer类(或类Transformer),该类将初始化所有组件类的实例对象。在Transformer类中,我们将首先定义编码函数,该函数负责执行编码部分的所有任务并生成编码输出结果。
接下来,我们定义一个解码器函数,它负责执行变压器解码部分的所有任务,并输出解码结果。
第三步,我们定义一个函数,该函数接收解码器的输出,并将其映射到词汇表以便预测。
现在,Transformer架构已经准备就绪。我们可以通过定义一个如代码所示的函数,该函数需要所有必要参数,来构建我们的翻译大模型。
# 步骤 9:创建和构建 Transformer
class Transformer(nn.Module):
def __init__(self, encoder: Encoder, decoder: Decoder, source_embed: EmbeddingLayer, target_embed: EmbeddingLayer, source_pos: PositionalEncoding, target_pos: PositionalEncoding, projection_layer: ProjectionLayer) -> None:
super().__init__()
self.source_embed = source_embed
self.source_pos = source_pos
self.encoder = encoder
self.target_embed = target_embed
self.target_pos = target_pos
self.decoder = decoder
self.projection_layer = projection_layer
def encode(self, encoder_input, encoder_mask):
encoder_input = self.source_embed(encoder_input)
encoder_input = self.source_pos(encoder_input)
encoder_output = self.encoder(encoder_input, encoder_mask)
return encoder_output
def decode(self, encoder_output, encoder_mask, decoder_input, decoder_mask):
decoder_input = self.target_embed(decoder_input)
decoder_input = self.target_pos(decoder_input)
decoder_output = self.decoder(decoder_input, encoder_output, encoder_mask, decoder_mask)
return decoder_output
def project(self, decoder_output):
return self.projection_layer(decoder_output)
def build_model(source_vocab_size: int, target_vocab_size: int, source_seq_len: int, target_seq_len: int, d_model: int=512, num_blocks: int=6, num_heads: int=8, dropout_rate: float=0.1, d_ff: int=2048) -> Transformer:
# 创建嵌入层
source_embed = EmbeddingLayer(d_model, source_vocab_size)
target_embed = EmbeddingLayer(d_model, target_vocab_size)
# 创建位置编码层
source_pos = PositionalEncoding(d_model, source_seq_len, dropout_rate)
target_pos = PositionalEncoding(d_model, target_seq_len, dropout_rate)
# 创建编码器块列表
encoderblocklist = []
for _ in range(num_blocks):
multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
feed_forward = FeedForward(d_model, d_ff, dropout_rate)
encoder_block = EncoderBlock(multihead_attention, feed_forward, dropout_rate)
encoderblocklist.append(encoder_block)
# 创建编码器
encoder = Encoder(nn.ModuleList(encoderblocklist))
# 创建解码器块列表
decoderblocklist = []
for _ in range(num_blocks):
masked_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
cross_multihead_attention = MultiHeadAttention(d_model, num_heads, dropout_rate)
feed_forward = FeedForward(d_model, d_ff, dropout_rate)
decoder_block = DecoderBlock(masked_multihead_attention, cross_multihead_attention, feed_forward, dropout_rate)
decoderblocklist.append(decoder_block)
# 创建解码器
decoder = Decoder(nn.ModuleList(decoderblocklist))
# 创建投影层
projection_layer = ProjectionLayer(d_model, target_vocab_size)
# 现在我们已经初始化了所有所需的部分,可以创建模型了
model = Transformer(encoder, decoder, source_embed, target_embed, source_pos, target_pos, projection_layer)
# 初次使用 Xavier 均匀方法初始化模型参数。一旦开始训练,这些参数将在训练过程中被网络更新
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
# 让我们来构建最终的模型。
model = build_model(tokenizer_en.get_vocab_size(), tokenizer_my.get_vocab_size(), max_seq_len, max_seq_len, d_model=512).to(device)
# 让我们看看刚刚完成的模型架构
print(model)
步骤 10:训练和测试我们构建的大型语言模型
现在是训练我们模型的时候了。训练过程相当简单。我们将使用在第3步创建的训练DataLoader。由于总训练数据集的数量是1百万,我强烈建议在GPU设备上进行训练。我用了大约5小时完成了20个epoch。每个epoch结束后,我们将保存模型的权重以及优化器的状态,这样在训练中断后,可以从之前的状态继续,而不是从头开始,更加方便。
每个epoch之后,我们将使用验证DataLoader进行一次验证。验证数据集包含2000个样本,这个规模相当合适。在验证过程中,我们只需要计算一次编码器的输出,直到解码器输出句子结束符[SEP]。这是因为直到解码器拿到[SEP]标记之前,我们都需要重复使用相同的编码器输出,这样做就没有必要了。
解码器输入会首先使用句子开始标记[CLS]。每次预测后,解码器输入会添加下一个生成的标记,直到遇到句子结束标记[SEP]。最后,投影层会将输出映射到相应的文本表示。
# 步骤 10: malayGPT 的训练和验证
def run_validation(model, validation_ds, tokenizer_en, tokenizer_my, max_seq_len, device, print_msg, global_step):
model.eval()
count = 0
with torch.no_grad():
for batch in validation_ds:
count += 1
encoder_input = batch["encoder_input"].to(device)
encoder_mask = batch["encoder_mask"].to(device)
cls_id = tokenizer_my.token_to_id('[CLS]')
sep_id = tokenizer_my.token_to_id('[SEP]')
# 计算源序列的编码器输出
encoder_output = model.encode(encoder_input, encoder_mask)
# 在预测任务中,解码器的输入应该以 [CLS] 令牌开始
decoder_input = torch.empty(1, 1).fill_(cls_id).type_as(encoder_input).to(device)
# 由于我们需要在每次添加新的输出到解码器输入时,接收到 [SEP] 结束令牌之前持续进行
while True:
# 检查是否达到最大长度
if decoder_input.size(1) == max_seq_len:
break
# 每次在添加新的输出到解码器输入时,重新创建掩码以预测下一个令牌
decoder_mask = causal_mask(decoder_input.size(1)).type_as(encoder_mask).to(device)
# 仅对下一个令牌应用投影
out = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask)
# 仅对下一个令牌应用投影
prob = model.project(out[:, -1])
# 选择具有最高概率的令牌,这是贪婪搜索的一种实现
_, next_word = torch.max(prob, dim=1)
decoder_input = torch.cat(
[decoder_input, torch.empty(1, 1).type_as(encoder_input).fill_(next_word.item()).to(device)], dim=1
)
# 检查新令牌是否是结束令牌
if next_word == sep_id:
break
# 最终输出是在接收到 [SEP] 结束令牌之前拼接的解码器输入
model_out = decoder_input.squeeze(0)
source_text = batch["source_text"][0]
target_text = batch["target_text"][0]
model_out_text = tokenizer_my.decode(model_out.detach().cpu().numpy())
# 打印源文本、目标文本和模型的预测结果
print_msg('-'*55)
# print_msg(f"{f'SOURCE: ':>12}{source_text}")
# print_msg(f"{f'TARGET: ':>12}{target_text}")
# print_msg(f"{f'PREDICTED: ':>12}{model_out_text}")
print_msg(f'源文本: {source_text}')
print_msg(f'目标文本: {target_text}')
print_msg(f'MalayGPT 预测: {model_out_text}')
if count == 2:
break
def train_model(preload_epoch=None):
# 整个训练和验证循环将运行每次迭代 10 轮
EPOCHS = 10
initial_epoch = 0
global_step = 0
# Adam 是一种常用的优化算法,它会基于计算的梯度更新参数的状态
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4, eps=1e-9)
# 如果 preload_epoch 不为 None,这意味着训练将从最后保存的权重和优化器开始,并从 preload_epoch + 1 开始
if preload_epoch is not None:
model_filename = f"./malaygpt/model_{preload_epoch}.pt"
state = torch.load(model_filename)
model.load_state_dict(state['model_state_dict'])
initial_epoch = state['epoch'] + 1
optimizer.load_state_dict(state['optimizer_state_dict'])
global_step = state['global_step']
# CrossEntropyLoss 损失函数用于计算投影输出与目标标签之间的差异
loss_fn = nn.CrossEntropyLoss(ignore_index=tokenizer_en.token_to_id('[PAD]'), label_smoothing=0.1).to(device)
for epoch in range(initial_epoch, EPOCHS):
# torch.cuda.empty_cache()
model.train()
batch_iterator = tqdm(train_dataloader, desc=f"处理 Epoch {epoch:02d}")
for batch in batch_iterator:
encoder_input = batch['encoder_input'].to(device) # (b, seq_len)
decoder_input = batch['decoder_input'].to(device) # (B, seq_len)
encoder_mask = batch['encoder_mask'].to(device) # (B, 1, 1, seq_len)
decoder_mask = batch['decoder_mask'].to(device) # (B, 1, seq_len, seq_len)
target_label = batch['target_label'].to(device) # (B, seq_len)
# 将张量通过编码器、解码器和投影层
encoder_output = model.encode(encoder_input, encoder_mask) # (B, seq_len, d_model)
decoder_output = model.decode(encoder_output, encoder_mask, decoder_input, decoder_mask) # (B, seq_len, d_model)
projection_output = model.project(decoder_output) # (B, seq_len, vocab_size)
# 使用简单的交叉熵计算损失值
loss = loss_fn(projection_output.view(-1, tokenizer_my.get_vocab_size()), target_label.view(-1))
batch_iterator.set_postfix({"loss": f"{loss.item():6.3f}"})
# 将损失进行反向传播
loss.backward()
# 更新模型权重
optimizer.step()
optimizer.zero_grad(set_to_none=True)
global_step += 1
# 这是在每个训练周期结束时运行的验证部分
run_validation(model, val_dataloader, tokenizer_en, tokenizer_my, max_seq_len, device, lambda msg: batch_iterator.write(msg), global_step)
# 在每个训练周期结束时保存模型
model_filename = f"./malaygpt/model_{epoch}.pt"
torch.save({
'epoch': epoch,
'model_state_dict': model.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
'global_step': global_step
}, model_filename)
# 开始训练我们的模型
train_model(preload_epoch=None)
步骤 11:创建一个函数来使用我们的模型测试新的翻译任务。
我们将我们的翻译函数命名为 malaygpt,这是一个新的通用名称。这个函数接收用户输入的英文文本,并输出马来语翻译。让我们来试一下这个函数。
# 步骤 11:最终测试我们的malayGPT模型来翻译新句子。让我们试一试。
def malaygpt(user_input_text):
# 使用输入文本进行验证
user_input_text = str(user_input_text).strip()
# 获取模型。定义设备、分词器和模型。
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer_en = Tokenizer.from_file("./tokenizer_en/tokenizer_en.json")
tokenizer_my = Tokenizer.from_file("./tokenizer_my/tokenizer_my.json")
# 构建我们的模型
# model = build_model(tokenizer_en.get_vocab_size(), tokenizer_my.get_vocab_size(), max_seq_len, max_seq_len, d_model=512).to(device)
# model = get_model(tokenizer_en.get_vocab_size(), tokenizer_my.get_vocab_size()).to(device)
model = build_model(tokenizer_en.get_vocab_size(), tokenizer_my.get_vocab_size(), max_seq_len, max_seq_len, d_model=512).to(device)
# 加载训练期间保存的特定检查点的模型文件
checkpoint_number = 9 # 对于这次测试,我使用检查点编号10
model_filename = f"./malaygpt/model_{checkpoint_number}.pt"
state = torch.load(model_filename)
model.load_state_dict(state['model_state_dict'])
# 进行推理
model.eval()
with torch.no_grad():
# 预计算编码器输出并在每个生成步骤中重复使用
source_text_encoding = tokenizer_en.encode(user_input_text)
source_text_encoding = torch.cat([
torch.tensor([tokenizer_en.token_to_id('[CLS]')], dtype=torch.int64),
torch.tensor(source_text_encoding.ids, dtype=torch.int64),
torch.tensor([tokenizer_en.token_to_id('[SEP]')], dtype=torch.int64),
torch.tensor([tokenizer_en.token_to_id('[PAD]')] * (max_seq_len - len(source_text_encoding.ids) - 2), dtype=torch.int64)
], dim=0).to(device)
source_mask = (source_text_encoding != tokenizer_en.token_to_id('[PAD]')).unsqueeze(0).unsqueeze(0).int().to(device)
encoder_output = model.encode(source_text_encoding, source_mask)
# 用sos标记初始化解码器输入
decoder_input = torch.empty(1, 1).fill_(tokenizer_my.token_to_id('[CLS]')).type_as(source_text_encoding).to(device)
# 逐字生成翻译
while decoder_input.size(1) < max_seq_len:
# 构建目标掩码并计算输出结果
decoder_mask = torch.triu(torch.ones((1, decoder_input.size(1), decoder_input.size(1))), diagonal=1).type(torch.int).type_as(source_mask).to(device)
out = model.decode(encoder_output, source_mask, decoder_input, decoder_mask)
# 投影下一个词
prob = model.project(out[:, -1])
_, next_word = torch.max(prob, dim=1)
decoder_input = torch.cat([decoder_input, torch.empty(1, 1).type_as(source_text_encoding).fill_(next_word.item()).to(device)], dim=1)
# 打印翻译的词
print(f"{tokenizer_my.decode([next_word.item()])}", end=' ')
# 如果预测到句子结束标记,则退出
if next_word == tokenizer_my.token_to_id('[SEP]'):
break
# 将id转换为tokens
return tokenizer_my.decode(decoder_input[0].tolist())
测试时间到啦!让我们来测试一下翻译吧。
“翻译看起来做得不错。”
这就完了!我非常有信心,你现在能够从头开始使用PyTorch构建你自己的大型语言模型。你还可以在其他语言的数据集上训练这个模型,并在该语言中执行翻译任务。现在,既然你已经学会了从头开始构建自己的Transformer模型,我可以保证,你现在有能力学习并实现大多数市场上可用的大型语言模型应用。
接下来要做什么呢? 我将通过微调Llama 3模型来构建完全功能的应用程序,这是目前市场上最受欢迎的开源LLM模型之一,比如Llama 3。我还会分享全部源代码。
请继续关注我们,谢谢大家的阅读!
链接到Google Colab笔记本,我鼓励你自己尝试搭建,这只需要你花最多10分钟时间运行这个Colab笔记本并亲自运行并查看结果。请务必使用免费的T4 GPU。
我们可以在这里连接:https://www.linkedin.com/in/tamangmilan
参考资料
- 你需要关注的全部 — 论文,Ashish Vaswani, Noam Shazeer 及团队
- Transformer 中注意力机制的详解,3Blue1Brown — YouTube
- 让我们构建 GPT,Andrej Karpathy,YouTube(视频教程)
- https://github.com/hkproj/pytorch-transformer — Umar Jamil(项目地址)
共同學習,寫下你的評論
評論加載中...
作者其他優(yōu)質(zhì)文章