Attention is all you need
Below is a compact PyTorch implementation of the Transformer encoder-decoder introduced in "Attention Is All You Need" (Vaswani et al., 2017): multi-head scaled dot-product attention, position-wise feed-forward layers, sinusoidal positional encodings, and stacked encoder and decoder layers.

{pre}
import torch
import torch.nn as nn
import torch.nn.functional as F


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        # Project and split into heads: (batch, n_heads, seq_len, d_k)
        Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        # Scaled dot-product attention: softmax(QK^T / sqrt(d_k)) V
        scores = torch.matmul(Q, K.transpose(-2, -1)) / torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32))
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attention = F.softmax(scores, dim=-1)
        output = torch.matmul(attention, V)
        # Recombine heads: (batch, seq_len, d_model)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.W_o(output)
        return output, attention


class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))


class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        # Shape (1, max_len, d_model) so it broadcasts over batch-first input
        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        # x: (batch, seq_len, d_model)
        x = x + self.pe[:, :x.size(1), :]
        return x


class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output, _ = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout1(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout2(ff_output))
        return x


class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.src_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, memory, src_mask=None, tgt_mask=None):
        # Masked self-attention over the target, then cross-attention over the encoder memory
        attn1, _ = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(attn1))
        attn2, _ = self.src_attn(x, memory, memory, src_mask)
        x = self.norm2(x + self.dropout2(attn2))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout3(ff_output))
        return x


class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout=0.1):
        super(Transformer, self).__init__()
        self.encoder = nn.ModuleList([EncoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.decoder = nn.ModuleList([DecoderLayer(d_model, n_heads, d_ff, dropout) for _ in range(n_layers)])
        self.src_embed = nn.Sequential(nn.Embedding(src_vocab_size, d_model), PositionalEncoding(d_model))
        self.tgt_embed = nn.Sequential(nn.Embedding(tgt_vocab_size, d_model), PositionalEncoding(d_model))
        self.out = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt, src_mask=None, tgt_mask=None):
        enc_output = self.src_embed(src)
        for layer in self.encoder:
            enc_output = layer(enc_output, src_mask)
        dec_output = self.tgt_embed(tgt)
        for layer in self.decoder:
            dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
        output = self.out(dec_output)
        return output


# Example usage
src_vocab_size = 1000
tgt_vocab_size = 1000
d_model = 512
n_heads = 8
d_ff = 2048
n_layers = 6
dropout = 0.1

model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_heads, d_ff, n_layers, dropout)
{/pre}
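A quick way to sanity-check the listing above is a single forward pass on random token ids. This is a minimal sketch that assumes the classes and the `model` from the example usage are in scope; the sequence lengths and the causal-mask helper below are illustrative assumptions, not part of the original listing.

{pre}
# Smoke test for the model defined above (assumes the listing's classes and `model` are in scope).
import torch

src = torch.randint(0, src_vocab_size, (2, 10))   # (batch, src_seq_len), random source tokens
tgt = torch.randint(0, tgt_vocab_size, (2, 12))   # (batch, tgt_seq_len), random target tokens

# Causal mask so each target position only attends to earlier positions;
# shape (1, 1, tgt_len, tgt_len) broadcasts over batch and heads,
# matching the `mask == 0` convention used in MultiHeadAttention.
tgt_mask = torch.tril(torch.ones(12, 12)).unsqueeze(0).unsqueeze(0)

model.eval()
with torch.no_grad():
    logits = model(src, tgt, src_mask=None, tgt_mask=tgt_mask)

print(logits.shape)  # expected: torch.Size([2, 12, 1000]) -> per-position logits over tgt_vocab_size
{/pre}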
Another version, closer to an end-to-end training script, trains the same architecture as an extractive question-answering model on the popular SQuAD v2.0 data set, which it expects in a directory named ./datasets as train-v2.0.json and dev-v2.0.json. It uses the Hugging Face transformers SQuAD processor and BERT tokenizer only for preprocessing; the model itself is trained from scratch.

{pre}
import os
import json
import math
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import BertTokenizer, squad_convert_examples_to_features, SquadV2Processor
from tqdm import tqdm


def scaled_dot_product_attention(q, k, v, mask=None):
    matmul_qk = torch.matmul(q, k.transpose(-2, -1))
    d_k = q.size(-1)
    scaled_attention_logits = matmul_qk / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))
    if mask is not None:
        # Positions where mask == 0 are not attended to
        scaled_attention_logits = scaled_attention_logits.masked_fill(mask == 0, float('-inf'))
    attention_weights = torch.softmax(scaled_attention_logits, dim=-1)
    output = torch.matmul(attention_weights, v)
    return output, attention_weights


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, n_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % n_heads == 0, "d_model must be divisible by n_heads"
        self.d_model = d_model
        self.n_heads = n_heads
        self.d_k = d_model // n_heads
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)

    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        Q = self.W_q(Q).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        K = self.W_k(K).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        V = self.W_v(V).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)
        output, _ = scaled_dot_product_attention(Q, K, V, mask)
        output = output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        output = self.W_o(output)
        return output


class PositionwiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionwiseFeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)

    def forward(self, x):
        return self.linear2(torch.relu(self.linear1(x)))


class EncoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        attn_output = self.self_attn(x, x, x, mask)
        x = self.norm1(x + self.dropout1(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm2(x + self.dropout2(ff_output))
        return x


class DecoderLayer(nn.Module):
    def __init__(self, d_model, n_heads, d_ff, dropout=0.1):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, n_heads)
        self.cross_attn = MultiHeadAttention(d_model, n_heads)
        self.feed_forward = PositionwiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

    def forward(self, x, enc_output, src_mask=None, tgt_mask=None):
        attn_output = self.self_attn(x, x, x, tgt_mask)
        x = self.norm1(x + self.dropout1(attn_output))
        attn_output = self.cross_attn(x, enc_output, enc_output, src_mask)
        x = self.norm2(x + self.dropout2(attn_output))
        ff_output = self.feed_forward(x)
        x = self.norm3(x + self.dropout3(ff_output))
        return x


class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, nhead, num_encoder_layers,
                 num_decoder_layers, dim_feedforward, dropout=0.1):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encoder = self.generate_positional_encoding(d_model, 512)  # Max sequence length of 512
        self.encoder_layers = nn.ModuleList(
            [EncoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_encoder_layers)])
        self.decoder_layers = nn.ModuleList(
            [DecoderLayer(d_model, nhead, dim_feedforward, dropout) for _ in range(num_decoder_layers)])
        # Span-prediction heads for extractive question answering
        self.linear_start = nn.Linear(d_model, 1)
        self.linear_end = nn.Linear(d_model, 1)

    def generate_positional_encoding(self, d_model, max_len):
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)  # (max_len, 1, d_model)
        return pe

    def forward(self, input_ids, attention_mask):
        seq_length = input_ids.size(1)
        enc_output = self.embedding(input_ids) + self.pos_encoder[:seq_length, :].squeeze(1).to(input_ids.device)
        # Encoder self-attention mask: (batch, 1, 1, seq_len), 1 = attend, 0 = padding
        src_mask = attention_mask.unsqueeze(1).unsqueeze(2)
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)
        # Use the same sequence as decoder input (a simplification; modify for your specific use case)
        dec_output = enc_output
        # Decoder self-attention mask (causal mask)
        tgt_mask = self.generate_square_subsequent_mask(seq_length).to(input_ids.device)
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
        start_logits = self.linear_start(dec_output).squeeze(-1)
        end_logits = self.linear_end(dec_output).squeeze(-1)
        return start_logits, end_logits

    def generate_square_subsequent_mask(self, sz):
        # Lower-triangular 0/1 mask; positions with 0 are masked out by scaled_dot_product_attention
        return torch.tril(torch.ones(sz, sz))


class SquadDataset(Dataset):
    def __init__(self, features):
        self.features = features

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        feature = self.features[idx]
        return {
            'input_ids': torch.tensor(feature.input_ids, dtype=torch.long),
            'attention_mask': torch.tensor(feature.attention_mask, dtype=torch.long),
            'token_type_ids': torch.tensor(feature.token_type_ids, dtype=torch.long),
            'start_positions': torch.tensor(feature.start_position, dtype=torch.long),
            'end_positions': torch.tensor(feature.end_position, dtype=torch.long),
        }


def train_epoch(model, dataloader, optimizer, scheduler, device):
    model.train()
    total_loss = 0
    for batch in tqdm(dataloader, desc="Training"):
        input_ids, attention_mask, token_type_ids, start_positions, end_positions = [b.to(device) for b in batch]
        optimizer.zero_grad()
        start_logits, end_logits = model(input_ids, attention_mask)
        loss_fct = nn.CrossEntropyLoss()
        start_loss = loss_fct(start_logits, start_positions)
        end_loss = loss_fct(end_logits, end_positions)
        loss = (start_loss + end_loss) / 2
        loss.backward()
        optimizer.step()
        scheduler.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)


def evaluate(model, dataloader, device):
    model.eval()
    total_loss = 0
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids, attention_mask, token_type_ids, start_positions, end_positions = [b.to(device) for b in batch]
            start_logits, end_logits = model(input_ids, attention_mask)
            loss_fct = nn.CrossEntropyLoss()
            start_loss = loss_fct(start_logits, start_positions)
            end_loss = loss_fct(end_logits, end_positions)
            loss = (start_loss + end_loss) / 2
            total_loss += loss.item()
    return total_loss / len(dataloader)


def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5):
    def lr_lambda(current_step):
        if current_step < num_warmup_steps:
            return float(current_step) / float(max(1, num_warmup_steps))
        progress = float(current_step - num_warmup_steps) / float(max(1, num_training_steps - num_warmup_steps))
        return max(0.0, 0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)))
    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)


def custom_collate(batch):
    input_ids = torch.stack([item['input_ids'] for item in batch])
    attention_mask = torch.stack([item['attention_mask'] for item in batch])
    token_type_ids = torch.stack([item['token_type_ids'] for item in batch])
    start_positions = torch.stack([item['start_positions'] for item in batch])
    end_positions = torch.stack([item['end_positions'] for item in batch])
    return input_ids, attention_mask, token_type_ids, start_positions, end_positions


def main():
    squad_data_dir = 'datasets'
    train_file = 'train-v2.0.json'
    dev_file = 'dev-v2.0.json'

    processor = SquadV2Processor()
    tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    # Load and process training examples
    train_examples = processor.get_train_examples(squad_data_dir, filename=train_file)
    train_features = squad_convert_examples_to_features(
        examples=train_examples,
        tokenizer=tokenizer,
        max_seq_length=384,
        doc_stride=128,
        max_query_length=64,
        is_training=True,
        return_dataset=False,  # return the list of features rather than a TensorDataset
    )
    train_dataset = SquadDataset(train_features)
    train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=custom_collate)

    # Load and process validation examples
    val_examples = processor.get_dev_examples(squad_data_dir, filename=dev_file)
    val_features = squad_convert_examples_to_features(
        examples=val_examples,
        tokenizer=tokenizer,
        max_seq_length=384,
        doc_stride=128,
        max_query_length=64,
        is_training=False,
        return_dataset=False,  # return the list of features rather than a TensorDataset
    )
    val_dataset = SquadDataset(val_features)
    val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=custom_collate)

    model = Transformer(vocab_size=30522, d_model=768, nhead=12, num_encoder_layers=6,
                        num_decoder_layers=6, dim_feedforward=2048)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    optimizer = optim.Adam(model.parameters(), lr=5e-5, betas=(0.9, 0.98), eps=1e-9)
    scheduler = get_cosine_schedule_with_warmup(optimizer, num_warmup_steps=4000, num_training_steps=100000)

    for epoch in range(3):
        print(f"Epoch {epoch + 1}")
        train_loss = train_epoch(model, train_dataloader, optimizer, scheduler, device)
        print(f"Train Loss: {train_loss}")
        val_loss = evaluate(model, val_dataloader, device)
        print(f"Validation Loss: {val_loss}")

    # Save the model
    torch.save(model.state_dict(), 'transformer_squad.pt')


if __name__ == "__main__":
    main()
{/pre}
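Once transformer_squad.pt has been saved, the weights can be reloaded for a rough extractive-QA inference pass: tokenize a question/context pair, run the model, and take the argmax of the start and end logits. This is a minimal sketch reusing the Transformer class and BERT tokenizer from the script above; the question and context strings are made-up examples, not from the original page.

{pre}
# Minimal inference sketch for the span-prediction model trained above
# (assumes the Transformer class from the script is in scope; question/context are illustrative).
import torch
from transformers import BertTokenizer

tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = Transformer(vocab_size=30522, d_model=768, nhead=12, num_encoder_layers=6,
                    num_decoder_layers=6, dim_feedforward=2048)
model.load_state_dict(torch.load('transformer_squad.pt', map_location='cpu'))
model.eval()

question = "Who proposed the Transformer architecture?"
context = "The Transformer architecture was proposed by Vaswani et al. in 2017."

# Encode as [CLS] question [SEP] context [SEP], padded to the training sequence length.
inputs = tokenizer(question, context, return_tensors='pt',
                   max_length=384, truncation=True, padding='max_length')

with torch.no_grad():
    start_logits, end_logits = model(inputs['input_ids'], inputs['attention_mask'])

# Pick the most likely start and end token positions and decode the span between them.
start_idx = int(start_logits.argmax(dim=-1))
end_idx = int(end_logits.argmax(dim=-1))
answer_tokens = inputs['input_ids'][0][start_idx:end_idx + 1]
print(tokenizer.decode(answer_tokens, skip_special_tokens=True))
{/pre}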