mirror of https://github.com/hpcaitech/ColossalAI
109 lines
3.9 KiB
Python
109 lines
3.9 KiB
Python
from typing import Optional, Union
|
|
|
|
import loralib as lora
|
|
import torch
|
|
import torch.nn as nn
|
|
import torch.nn.functional as F
|
|
|
|
|
|
def compute_approx_kl(log_probs: torch.Tensor,
                      log_probs_base: torch.Tensor,
                      action_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
    """
    Compute the approximate KL divergence between two distributions.
    Schulman blog: http://joschu.net/blog/kl-approx.html

    Uses the non-negative estimator ``(r - 1) - log(r)`` described in the
    blog post and averages it over dimension 1.

    Args:
        log_probs: Log probabilities of the new distribution.
        log_probs_base: Log probabilities of the base distribution.
        action_mask: Mask for actions. When given, the average over
            dimension 1 is weighted by this mask instead of being a plain mean.

    Returns:
        The per-element estimate reduced over dimension 1.
    """
    # The estimator is defined for r = p(x) / q(x) with x sampled from q and
    # estimates KL(q || p). With q = new and p = base, the log-ratio must be
    # base minus new; the opposite sign yields a biased estimate.
    # NOTE(review): assumes the evaluated tokens were sampled from the new
    # distribution -- confirm against the caller.
    log_ratio = log_probs_base - log_probs
    approx_kl = (log_ratio.exp() - 1) - log_ratio
    if action_mask is not None:
        return masked_mean(approx_kl, action_mask, dim=1)
    return approx_kl.mean(dim=1)
|
|
|
|
|
|
def compute_reward(r: Union[torch.Tensor, float],
                   kl_coef: float,
                   log_probs: torch.Tensor,
                   log_probs_base: torch.Tensor,
                   action_mask: Optional[torch.Tensor] = None) -> torch.Tensor:
    """Apply a KL penalty to a reward.

    Args:
        r: Raw reward, either a tensor or a plain float.
        kl_coef: Weight of the KL penalty. When it is zero or negative the
            penalty is skipped and ``r`` is returned as-is.
        log_probs: Log probabilities passed to ``compute_approx_kl`` as the
            new distribution.
        log_probs_base: Log probabilities passed to ``compute_approx_kl`` as
            the base distribution.
        action_mask: Optional mask forwarded to ``compute_approx_kl``.

    Returns:
        ``r`` unchanged when no penalty applies, otherwise
        ``r - kl_coef * approx_kl``.
    """
    if kl_coef <= 0.0:
        # Penalty disabled: hand the caller's reward back untouched.
        return r
    penalty = kl_coef * compute_approx_kl(log_probs, log_probs_base, action_mask=action_mask)
    return r - penalty
|
|
|
|
|
|
def log_probs_from_logits(logits: torch.Tensor, labels: torch.Tensor) -> torch.Tensor:
    """Select the log-probability that the logits assign to each label.

    Args:
        logits: Unnormalized scores; log-softmax is taken over the last dimension.
        labels: Indices into the last dimension of ``logits``, one per position.

    Returns:
        The log-softmax value at each label index, with the trailing
        dimension squeezed away.
    """
    # gather needs the index to have the same rank as its input.
    gather_index = labels.unsqueeze(-1)
    picked = torch.gather(F.log_softmax(logits, dim=-1), dim=-1, index=gather_index)
    return picked.squeeze(-1)
|
|
|
|
|
|
def calc_action_log_probs(output: torch.Tensor, sequences: torch.LongTensor, num_actions: int) -> torch.Tensor:
    """Calculate action log probs.

    Args:
        output (torch.Tensor): Output of Actor.forward; it is indexed with the
            key ``'logits'``.
        sequences (torch.LongTensor): Input sequences.
        num_actions (int): Number of actions.

    Returns:
        torch.Tensor: Log probs of the last ``num_actions`` positions.
    """
    # Pair the logits at position t with the token at position t + 1: drop the
    # final logit and the first token so both have the same length.
    shifted_logits = output['logits'][:, :-1, :]
    next_tokens = sequences[:, 1:]
    per_token_log_probs = log_probs_from_logits(shifted_logits, next_tokens)
    return per_token_log_probs[:, -num_actions:]
|
|
|
|
|
|
def masked_mean(tensor: torch.Tensor, mask: torch.Tensor, dim: int = 1) -> torch.Tensor:
    """Average ``tensor`` along ``dim``, weighting each entry by ``mask``.

    Args:
        tensor: Values to average.
        mask: Weights multiplied element-wise with ``tensor``; entries with a
            zero mask do not contribute.
        dim: Dimension to reduce.

    Returns:
        The masked sum divided by the mask sum, with ``dim`` removed.
    """
    weighted_total = (tensor * mask).sum(dim=dim)
    # The small constant keeps the division finite when the mask sums to zero.
    denominator = mask.sum(dim=dim) + 1e-8
    return weighted_total / denominator
|
|
|
|
|
|
def masked_normalize(tensor: torch.Tensor, mask: torch.Tensor, dim: int = 1, eps: float = 1e-8) -> torch.Tensor:
    """Standardize ``tensor`` along ``dim`` using statistics of the unmasked entries.

    Args:
        tensor: Values to normalize.
        mask: Weights selecting which entries contribute to the mean and variance.
        dim: Dimension along which the statistics are computed.
        eps: Lower bound applied to the variance before taking its inverse
            square root.

    Returns:
        A tensor with the same shape as ``tensor``. Masked-out positions are
        not zeroed in the result: they hold ``(0 - mean) / std``.
    """
    tensor = tensor * mask
    # masked_mean removes `dim`; put it back as a size-1 axis so the statistic
    # broadcasts along the reduced dimension rather than against the trailing one.
    mean = masked_mean(tensor, mask, dim=dim).unsqueeze(dim)
    mean_centered = tensor - mean
    var = masked_mean(mean_centered**2, mask, dim=dim).unsqueeze(dim)
    return mean_centered * var.clamp(min=eps).rsqrt()
|
|
|
|
|
|
def normalize(tensor: torch.Tensor, dim: int = 0, eps: float = 1e-8) -> torch.Tensor:
    """Standardize ``tensor`` to zero mean and unit variance along ``dim``.

    Args:
        tensor: Values to normalize.
        dim: Dimension along which the mean and (biased) variance are computed.
        eps: Lower bound applied to the variance before taking its inverse
            square root, so constant inputs do not divide by zero.

    Returns:
        A tensor with the same shape as ``tensor``.
    """
    # keepdim=True keeps the reduced axis as size 1 so the statistics broadcast
    # correctly for any `dim`, not only the leading one.
    mean = tensor.mean(dim, keepdim=True)
    mean_centered = tensor - mean
    var = (mean_centered**2).mean(dim, keepdim=True)
    return mean_centered * var.clamp(min=eps).rsqrt()
|
|
|
|
|
|
def convert_to_lora(model: nn.Module,
                    input_size: int,
                    output_size: int,
                    lora_rank: int = 16,
                    lora_alpha: int = 1,
                    lora_dropout: float = 0.,
                    fan_in_fan_out: bool = False,
                    merge_weights: bool = True):
    """Replace the ``nn.Linear`` submodules of ``model`` with ``lora.Linear`` layers, in place.

    Args:
        model: Module whose descendants are scanned for ``nn.Linear`` layers.
        input_size: Input feature count given to every new ``lora.Linear``.
        output_size: Output feature count given to every new ``lora.Linear``.
        lora_rank: Passed to ``lora.Linear`` as ``r``.
        lora_alpha: Passed through to ``lora.Linear``.
        lora_dropout: Passed through to ``lora.Linear``.
        fan_in_fan_out: Passed through to ``lora.Linear``.
        merge_weights: Passed through to ``lora.Linear``.

    Raises:
        ValueError: If ``lora_rank`` exceeds ``min(input_size, output_size)``.

    Note:
        Every replaced layer gets the same ``input_size``/``output_size``, and
        nothing is copied from the layer it replaces.
        NOTE(review): presumably the new layers start from fresh weights --
        confirm against loralib. If ``model`` itself is an ``nn.Linear`` it has
        no parent to be replaced on and is left untouched.
    """
    max_rank = min(input_size, output_size)
    if lora_rank > max_rank:
        raise ValueError(f"LoRA rank {lora_rank} must be less or equal than {max_rank}")

    # Snapshot parents and children before editing: swapping modules while the
    # traversal generators are still running would make them walk into the
    # freshly inserted layers.
    for parent in list(model.modules()):
        for child_name, child in list(parent.named_children()):
            if not isinstance(child, nn.Linear):
                continue
            # Register the replacement on the parent under the child's own
            # name; assigning into the Linear's own `_modules` would leave the
            # parent still pointing at the old layer.
            setattr(
                parent, child_name,
                lora.Linear(input_size,
                            output_size,
                            r=lora_rank,
                            lora_alpha=lora_alpha,
                            lora_dropout=lora_dropout,
                            fan_in_fan_out=fan_in_fan_out,
                            merge_weights=merge_weights))
|