import torch
import numpy as np

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, length):
        self.length = length

    def __len__(self):
        return self.length

    def __getitem__(self, idx):
        return np.array([-idx*2, -idx*2])

class MySampler(torch.utils.data.Sampler):
    def __init__(self, batch_nums):
        self.batch_nums = batch_nums

    def __iter__(self):
        # __iter__ must return an iterator; since this object is passed as
        # batch_sampler, each yielded item is the list of indices for one batch
        return self.foo()

    def foo(self):
        for i in range(self.batch_nums):
            yield [i, i+1]

    # __len__ is optional for a batch sampler
    # def __len__(self):
    #     return self.batch_nums

def collect(samples):
    # collate_fn receives the list of samples that make up one batch
    print("passed to collect", samples)
    return samples

dl = torch.utils.data.DataLoader(MyDataset(10),
                                 batch_sampler=MySampler(20), collate_fn=collect)
for x in dl:
    print(x)
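For comparison, the same batching behavior can usually be expressed with the built-in samplers instead of a hand-written one. A minimal sketch, reusing the MyDataset and collect defined above:

from torch.utils.data import BatchSampler, SequentialSampler, DataLoader

ds = MyDataset(10)
# Group sequential indices into batches of 2; drop_last=False keeps a final short batch
batch_sampler = BatchSampler(SequentialSampler(ds), batch_size=2, drop_last=False)
dl2 = DataLoader(ds, batch_sampler=batch_sampler, collate_fn=collect)
for x in dl2:
    print(x)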
AutoGrad, computation graphs, and custom operators
Custom operators
Example 1: implementing a 2D convolution by hand
import torch
from torch.autograd.function import once_differentiable
import torch.nn.functional as F

def convolution_backward(grad_out, X, weight):
    # gradient w.r.t. the weight
    grad_input = F.conv2d(X.transpose(0, 1), grad_out.transpose(0, 1)).transpose(0, 1)
    # gradient w.r.t. the input X
    grad_X = F.conv_transpose2d(grad_out, weight)
    return grad_X, grad_input

class Conv2D(torch.autograd.Function):
    @staticmethod
    def forward(ctx, X, weight):
        ctx.save_for_backward(X, weight)  # torch.autograd.Function method: save tensors for the backward pass
        return F.conv2d(X, weight)

    # Use @once_differentiable by default unless we intend to double backward
    @staticmethod
    @once_differentiable
    def backward(ctx, grad_out):
        X, weight = ctx.saved_tensors
        return convolution_backward(grad_out, X, weight)

weight = torch.rand(5, 3, 3, 3, requires_grad=True, dtype=torch.double)
X = torch.rand(10, 3, 7, 7, requires_grad=True, dtype=torch.double)
torch.autograd.gradcheck(Conv2D.apply, (X, weight))  # gradient check
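To use the custom Function inside a network, it can be wrapped in an nn.Module that owns the weight. A minimal sketch (the MyConv2D class, its initialization and shapes are illustrative assumptions, not part of the original example):

import torch
import torch.nn as nn

class MyConv2D(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size):
        super().__init__()
        # Illustrative initialization only
        self.weight = nn.Parameter(torch.randn(out_channels, in_channels, kernel_size, kernel_size) * 0.01)

    def forward(self, x):
        # Call the custom autograd Function via .apply, never its forward() directly
        return Conv2D.apply(x, self.weight)

layer = MyConv2D(3, 5, 3)
out = layer(torch.rand(10, 3, 7, 7))
out.sum().backward()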
Gradient penalty
# Sketch: model, data, optimizer, loss_fn and epochs are assumed to be defined elsewhere
for epoch in epochs:
    for input, target in data:
        optimizer.zero_grad()
        output = model(input)
        loss = loss_fn(output, target)

        # Creates gradients
        grad_params = torch.autograd.grad(outputs=loss,
                                          inputs=model.parameters(),
                                          create_graph=True)

        # Computes the penalty term and adds it to the loss
        grad_norm = 0
        for grad in grad_params:
            grad_norm += grad.pow(2).sum()
        grad_norm = grad_norm.sqrt()
        loss = loss + grad_norm

        loss.backward()
        # clip gradients here, if desired
        optimizer.step()
# model is an instance of torch.nn.Module; assumes `import torch.nn as nn`,
# `from torch.optim import SGD` and a `hyp` dict of hyperparameters
g0, g1, g2 = [], [], []  # optimizer parameter groups
for v in model.modules():
    if hasattr(v, 'bias') and isinstance(v.bias, nn.Parameter):
        g2.append(v.bias)      # biases: no weight decay
    if isinstance(v, nn.BatchNorm2d):
        g0.append(v.weight)    # BatchNorm weights: no weight decay
    elif hasattr(v, 'weight') and isinstance(v.weight, nn.Parameter):
        g1.append(v.weight)    # other weights: apply weight decay

optimizer = SGD(g0, lr=hyp['lr0'], momentum=hyp['momentum'], nesterov=True)
optimizer.add_param_group({'params': g1, 'weight_decay': hyp['weight_decay']})
optimizer.add_param_group({'params': g2})
# PyTorch 1.9.1 source (comments abridged): torch/nn/utils/clip_grad.py
# _tensor_or_tensors = Union[torch.Tensor, Iterable[torch.Tensor]], defined earlier in the file
def clip_grad_norm_(
        parameters: _tensor_or_tensors, max_norm: float, norm_type: float = 2.0,
        error_if_nonfinite: bool = False) -> torch.Tensor:
    r"""
    The norm is computed over all gradients together, as if they were
    concatenated into a single vector. Gradients are modified in-place.

    Returns:
        Total norm of the parameters (viewed as a single vector).
    """
    if isinstance(parameters, torch.Tensor):
        parameters = [parameters]
    parameters = [p for p in parameters if p.grad is not None]
    max_norm = float(max_norm)
    norm_type = float(norm_type)
    if len(parameters) == 0:
        return torch.tensor(0.)
    device = parameters[0].grad.device
    if norm_type == inf:  # inf is imported from torch._six in this file
        norms = [p.grad.detach().abs().max().to(device) for p in parameters]
        total_norm = norms[0] if len(norms) == 1 else torch.max(torch.stack(norms))
    else:
        total_norm = torch.norm(torch.stack([torch.norm(p.grad.detach(), norm_type).to(device) for p in parameters]), norm_type)
    if total_norm.isnan() or total_norm.isinf():
        if error_if_nonfinite:
            raise RuntimeError("")  # error message omitted
        else:
            warnings.warn("", FutureWarning, stacklevel=2)  # warning message omitted
    clip_coef = max_norm / (total_norm + 1e-6)
    if clip_coef < 1:
        for p in parameters:
            p.grad.detach().mul_(clip_coef.to(p.grad.device))
    return total_norm
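Typical usage: call it between backward() and optimizer.step(). A minimal sketch (model, optimizer and loss are assumed to exist):

loss.backward()
# Rescale gradients in place so that their global L2 norm is at most 1.0
total_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()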
# The flag below controls whether to allow TF32 on matmul. This flag defaults to True.
torch.backends.cuda.matmul.allow_tf32 = True
# The flag below controls whether to allow TF32 on cuDNN. This flag defaults to True.
torch.backends.cudnn.allow_tf32 = True
Asynchronous execution
CUDA streams
"CUDA stream" is the term used in NVIDIA's official CUDA C documentation: a CUDA stream is a sequence of commands executed on the GPU. Commands within one stream are guaranteed to run in order, but the user can create multiple CUDA streams, and commands in different streams execute concurrently. In PyTorch, each GPU has a default CUDA stream whose special property is that it synchronizes automatically when necessary. The user can also create new CUDA streams and place commands in them; in that case, the necessary synchronization must be done by the user. For example, the program below does not synchronize properly, so the B it computes is incorrect.
cuda = torch.device('cuda')
s = torch.cuda.Stream()  # Create a new stream.
A = torch.ones((32, 64, 448, 448), device=cuda)   # executes in the default stream
weight = torch.rand((64, 64, 5, 5), device=cuda)  # executes in the default stream
A = torch.conv2d(A, weight, padding=2)            # executes in the default stream
A.zero_().normal_().zero_()                       # executes in the default stream
with torch.cuda.stream(s):
    # sum() may start execution before the default stream finishes!
    # torch.cuda.synchronize()  # adding this line avoids the error
    B = torch.sum(A)
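Besides a full torch.cuda.synchronize(), the side stream can also be made to wait only on the default stream. A minimal sketch of this pattern:

s = torch.cuda.Stream()
# Make stream s wait until all work currently queued on the default stream has finished
s.wait_stream(torch.cuda.current_stream())
with torch.cuda.stream(s):
    B = torch.sum(A)
# If A will be used again on the default stream, the reverse wait is also needed:
# torch.cuda.current_stream().wait_stream(s)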
torch.nn.DataParallel
Usage
Example 1
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

class RandomDataset(Dataset):
    def __init__(self, size, length):
        self.len = length
        self.data = torch.randn(length, size)

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return self.len

class Model(nn.Module):
    def __init__(self, input_size, output_size):
        super().__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, input):
        output = self.fc(input)
        print("\tIn Model: input size", input.size(), "output size", output.size())
        return output

input_size, output_size, batch_size, data_size = 5, 2, 30, 100
rand_loader = DataLoader(dataset=RandomDataset(input_size, data_size), batch_size=batch_size, shuffle=True)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

model = Model(input_size, output_size)
if torch.cuda.device_count() > 1:
    print("Let's use", torch.cuda.device_count(), "GPUs!")
    # dim = 0 [30, xxx] -> [10, ...], [10, ...], [10, ...] on 3 GPUs
    model = nn.DataParallel(model)
model.to(device)  # note: this sets the primary GPU

for data in rand_loader:
    input = data.to(device)  # simply put the data on the primary GPU
    output = model(input)
    print("Outside: input size", input.size(), "output_size", output.size())
For example, for the relu function y = relu(x), the local derivative of y with respect to x can be obtained directly as dy_dx = (y > 0), without needing the value of x itself. Therefore the in-place version can be used:
x = torch.nn.functional.relu(x, inplace=True)
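A quick check that the in-place relu still backpropagates correctly (the clone() is needed because an in-place op is not allowed directly on a leaf tensor that requires grad):

import torch
x = torch.randn(5, requires_grad=True)
y = torch.nn.functional.relu(x.clone(), inplace=True)  # clone: in-place ops are forbidden on a requires-grad leaf
y.sum().backward()
print(torch.equal(x.grad, (y > 0).float()))  # True: the gradient only depends on the output y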
Another example

import torch

class Square(torch.autograd.Function):
    @staticmethod
    def forward(ctx, x):
        # Because we are saving one of the inputs, use `save_for_backward`
        # Save non-tensors and non-inputs/non-outputs directly on ctx
        ctx.save_for_backward(x)
        return x**2

    @staticmethod
    def backward(ctx, grad_out):
        # A function supports double backward automatically if autograd
        # is able to record the computations performed in backward
        x, = ctx.saved_tensors
        return grad_out * 2 * x

# Use double precision because the finite differencing method magnifies errors
x = torch.rand(3, 3, requires_grad=True, dtype=torch.double)
torch.autograd.gradcheck(Square.apply, x)
# Use gradcheck to verify second-order derivatives
torch.autograd.gradgradcheck(Square.apply, x)
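Because this backward is itself built from differentiable operations (and is not marked @once_differentiable), second-order gradients can be taken directly. A small sketch:

x = torch.tensor(3.0, requires_grad=True)
y = Square.apply(x)
# First-order gradient, keeping the graph so it can be differentiated again
dy_dx, = torch.autograd.grad(y, x, create_graph=True)   # 2 * x = 6
d2y_dx2, = torch.autograd.grad(dy_dx, x)                # 2
print(dy_dx.item(), d2y_dx2.item())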
detach
Returned Tensor shares the same storage with the original one. In-place modifications on either of them will be seen, and may trigger errors in correctness checks.

IMPORTANT NOTE: Previously, in-place size / stride / storage changes (such as resize_ / resize_as_ / set_ / transpose_) to the returned tensor also update the original tensor. Now, these in-place changes will not update the original tensor anymore, and will instead trigger an error.

For sparse tensors: in-place indices / values changes (such as zero_ / copy_ / add_) to the returned tensor will not update the original tensor anymore, and will instead trigger an error.
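A short illustration of both behaviors described above (sin is chosen because its backward needs its input; the error in the last line is described in a comment rather than reproduced verbatim):

import torch

a = torch.ones(3, requires_grad=True)
b = a.detach()          # shares storage with a, but is cut off from the graph
b[1] = 5.0              # visible through a as well, since the storage is shared
print(a)                # tensor([1., 5., 1.], requires_grad=True)

c = a.sin()             # sin saves its input a for the backward pass
b.zero_()               # in-place change through the detached view bumps the shared version counter
# c.sum().backward()    # would raise: a variable needed for gradient computation was modified in-place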
The non_blocking argument
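No details were recorded here; as a sketch of the usual pattern, non_blocking=True only makes the host-to-GPU copy asynchronous when the source lives in pinned (page-locked) memory, e.g. together with pin_memory=True in the DataLoader (illustrative example; dataset and model are assumed to exist):

loader = torch.utils.data.DataLoader(dataset, batch_size=32, pin_memory=True)
for batch in loader:
    # The copy is issued asynchronously with respect to the host because the source is pinned memory
    batch = batch.to('cuda', non_blocking=True)
    out = model(batch)  # kernels on the same stream are ordered after the copy, so this is safe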
Tips
Parameter sharing
Some text-generation models share the parameters of the embedding layer and the output layer; huggingface transformers calls this operation tying weights ("tie weights").
import torch
embed = torch.nn.Embedding(3, 4)
linear = torch.nn.Linear(4, 3, bias=False)
embed.weight = linear.weight  # both layers now hold the very same Parameter object
x = torch.randint(0, 3, (2, 15))
y = 1.0
z = linear(embed(x)).sum() - y
loss = y - z
loss.backward()
# Gradients from both layers accumulate into the same tensor, so this prints True
print(torch.allclose(embed.weight.grad, linear.weight.grad))
Commonly used functions
CUDA-related checks

major, minor = torch.cuda.get_device_capability("cuda:0")  # compute capability (architecture) of the GPU
torch.version.cuda  # the CUDA version PyTorch was built with
torch.cuda.reset_max_memory_allocated()  # reset the tracked peak GPU memory usage; use together with the next call
torch.cuda.max_memory_allocated(device=None)  # peak GPU memory allocated since the start of the program (or since the last reset)
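For example, the two memory functions can be combined to measure the peak memory of a single forward pass (a sketch; model and batch are assumed to already be on the GPU):

torch.cuda.reset_max_memory_allocated()
out = model(batch)                        # the operation to be measured
torch.cuda.synchronize()                  # make sure the kernels have finished
peak_mb = torch.cuda.max_memory_allocated() / 1024**2
print(f"peak memory during forward: {peak_mb:.1f} MiB")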
# Excerpt from nn.Module (torch/nn/modules/module.py)
def _call_impl(self, *input, **kwargs):
    # global hooks are called first, then the module's own hooks
    for hook in itertools.chain(
            _global_forward_pre_hooks.values(),  # _global_forward_pre_hooks is an OrderedDict
            self._forward_pre_hooks.values()):
        result = hook(self, input)
        if result is not None:
            if not isinstance(result, tuple):
                result = (result,)
            input = result
    if torch._C._get_tracing_state():
        result = self._slow_forward(*input, **kwargs)
    else:
        result = self.forward(*input, **kwargs)
    for hook in itertools.chain(
            _global_forward_hooks.values(),
            self._forward_hooks.values()):
        hook_result = hook(self, input, result)
        if hook_result is not None:
            result = hook_result
    if (len(self._backward_hooks) > 0) or (len(_global_backward_hooks) > 0):
        var = result
        while not isinstance(var, torch.Tensor):
            if isinstance(var, dict):
                var = next((v for v in var.values() if isinstance(v, torch.Tensor)))
            else:
                var = var[0]
        grad_fn = var.grad_fn
        if grad_fn is not None:
            for hook in itertools.chain(
                    _global_backward_hooks.values(),
                    self._backward_hooks.values()):
                wrapper = functools.partial(hook, self)
                functools.update_wrapper(wrapper, hook)
                grad_fn.register_hook(wrapper)
    return result

__call__ : Callable[..., Any] = _call_impl  # __call__ is an alias for _call_impl
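The hook machinery above is what register_forward_hook and friends plug into; a minimal usage sketch:

import torch
import torch.nn as nn

layer = nn.Linear(4, 2)

def print_shapes(module, inputs, output):
    # forward hooks receive the module, the tuple of inputs, and the output
    print(module.__class__.__name__, inputs[0].shape, output.shape)

handle = layer.register_forward_hook(print_shapes)
layer(torch.randn(3, 4))   # the hook fires after forward()
handle.remove()            # unregister the hook when it is no longer needed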
Can the prebuilt PyTorch binaries be used without installing the CUDA toolkit separately? (answer from the PyTorch forums:) Yes, you just need to install the NVIDIA drivers and the binaries will come with the other libs. If you want to build from source, you would need to install CUDA, cuDNN etc.