|
|
|
import torch
|
|
|
|
from contextlib import contextmanager
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
from typing import List, Tuple, Any
|
|
|
|
from colossalai.tensor.colo_tensor import ColoTensor
|
|
|
|
from colossalai.tensor import ColoTensorSpec
|
|
|
|
|
|
|
|
|
|
|
|
class ParamOpHook(ABC):
    """Abstract hook fired around every operation whose operands contain a ColoParameter.

    Subclass it and provide all four callbacks: ``pre_forward``, ``post_forward``,
    ``pre_backward`` and ``post_backward``. Each callback is handed the list of
    parameters involved in the operation and is expected to return nothing.
    """

    @abstractmethod
    def pre_forward(self, params: List[torch.Tensor]) -> None:
        """Callback for the pre-forward stage of an operation on ``params``."""

    @abstractmethod
    def post_forward(self, params: List[torch.Tensor]) -> None:
        """Callback for the post-forward stage of an operation on ``params``."""

    @abstractmethod
    def pre_backward(self, params: List[torch.Tensor]) -> None:
        """Callback for the pre-backward stage of an operation on ``params``."""

    @abstractmethod
    def post_backward(self, params: List[torch.Tensor]) -> None:
        """Callback for the post-backward stage of an operation on ``params``."""
|
|
|
|
|
|
|
|
|
|
|
|
class ParamOpHookManager:
    """Manage your param op hooks. It only has static methods.

    The active hooks live in the class attribute ``hooks``.
    The only static method you should call is ``use_hooks(*hooks)``.
    """
    # Hooks that are currently active; swapped as a whole by ``use_hooks``.
    hooks: Tuple[ParamOpHook, ...] = tuple()

    @staticmethod
    @contextmanager
    def use_hooks(*hooks: ParamOpHook):
        """Change the param op hooks you use. Nested calling is allowed.

        The previously active hooks are restored when the ``with`` block exits,
        including when it exits through an exception.

        Example:
            >>> with ParamOpHookManager.use_hooks(*hooks):
            >>>     do_something()
            >>>     with ParamOpHookManager.use_hooks():
            >>>         # clear hooks
            >>>         do_something()
        """
        # Snapshot outside ``try`` so ``finally`` always has a bound value to restore.
        old_param_op_hooks = ParamOpHookManager.hooks
        ParamOpHookManager.hooks = hooks
        try:
            yield
        finally:
            ParamOpHookManager.hooks = old_param_op_hooks

    @staticmethod
    def _trigger_pre_forward(params: List[torch.Tensor]) -> None:
        """Call ``pre_forward(params)`` on every active hook, in registration order."""
        for hook in ParamOpHookManager.hooks:
            hook.pre_forward(params)

    @staticmethod
    def _trigger_post_forward(params: List[torch.Tensor]) -> None:
        """Call ``post_forward(params)`` on every active hook, in registration order."""
        for hook in ParamOpHookManager.hooks:
            hook.post_forward(params)

    @staticmethod
    def _trigger_pre_backward(params: List[torch.Tensor]) -> None:
        """Call ``pre_backward(params)`` on every active hook, in registration order."""
        for hook in ParamOpHookManager.hooks:
            hook.pre_backward(params)

    @staticmethod
    def _trigger_post_backward(params: List[torch.Tensor]) -> None:
        """Call ``post_backward(params)`` on every active hook, in registration order."""
        for hook in ParamOpHookManager.hooks:
            hook.post_backward(params)

    @staticmethod
    def pre_op(params: List[torch.Tensor], *args: Any) -> list:
        """Run the pre-forward hooks and route the operands through ``PreFwdPostBwd``.

        Args:
            params: parameters taking part in the operation; handed to the hooks.
            *args: operands of the operation.

        Returns:
            A list with one entry per operand. Operands that were ``ColoTensor``
            instances are rebuilt with their original class and spec.
        """
        ParamOpHookManager._trigger_pre_forward(params)
        args_info = _get_colo_tensors_info(*args)
        rets = PreFwdPostBwd.apply(params, *args)
        if len(args) == 1:
            # ``PreFwdPostBwd.forward`` unwraps a lone operand, so ``rets`` is the bare
            # value here. Re-wrap it: star-unpacking a tensor would iterate over its
            # first dimension (or fail for a 0-d tensor) instead of yielding the operand.
            rets = (rets,)
        return _update_colo_tensors(args_info, *rets)

    @staticmethod
    def post_op(params: List[torch.Tensor], arg: Any) -> Any:
        """Run the post-forward hooks and route the result through ``PostFwdPreBwd``.

        Args:
            params: parameters taking part in the operation; handed to the hooks.
            arg: result of the operation.

        Returns:
            The result after passing through ``PostFwdPreBwd``; if ``arg`` was a
            ``ColoTensor`` it is rebuilt with its original class and spec.
        """
        ParamOpHookManager._trigger_post_forward(params)
        arg_info = _get_colo_tensors_info(arg)
        ret = PostFwdPreBwd.apply(params, arg)
        return _unpack_args(_update_colo_tensors(arg_info, ret))

    @staticmethod
    def has_hook() -> bool:
        """Return ``True`` when at least one hook is currently active."""
        return len(ParamOpHookManager.hooks) > 0
