class Register:
def __init__(self, name):
self.name = name
self.module_dict = dict()
def register_module(self, module_class=None, name=None):
if module_class is not None:
key = module_class.__name__ if name is None else name
self.module_dict[key] = module_class
return module_class
def wrapper(cls):
key = cls.__name__ if name is None else name
self.module_dict[key] = cls
return cls
return wrapper
def build(self, cfg):
args = cfg.copy()
name = args.pop("type")
assert name in self.module_dict
return self.module_dict[name](**args)
REG = Register("reg")
@REG.register_module(name="a")
class A:
def __init__(self, a, b):
self.a = a
self.b = b
class B:
def __init__(self, c):
self.c = c
B = REG.register_module(B)
print(REG.module_dict)
print(REG.build({"type": "a", "a": 1, "b": 2}))
print(REG.build({"type": "B", "c": 3}))
import inspect
import warnings
from functools import partial
from .misc import is_seq_of
def build_from_cfg(cfg, registry, default_args=None):
if not isinstance(cfg, dict):
raise TypeError(f'cfg must be a dict, but got {type(cfg)}')
if 'type' not in cfg:
if default_args is None or 'type' not in default_args:
raise KeyError('...')
if not isinstance(registry, Registry):
raise TypeError('...')
if not (isinstance(default_args, dict) or default_args is None):
raise TypeError('...')
args = cfg.copy()
if default_args is not None:
for name, value in default_args.items():
args.setdefault(name, value)
obj_type = args.pop('type')
if isinstance(obj_type, str):
obj_cls = registry.get(obj_type)
if obj_cls is None:
raise KeyError('')
elif inspect.isclass(obj_type):
obj_cls = obj_type
else:
raise TypeError('')
try:
return obj_cls(**args)
except Exception as e:
raise type(e)(f'{obj_cls.__name__}: {e}')
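# A minimal usage sketch of build_from_cfg (hypothetical names; assumes a
# Registry called MODELS with a ResNet class already registered in it):
# cfg = dict(type='ResNet', depth=50)
# model = build_from_cfg(cfg, MODELS)                           # -> ResNet(depth=50)
# model = build_from_cfg(dict(type=ResNet, depth=50), MODELS)   # a class also works as `type`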
class Registry:
"""
scope (str, optional): The scope of the registry. It is the key used to
search for child registries. If not specified, scope will be the name
of the package where the class is defined, e.g. mmdet, mmcls, mmseg.
Default: None.
"""
def __init__(self, name, build_func=None, parent=None, scope=None):
self._name = name
self._module_dict = dict()
self._children = dict()
self._scope = self.infer_scope() if scope is None else scope
if build_func is None:
if parent is not None:
self.build_func = parent.build_func
else:
self.build_func = build_from_cfg
else:
self.build_func = build_func
if parent is not None:
assert isinstance(parent, Registry)
parent._add_children(self)
self.parent = parent
else:
self.parent = None
def __len__(self):
return len(self._module_dict)
def __contains__(self, key):
return self.get(key) is not None
def __repr__(self):
pass # omit
@staticmethod
def infer_scope():
filename = inspect.getmodule(inspect.stack()[2][0]).__name__
split_filename = filename.split('.')
return split_filename[0]
@staticmethod
def split_scope_key(key):
"""Examples:
>>> Registry.split_scope_key('mmdet.ResNet')
'mmdet', 'ResNet'
>>> Registry.split_scope_key('ResNet')
None, 'ResNet'
"""
split_index = key.find('.')
if split_index != -1:
return key[:split_index], key[split_index + 1:]
else:
return None, key
@property
def name(self):
return self._name
@property
def scope(self):
return self._scope
@property
def module_dict(self):
return self._module_dict
@property
def children(self):
return self._children
def get(self, key):
"""Get the registry record.
Args:
key (str): The class name in string format.
Returns:
class: The corresponding class.
"""
scope, real_key = self.split_scope_key(key)
if scope is None or scope == self._scope:
# get from self
if real_key in self._module_dict:
return self._module_dict[real_key]
else:
# get from self._children
if scope in self._children:
return self._children[scope].get(real_key)
else:
# go to the root registry and search from there
parent = self.parent
while parent.parent is not None:
parent = parent.parent
return parent.get(key)
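# How scoped lookup plays out in practice (a hypothetical sketch assuming a
# root registry MODELS and a child registry with scope 'mmdet'):
# MODELS = Registry('models')
# MMDET_MODELS = Registry('models', parent=MODELS, scope='mmdet')
# MMDET_MODELS.register_module()(ResNet)
# MMDET_MODELS.get('ResNet')      # found directly in its own _module_dict
# MODELS.get('mmdet.ResNet')      # delegated to the 'mmdet' child registry
# A key whose scope is neither the registry's own nor a child's is resolved by
# climbing to the root registry and searching again from there.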
def build(self, *args, **kwargs):
return self.build_func(*args, **kwargs, registry=self)
def _add_children(self, registry):
"""Add children for a registry.
The ``registry`` will be added as children based on its scope.
The parent registry could build objects from children registry.
Example:
>>> models = Registry('models')
>>> mmdet_models = Registry('models', parent=models)
>>> @mmdet_models.register_module()
>>> class ResNet:
>>> pass
>>> resnet = models.build(dict(type='mmdet.ResNet'))
"""
assert isinstance(registry, Registry)
assert registry.scope is not None
assert registry.scope not in self.children, \
f'scope {registry.scope} exists in {self.name} registry'
self.children[registry.scope] = registry
def _register_module(self, module_class, module_name=None, force=False):
if not inspect.isclass(module_class):
raise TypeError('module must be a class, '
f'but got {type(module_class)}')
if module_name is None:
module_name = module_class.__name__
if isinstance(module_name, str):
module_name = [module_name]
for name in module_name:
if not force and name in self._module_dict:
raise KeyError(f'{name} is already registered '
f'in {self.name}')
self._module_dict[name] = module_class
def deprecated_register_module(self, cls=None, force=False):
warnings.warn(
'The old API of register_module(module, force=False) '
'is deprecated and will be removed, please use the new API '
'register_module(name=None, force=False, module=None) instead.')
if cls is None:
return partial(self.deprecated_register_module, force=force)
self._register_module(cls, force=force)
return cls
def register_module(self, name=None, force=False, module=None):
"""Register a module.
A record will be added to `self._module_dict`, whose key is the class
name or the specified name, and value is the class itself.
It can be used as a decorator or a normal function.
Example:
>>> backbones = Registry('backbone')
>>> @backbones.register_module()
>>> class ResNet:
>>> pass
>>> backbones = Registry('backbone')
>>> @backbones.register_module(name='mnet')
>>> class MobileNet:
>>> pass
>>> backbones = Registry('backbone')
>>> class ResNet:
>>> pass
>>> backbones.register_module(ResNet)
Args:
name (str | None): The module name to be registered. If not
specified, the class name will be used.
force (bool, optional): Whether to override an existing class with
the same name. Default: False.
module (type): Module class to be registered.
"""
if not isinstance(force, bool):
raise TypeError(f'force must be a boolean, but got {type(force)}')
# NOTE: This is a workaround to stay compatible with the old API,
# though it may introduce unexpected bugs.
if isinstance(name, type):
return self.deprecated_register_module(name, force=force)
# raise the error ahead of time
if not (name is None or isinstance(name, str) or is_seq_of(name, str)):
raise TypeError(
'name must be None, an instance of str, or a sequence of str, '
f'but got {type(name)}')
# use it as a normal method: x.register_module(module=SomeClass)
if module is not None:
self._register_module(
module_class=module, module_name=name, force=force)
return module
# use it as a decorator: @x.register_module()
def _register(cls):
self._register_module(
module_class=cls, module_name=name, force=force)
return cls
return _register
def build_model_from_cfg(cfg, registry, default_args=None):
if isinstance(cfg, list):
modules = [
build_from_cfg(cfg_, registry, default_args) for cfg_ in cfg
]
return Sequential(*modules)
else:
return build_from_cfg(cfg, registry, default_args)
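When cfg is a list, each element is built with build_from_cfg and the results are chained into a Sequential container (Sequential here is mmcv's own container module, which subclasses nn.Sequential). A hedged sketch with a hypothetical registry and hypothetical registered classes:
# cfg = [dict(type='A', ...), dict(type='B', ...)]    # 'A' and 'B' registered beforehand
# module = build_model_from_cfg(cfg, SOME_REGISTRY)   # -> Sequential(A(...), B(...))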
model = build_detector(cfg.model, train_cfg=cfg.get('train_cfg'), test_cfg=cfg.get('test_cfg'))
def build_detector(cfg, train_cfg=None, test_cfg=None):
"""Build detector."""
if train_cfg is not None or test_cfg is not None:
warnings.warn(
'train_cfg and test_cfg are deprecated, '
'please specify them in model', UserWarning)
assert cfg.get('train_cfg') is None or train_cfg is None, \
'train_cfg specified in both outer field and model field '
assert cfg.get('test_cfg') is None or test_cfg is None, \
'test_cfg specified in both outer field and model field '
return DETECTORS.build(
cfg, default_args=dict(train_cfg=train_cfg, test_cfg=test_cfg))
class Runner:
def train(self, data_loader, **kwargs):
self.model.train()
self.mode = 'train'
self.data_loader = data_loader
self._max_iters = self._max_epochs * len(self.data_loader)
self.call_hook('before_train_epoch')
time.sleep(2) # Prevent possible deadlock during epoch transition
for i, data_batch in enumerate(self.data_loader):
self._inner_iter = i
self.call_hook('before_train_iter')
self.run_iter(data_batch, train_mode=True, **kwargs)
self.call_hook('after_train_iter')
self._iter += 1
self.call_hook('after_train_epoch')
self._epoch += 1
def run_iter(self, data_batch, train_mode, **kwargs):
if self.batch_processor is not None:
outputs = self.batch_processor(
self.model, data_batch, train_mode=train_mode, **kwargs)
elif train_mode:
# train_step is defined in BaseDetector and is not overridden by TwoStageDetector
outputs = self.model.train_step(data_batch, self.optimizer,
**kwargs)
else:
outputs = self.model.val_step(data_batch, self.optimizer, **kwargs)
if not isinstance(outputs, dict):
raise TypeError('"batch_processor()" or "model.train_step()"'
'and "model.val_step()" must return a dict')
if 'log_vars' in outputs:
self.log_buffer.update(outputs['log_vars'], outputs['num_samples'])
self.outputs = outputs
class BaseDetector(BaseModule, metaclass=ABCMeta):
# ...
def train_step(self, data, optimizer):
losses = self(**data)  # calls forward
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img_metas']))
return outputs
@auto_fp16(apply_to=('img', ))  # key point
def forward(self, img, img_metas, return_loss=True, **kwargs):
pass
def wrap_fp16_model(model):
"""Wrap the FP32 model to FP16.
If you are using PyTorch >= 1.6, torch.cuda.amp is used as the
backend; otherwise, the original mmcv implementation is adopted.
For PyTorch >= 1.6, this function will
1. Set the fp16 flag inside the model to True.
Otherwise:
1. Convert the FP32 model to FP16.
2. Keep some necessary layers in FP32, e.g., normalization layers.
3. Set the `fp16_enabled` flag inside the model to True.
Args:
model (nn.Module): Model in FP32.
"""
if (TORCH_VERSION == 'parrots'
or digit_version(TORCH_VERSION) < digit_version('1.6.0')):
# convert model to fp16
model.half()
# patch the normalization layers to make it work in fp32 mode
patch_norm_fp32(model)
# set `fp16_enabled` flag
for m in model.modules():
if hasattr(m, 'fp16_enabled'):
m.fp16_enabled = True
# This flag is consumed by
# @auto_fp16(apply_to=('img', ))
# def forward(self, img, img_metas, return_loss=True, **kwargs):
# If the auto_fp16 decorator detects fp16_enabled=True on the model, it
# automatically converts the inputs. The key code of auto_fp16 is roughly the
# following (args are fp32, new_args are fp16):
# if not (hasattr(args[0], 'fp16_enabled') and args[0].fp16_enabled):
#     return old_func(*args, **kwargs)
# if (TORCH_VERSION != 'parrots' and
#         digit_version(TORCH_VERSION) >= digit_version('1.6.0')):
#     # pytorch >= 1.6.0
#     with autocast(enabled=True):
#         output = old_func(*new_args, **new_kwargs)  # under autocast it does not matter whether inputs are fp16 or fp32
# else:
#     # parrots or pytorch < 1.6.0
#     output = old_func(*new_args, **new_kwargs)
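# A simplified, self-contained sketch of such a decorator for PyTorch >= 1.6
# (not mmcv's actual auto_fp16; the real one also casts only the arguments
# named in `apply_to` on the pre-1.6 path):
import functools
import torch

def auto_fp16_sketch(apply_to=()):
    def wrapper(old_func):
        @functools.wraps(old_func)
        def new_func(self, *args, **kwargs):
            # do nothing unless wrap_fp16_model has set the flag on the model
            if not getattr(self, 'fp16_enabled', False):
                return old_func(self, *args, **kwargs)
            # under autocast, fp32 inputs are cast automatically where safe
            with torch.cuda.amp.autocast(enabled=True):
                return old_func(self, *args, **kwargs)
        return new_func
    return wrapper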
@HOOKS.register_module()
class Fp16OptimizerHook(OptimizerHook):
def before_run(self, runner):
"""Preparing steps before Mixed Precision Training."""
# wrap model mode to fp16
wrap_fp16_model(runner.model)
# resume from state dict
if 'fp16' in runner.meta and 'loss_scaler' in runner.meta['fp16']:
scaler_state_dict = runner.meta['fp16']['loss_scaler']
self.loss_scaler.load_state_dict(scaler_state_dict)
def after_train_iter(self, runner):
"""Backward optimization steps for Mixed Precision Training. For
dynamic loss scaling, please refer to
https://pytorch.org/docs/stable/amp.html#torch.cuda.amp.GradScaler.
1. Scale the loss by a scale factor.
2. Backward the loss to obtain the gradients.
3. Unscale the optimizer's gradient tensors.
4. Call optimizer.step() and update the scale factor.
5. Save the loss_scaler state_dict for resuming.
"""
# clear grads of last iteration
runner.model.zero_grad()
runner.optimizer.zero_grad()
self.loss_scaler.scale(runner.outputs['loss']).backward()
self.loss_scaler.unscale_(runner.optimizer)
# grad clip
if self.grad_clip is not None:
grad_norm = self.clip_grads(runner.model.parameters())
if grad_norm is not None:
# Add grad norm to the logger
runner.log_buffer.update({'grad_norm': float(grad_norm)},
runner.outputs['num_samples'])
# backward and update scaler
self.loss_scaler.step(runner.optimizer)
self.loss_scaler.update(self._scale_update_param)
# save state_dict of loss_scaler
runner.meta.setdefault(
'fp16', {})['loss_scaler'] = self.loss_scaler.state_dict()
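The same steps written out in plain PyTorch, as a sketch outside of mmcv (uses torch.cuda.amp and assumes model, loader and optimizer are already constructed):
import torch

scaler = torch.cuda.amp.GradScaler()
for data_batch in loader:
    optimizer.zero_grad()
    with torch.cuda.amp.autocast():
        loss = model(data_batch)
    scaler.scale(loss).backward()  # 1-2. scale the loss, then backpropagate
    scaler.unscale_(optimizer)     # 3. unscale gradients (e.g. before clipping)
    scaler.step(optimizer)         # 4. optimizer step (skipped if grads contain inf/nan)
    scaler.update()                #    and update the scale factor
    # 5. scaler.state_dict() can be saved with the checkpoint for resuming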
@RUNNERS.register_module()
class EpochBasedRunner(BaseRunner):
"""Epoch-based Runner.
This runner trains models epoch by epoch.
"""
def run_iter(self, data_batch, train_mode, **kwargs):
if self.batch_processor is not None:
outputs = self.batch_processor(
self.model, data_batch, train_mode=train_mode, **kwargs)
elif train_mode:
# train_step is defined in BaseDetector and is not overridden by TwoStageDetector
outputs = self.model.train_step(data_batch, self.optimizer,
**kwargs)
else:
outputs = self.model.val_step(data_batch, self.optimizer, **kwargs)
if not isinstance(outputs, dict):
raise TypeError('"batch_processor()" or "model.train_step()"'
'and "model.val_step()" must return a dict')
if 'log_vars' in outputs:
self.log_buffer.update(outputs['log_vars'], outputs['num_samples'])
self.outputs = outputs
def train(self, data_loader, **kwargs):
self.model.train()
self.mode = 'train'
self.data_loader = data_loader
self._max_iters = self._max_epochs * len(self.data_loader)
self.call_hook('before_train_epoch')
time.sleep(2) # Prevent possible deadlock during epoch transition
for i, data_batch in enumerate(self.data_loader):
self._inner_iter = i
self.call_hook('before_train_iter')
self.run_iter(data_batch, train_mode=True, **kwargs)
self.call_hook('after_train_iter')
self._iter += 1
self.call_hook('after_train_epoch')
self._epoch += 1
@torch.no_grad()
def val(self, data_loader, **kwargs):
pass # omit
def run(self, ...):
self.call_hook('before_run')
for i, flow in enumerate(workflow):
# call `self.train` or `self.eval`
pass
self.call_hook('after_run')
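# A simplified sketch of what the omitted run() body does (not the full mmcv
# implementation): workflow is e.g. [('train', 1), ('val', 1)] and
# data_loaders matches it one-to-one.
def run_sketch(self, data_loaders, workflow, **kwargs):
    self.call_hook('before_run')
    while self._epoch < self._max_epochs:
        for i, (mode, epochs) in enumerate(workflow):
            epoch_runner = getattr(self, mode)  # self.train or self.val
            for _ in range(epochs):
                if mode == 'train' and self._epoch >= self._max_epochs:
                    break
                epoch_runner(data_loaders[i], **kwargs)
    self.call_hook('after_run')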
class BaseDetector(BaseModule, metaclass=ABCMeta):
# ...
def train_step(self, data, optimizer):
losses = self(**data)  # calls forward
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img_metas']))
return outputs
def val_step(self, data, optimizer=None):
losses = self(**data)  # calls forward
loss, log_vars = self._parse_losses(losses)
outputs = dict(
loss=loss, log_vars=log_vars, num_samples=len(data['img_metas']))
return outputs
@auto_fp16(apply_to=('img', ))
def forward(self, img, img_metas, return_loss=True, **kwargs):
"""Calls either :func:`forward_train` or :func:`forward_test` depending
on whether ``return_loss`` is ``True``.
Note this setting will change the expected inputs. When
``return_loss=True``, img and img_meta are single-nested (i.e. Tensor
and List[dict]), and when ``return_loss=False``, img and img_meta
should be double nested (i.e. List[Tensor], List[List[dict]]), with
the outer list indicating test time augmentations.
"""
if torch.onnx.is_in_onnx_export():
assert len(img_metas) == 1
return self.onnx_export(img[0], img_metas[0])
if return_loss:
return self.forward_train(img, img_metas, **kwargs)
else:
return self.forward_test(img, img_metas, **kwargs)
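During training, data comes straight from the data loader, so self(**data) in train_step ends up in this forward with return_loss=True. A hypothetical sketch of what a detection batch typically carries (exact keys depend on the dataset pipeline):
# data = dict(
#     img=...,            # Tensor of shape [N, C, H, W]
#     img_metas=[...],    # one meta dict per image (ori_shape, scale_factor, ...)
#     gt_bboxes=[...],    # per-image Tensors of ground-truth boxes
#     gt_labels=[...])
# outputs = model.train_step(data, optimizer)
# outputs['loss'], outputs['log_vars'], outputs['num_samples']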
class BaseModule(nn.Module, metaclass=ABCMeta):
"""Base module for all modules in openmmlab."""
def __init__(self, init_cfg=None):
"""Initialize BaseModule, inherited from `torch.nn.Module`
Args:
init_cfg (dict, optional): Initialization config dict.
"""
# NOTE: init_cfg can be defined at different levels, but init_cfg
# at lower levels has higher priority.
super(BaseModule, self).__init__()
# define default value of init_cfg instead of hard code
# in init_weight() function
self._is_init = False
self.init_cfg = init_cfg
# Backward compatibility in derived classes
# if pretrained is not None:
# warnings.warn('DeprecationWarning: pretrained is a deprecated \
# key, please consider using init_cfg')
# self.init_cfg = dict(type='Pretrained', checkpoint=pretrained)
@property
def is_init(self):
return self._is_init
def init_weights(self):
"""Initialize the weights."""
from ..cnn import initialize
if not self._is_init:
if self.init_cfg:
initialize(self, self.init_cfg)
if isinstance(self.init_cfg, (dict, ConfigDict)):
# Avoid the parameters of the pre-training model
# being overwritten by the init_weights
# of the children.
if self.init_cfg['type'] == 'Pretrained':
return
for m in self.children():
if hasattr(m, 'init_weights'):
m.init_weights()
self._is_init = True
else:
warnings.warn(f'init_weights of {self.__class__.__name__} has '
f'been called more than once.')
def __repr__(self):
s = super().__repr__()
if self.init_cfg:
s += f'\ninit_cfg={self.init_cfg}'
return s
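A minimal usage sketch of init_cfg with BaseModule (ToyBackbone is hypothetical; 'Kaiming' is one of the init types understood by mmcv's initialize()):
import torch.nn as nn

class ToyBackbone(BaseModule):
    def __init__(self, init_cfg=dict(type='Kaiming', layer='Conv2d')):
        super().__init__(init_cfg)
        self.conv = nn.Conv2d(3, 16, 3)

model = ToyBackbone()
model.init_weights()  # applies the Kaiming init, then recurses into children
print(model)          # __repr__ appends "init_cfg=..." to the standard repr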