Menu
Home
About
Our Role
Goals
The Team
Roadmap
Tokenomics
How To Buy
Knowledge Base
Contacts
Sitemap & Links
A.I.
Chart
Shop
IMMORTALITY
🏠
⬇️
Attention is all you need
New name
B
I
U
S
link
image
code
HTML
list
Show page
Syntax
{pre} import torch import torch.nn as nn import torch.nn.functional as F class MultiHeadAttention(nn.Module): def __init__(self, d_model, n_heads): super(MultiHeadAttention, self).__init__() assert d_model % n_heads == 0, "d_model must be divisible by n_heads" self.d_model = d_model self.n_heads = n_heads self.d_k = d_model // n_heads self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) def forward(self, Q, K, V, mask=None): batch_size = Q.size(0) Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32)) if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) attention = F.softmax(scores, dim=-1) output = torch.matmul(attention, V) output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model) output = self.W_o(output) return output, attention class PositionwiseFeedForward(nn.Module): def __init__(self, d_model, d_ff): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, d_ff) self.linear2 = nn.Linear(d_ff, d_model) def forward(self, x): return self.linear2(F.relu(self.linear1(x))) class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=5000): super(PositionalEncoding, self).__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0).transpose(0, 1) self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:x.size(0), :] return x class EncoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(EncoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) def forward(self, x, mask=None): attn_output, _ = self.self_attn(x, x, x, mask) x = self.norm1(x + self.dropout1(attn_output)) ff_output = self.feed_forward(x) x = self.norm2(x + self.dropout2(ff_output)) return x class DecoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(DecoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.src_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout) def forward(self, x, memory, src_mask=None, tgt_mask=None): attn1, _ = self.self_attn(x, x, x, tgt_mask) x = self.norm1(x + self.dropout1(attn1)) attn2, _ = self.src_attn(x, memory, memory, src_mask) x = self.norm2(x + self.dropout2(attn2)) ff_output = self.feed_forward(x) x = self.norm3(x + self.dropout3(ff_output)) return x class Transformer(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout=0.1): super(Transformer, self).__init__() self.encoder = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]) self.decoder = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]) self.src_embed = nn.Sequential(nn.Embedding(src_vocab_size, d_model), PositionalEncoding(d_model)) self.tgt_embed = nn.Sequential(nn.Embedding(tgt_vocab_size, d_model), PositionalEncoding(d_model)) self.out = nn.Linear(d_model, tgt_vocab_size) def forward(self, src, tgt, src_mask=None, tgt_mask=None): enc_output = self.src_embed(src) for layer in self.encoder: enc_output = layer(enc_output, src_mask) dec_output = self.tgt_embed(tgt) for layer in self.decoder: dec_output = layer(dec_output, enc_output, src_mask, tgt_mask) output = self.out(dec_output) return output # Example usage src_vocab_size = 1000 tgt_vocab_size = 1000 d_model = 512 n_heads = 8 d_ff = 2048 n_layers = 6 dropout = 0.1 model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout) {/pre} Another version... uses the popular Squad Q&A data set in a directory named ./datasets, train-v2.0.json and dev-v2.0.json More verbatim version to the model described in Attention is all you need paper {pre} {pre} import torch import torch.nn as nn import torch.nn.functional as F class MultiHeadAttention(nn.Module): def __init__(self, d_model, n_heads): super(MultiHeadAttention, self).__init__() assert d_model % n_heads == 0, "d_model must be divisible by n_heads" self.d_model = d_model self.n_heads = n_heads self.d_k = d_model // n_heads self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) def forward(self, Q, K, V, mask=None): batch_size = Q.size(0) Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32)) if mask is not None: scores = scores.masked_fill(mask == 0, -1e9) attention = F.softmax(scores, dim=-1) output = torch.matmul(attention, V) output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model) output = self.W_o(output) return output, attention class PositionwiseFeedForward(nn.Module): def __init__(self, d_model, d_ff): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, d_ff) self.linear2 = nn.Linear(d_ff, d_model) def forward(self, x): return self.linear2(F.relu(self.linear1(x))) class PositionalEncoding(nn.Module): def __init__(self, d_model, max_len=5000): super(PositionalEncoding, self).__init__() pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0).transpose(0, 1) self.register_buffer('pe', pe) def forward(self, x): x = x + self.pe[:x.size(0), :] return x class EncoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(EncoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) def forward(self, x, mask=None): attn_output, _ = self.self_attn(x, x, x, mask) x = self.norm1(x + self.dropout1(attn_output)) ff_output = self.feed_forward(x) x = self.norm2(x + self.dropout2(ff_output)) return x class DecoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(DecoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.src_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout) def forward(self, x, memory, src_mask=None, tgt_mask=None): attn1, _ = self.self_attn(x, x, x, tgt_mask) x = self.norm1(x + self.dropout1(attn1)) attn2, _ = self.src_attn(x, memory, memory, src_mask) x = self.norm2(x + self.dropout2(attn2)) ff_output = self.feed_forward(x) x = self.norm3(x + self.dropout3(ff_output)) return x class Transformer(nn.Module): def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout=0.1): super(Transformer, self).__init__() self.encoder = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]) self.decoder = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)]) self.src_embed = nn.Sequential(nn.Embedding(src_vocab_size, d_model), PositionalEncoding(d_model)) self.tgt_embed = nn.Sequential(nn.Embedding(tgt_vocab_size, d_model), PositionalEncoding(d_model)) self.out = nn.Linear(d_model, tgt_vocab_size) def forward(self, src, tgt, src_mask=None, tgt_mask=None): enc_output = self.src_embed(src) for layer in self.encoder: enc_output = layer(enc_output, src_mask) dec_output = self.tgt_embed(tgt) for layer in self.decoder: dec_output = layer(dec_output, enc_output, src_mask, tgt_mask) output = self.out(dec_output) return output # Example usage src_vocab_size = 1000 tgt_vocab_size = 1000 d_model = 512 n_heads = 8 d_ff = 2048 n_layers = 6 dropout = 0.1 model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout) {/pre} Another version, very verbatim implementation... uses the popular Squad Q&A data set in a directory named ./datasets, train-v2.0.json and dev-v2.0.json {pre} import os import json import math import torch import torch.nn as nn import torch.optim as optim from torch.utils.data import DataLoader, Dataset from transformers import BertTokenizer, squad_convert_examples_to_features, SquadV2Processor from tqdm import tqdm def scaled_dot_product_attention(q, k, v, mask=None): matmul_qk = torch.matmul(q, k.transpose(-2, -1)) d_k = q.size(-1) scaled_attention_logits = matmul_qk / torch.sqrt(torch.tensor(d_k, dtype=torch.float32)) if mask is not None: scaled_attention_logits = scaled_attention_logits.masked_fill(mask == 0, float('-inf')) attention_weights = torch.softmax(scaled_attention_logits, dim=-1) output = torch.matmul(attention_weights, v) return output, attention_weights class MultiHeadAttention(nn.Module): def __init__(self, d_model, n_heads): super(MultiHeadAttention, self).__init__() assert d_model % n_heads == 0, "d_model must be divisible by n_heads" self.d_model = d_model self.n_heads = n_heads self.d_k = d_model // n_heads self.W_q = nn.Linear(d_model, d_model) self.W_k = nn.Linear(d_model, d_model) self.W_v = nn.Linear(d_model, d_model) self.W_o = nn.Linear(d_model, d_model) def forward(self, Q, K, V, mask=None): batch_size = Q.size(0) Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2) output, _ = scaled_dot_product_attention(Q, K, V, mask) output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model) output = self.W_o(output) return output class PositionwiseFeedForward(nn.Module): def __init__(self, d_model, d_ff): super(PositionwiseFeedForward, self).__init__() self.linear1 = nn.Linear(d_model, d_ff) self.linear2 = nn.Linear(d_ff, d_model) def forward(self, x): return self.linear2(torch.relu(self.linear1(x))) class EncoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(EncoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) def forward(self, x, mask=None): attn_output = self.self_attn(x, x, x, mask) x = self.norm1(x + self.dropout1(attn_output)) ff_output = self.feed_forward(x) x = self.norm2(x + self.dropout2(ff_output)) return x class DecoderLayer(nn.Module): def __init__(self, d_model, n_heads, d_ff, dropout=0.1): super(DecoderLayer, self).__init__() self.self_attn = MultiHeadAttention(d_model, n_heads) self.cross_attn = MultiHeadAttention(d_model, n_heads) self.feed_forward = PositionwiseFeedForward(d_model, d_ff) self.norm1 = nn.LayerNorm(d_model) self.norm2 = nn.LayerNorm(d_model) self.norm3 = nn.LayerNorm(d_model) self.dropout1 = nn.Dropout(dropout) self.dropout2 = nn.Dropout(dropout) self.dropout3 = nn.Dropout(dropout) def forward(self, x, enc_output, src_mask=None, tgt_mask=None): attn_output = self.self_attn(x, x, x, tgt_mask) x = self.norm1(x + self.dropout1(attn_output)) attn_output = self.cross_attn(x, enc_output, enc_output, src_mask) x = self.norm2(x + self.dropout2(attn_output)) ff_output = self.feed_forward(x) x = self.norm3(x + self.dropout3(ff_output)) return x class Transformer(nn.Module): def __init__(self, vocab_size, d_model, nhead, num_encoder_layers, num_decoder_layers, dim_feedforward, dropout=0.1): super(Transformer, self).__init__() self.embedding = nn.Embedding(vocab_size, d_model) self.pos_encoder = self.generate_positional_encoding(d_model, 512) # Max sequence length of 512 self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_encoder_layers)]) self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_decoder_layers)]) self.linear_start = nn.Linear(d_model, 1) self.linear_end = nn.Linear(d_model, 1) def generate_positional_encoding(self, d_model, max_len): pe = torch.zeros(max_len, d_model) position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model)) pe[:, 0::2] = torch.sin(position * div_term) pe[:, 1::2] = torch.cos(position * div_term) pe = pe.unsqueeze(0).transpose(0, 1) return pe def forward(self, input_ids, attention_mask): seq_length = input_ids.size(1) pos = torch.arange(0, seq_length, dtype=torch.long, device=input_ids.device).unsqueeze(0).expand_as(input_ids) enc_output = self.embedding(input_ids) + self.pos_encoder[:seq_length, :].squeeze(1).to(input_ids.device) # Create encoder self-attention mask src_mask = attention_mask.unsqueeze(1).unsqueeze(2) for layer in self.encoder_layers: enc_output = layer(enc_output, src_mask) # Use the same input for decoder (this is a simplification, you might want to modify this for your specific use case) dec_output = enc_output # Create decoder self-attention mask (causal mask) tgt_mask = self.generate_square_subsequent_mask(seq_length).to(input_ids.device) for layer in self.decoder_layers: dec_output = layer(dec_output, enc_output, src_mask, tgt_mask) start_logits = self.linear_start(dec_output).squeeze(-1) end_logits = self.linear_end(dec_output).squeeze(-1) return start_logits, end_logits def generate_square_subsequent_mask(self, sz): mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1) mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0)) return mask class SquadDataset(Dataset): def __init__(self, features): self.features = features def __len__(self): return len(self.features) def __getitem__(self, idx): feature = self.features[idx] return { 'input_ids': torch.tensor(feature.input_ids, dtype=torch.long), 'attention_mask': torch.tensor(feature.attention_mask, dtype=torch.long), 'token_type_ids': torch.tensor(feature.token_type_ids, dtype=torch.long), 'start_positions': torch.tensor(feature.start_position, dtype=torch.long), 'end_positions': torch.tensor(feature.end_position, dtype=torch.long), } def train_epoch(model, dataloader, optimizer, scheduler, device): model.train() total_loss = 0 for batch in tqdm(dataloader, desc="Training"): input_ids, attention_mask, token_type_ids, start_positions, end_positions = [b.to(device) for b in batch] optimizer.zero_grad() start_logits, end_logits = model(input_ids, attention_mask) loss_fct = nn.CrossEntropyLoss() start_loss = loss_fct(start_logits, start_positions) end_loss = loss_fct(end_logits, end_positions) loss = (start_loss + end_loss) / 2 loss.backward() optimizer.step() scheduler.step() total_loss += loss.item() return total_loss / len(dataloader) def evaluate(model, dataloader, device): model.eval() total_loss = 0 with torch.no_grad(): for batch in tqdm(dataloader, desc="Evaluating"): input_ids, attention_mask, token_type_ids, start_positions, end_positions = [b.to(device) for b in batch] decoder_input_ids = input_ids[:, :-1] src_mask = attention_mask.unsqueeze(1).unsqueeze(2) tgt_mask = attention_mask.unsqueeze(1).unsqueeze(2)[:, :, :-1, :-1] start_logits, end_logits = model(input_ids, decoder_input_ids, src_mask, tgt_mask) loss_fct = nn.CrossEntropyLoss() start_loss = loss_fct(start_logits, start_positions) end_loss = loss_fct(end_logits, end_positions) loss = (start_loss + end_loss) / 2 total_loss += loss.item() return total_loss / len(dataloader) def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5): def lr_lambda(current_step): if current_step < num_warmup_steps: return float(current_step) / float(max(1, num_warmup_steps)) progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps)) return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress))) return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda) def custom_collate(batch): input_ids = torch.stack([item['input_ids'] for item in batch]) attention_mask = torch.stack([item['attention_mask'] for item in batch]) token_type_ids = torch.stack([item['token_type_ids'] for item in batch]) start_positions = torch.stack([item['start_positions'] for item in batch]) end_positions = torch.stack([item['end_positions'] for item in batch]) return input_ids, attention_mask, token_type_ids, start_positions, end_positions def main(): squad_data_dir = 'datasets' train_file = 'train-v2.0.json' dev_file = 'dev-v2.0.json' processor = SquadV2Processor() tokenizer = BertTokenizer.from_pretrained('bert-base-uncased') # Load and process training examples train_examples = processor.get_train_examples(squad_data_dir, filename=train_file) train_features = squad_convert_examples_to_features( examples=train_examples, tokenizer=tokenizer, max_seq_length=384, doc_stride=128, max_query_length=64, is_training=True, return_dataset=False, # Changed back to False to get the list of features ) train_dataset = SquadDataset(train_features) train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=custom_collate) # Load and process validation examples val_examples = processor.get_dev_examples(squad_data_dir, filename=dev_file) val_features = squad_convert_examples_to_features( examples=val_examples, tokenizer=tokenizer, max_seq_length=384, doc_stride=128, max_query_length=64, is_training=False, return_dataset=False, # Changed back to False to get the list of features ) val_dataset = SquadDataset(val_features) val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=custom_collate) model = Transformer(vocab_size=30522, d_model=768, nhead=12, num_encoder_layers=6, num_decoder_layers=6, dim_feedforward=2048) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') model.to(device) optimizer = optim.Adam(model.parameters(), lr=5e-5, betas=(0.9, 0.98), eps=1e-9) scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=4000, num_training_steps=100000) for epoch in range(3): print(f"Epoch {epoch + 1}") train_loss = train_epoch(model, train_dataloader, optimizer, scheduler, device) print(f"Train Loss: {train_loss}") val_loss = evaluate(model, val_dataloader, device) print(f"Validation Loss: {val_loss}") # Save the model torch.save(model.state_dict(), 'transformer_squad.pt') if __name__ == "__main__": main() {/pre}
Password
Summary of changes
📜
⏱️
⬆️