|
|
|
|
|
|
|
|
|
|
|
|
class PreFwdPostBwd(torch.autograd.Function):
    """Pass-through autograd function that fires the post-backward hooks.

    Forward returns the operands as given (a lone operand is returned unwrapped);
    backward triggers ``post_backward`` on the active hooks and forwards the
    incoming gradients unchanged.
    """

    @staticmethod
    def forward(ctx, params, *args):
        # Keep the parameters around so backward can hand them to the hooks.
        ctx.params = params
        outputs = _unpack_args(args)
        return outputs

    @staticmethod
    def backward(ctx, *grads):
        ParamOpHookManager._trigger_post_backward(ctx.params)
        # The leading ``None`` is the gradient slot for ``params``.
        return (None, *grads)
|
|
|
|
|
|
|
|
|
|
|
|
class PostFwdPreBwd(torch.autograd.Function):
    """Pass-through autograd function that fires the pre-backward hooks.

    Forward returns its operand as given; backward triggers ``pre_backward`` on
    the active hooks and forwards the incoming gradients unchanged.
    """

    @staticmethod
    def forward(ctx, params, args):
        # Keep the parameters around so backward can hand them to the hooks.
        ctx.params = params
        return args

    @staticmethod
    def backward(ctx, *grads):
        ParamOpHookManager._trigger_pre_backward(ctx.params)
        # The leading ``None`` is the gradient slot for ``params``.
        return (None, *grads)
|
|
|
|
|
|
|
|
|
|
|
|
def _unpack_args(args):
|
|
|
|
if len(args) == 1:
|
|
|
|
return args[0]
|
|
|
|
return args
|
|
|
|
|
|
|
|
|
|
|
|
def _get_colo_tensors_info(*args) -> list:
|
|
|
|
info = []
|
|
|
|
for arg in args:
|
|
|
|
if isinstance(arg, ColoTensor):
|
|
|
|
info.append((arg.__class__, ColoTensorSpec(arg.get_process_group(), arg.dist_spec, arg.compute_spec)))
|
|
|
|
else:
|
|
|
|
info.append(None)
|
|
|
|
return info
|
|
|
|
|
|
|
|
|
|
|
|
def _update_colo_tensors(info, *args) -> list:
|
|
|
|
ret = []
|
|
|
|
for t_info, arg in zip(info, args):
|
|
|
|
if t_info is not None:
|
|
|
|
t_cls, spec = t_info
|
|
|
|
arg = t_cls.from_torch_tensor(arg, spec=spec)
|
|
|
|
ret.append(arg)
|
|
|
|
return ret
|