from typing import Dict, Iterator, List, Tuple, Union

import torch
import torch.nn as nn

from colossalai.tensor.colo_tensor import ColoTensor


def all_gather_simulator(target_pair):
    """
    Simulating all-gather operation, analyze the communication cost
    and simulate the influence of the DimSpec.

    We don't allow uncontiguous layout, such as all-gather(S012)->S02 is NOT allowed.
    Therefore, all gather operation just remove the last element in shard list,
    e.g.:
        all-gather(S01) -> S0

    Argument:
        target_pair(Tuple[int, List[int]]): The first element is the dimension of tensor to be sharded,
        and the second element describes which logical axis will be sharded in that dimension.
    """
    _, shard_list = target_pair
    new_shard_list = shard_list[:-1]

    return new_shard_list


def all_to_all_simulator(f_target_pair, b_target_pair):
    """
    Simulating all-to-all operation, analyze the communication cost
    and simulate the influence of the DimSpec.

    We BANNED all representations which shard_list in decreasing order,
    such as S10, so all-to-all(S0, S1) -> RS01 is NOT allowed.
    Therefore, if the behind shard_list is not None, we just extend it to the front shard_list.
    Argument:
        target_pair(Tuple[int, List[int]]): The first element is the dimension of tensor to be sharded,
        and the second element describes which logical axis will be sharded in that dimension.
    e.g.:
        all-to-all(S0, S1) -> [S01, R]
        all-to-all(S0, R) -> [R, S0]
    Otherwise, we extend the front shard_list to behind.
    e.g.:
        all-to-all(R, S1) -> [S1, R]

    Argument:
        target_pair(Tuple[int, List[int]]): The first element is the dimension of tensor to be sharded,
        and the second element describes which logical axis will be sharded in that dimension.
    """
    _, f_shard_list = f_target_pair
    _, b_shard_list = b_target_pair
    if not len(b_shard_list):
        b_shard_list.extend(f_shard_list)
        f_shard_list = []
    else:
        f_shard_list.extend(b_shard_list)
        b_shard_list = []

    return f_shard_list, b_shard_list


def shard_simulator(target_pair, legal_sharding_dims):
    """
    Simulating shard operation, analyze the communication cost(always ZERO)
    and simulate the influence of the DimSpec.

    We don't allow uncontiguous layout, such as shard(S0)->S02 is NOT allowed.
    In addition, We BANNED all representations which shard_list in decreasing order,
    such as S10, so shard(S0) -> S10 is NOT allowed.
    Therefore, for the R dimension, we could just append any legal sharding dim on it.
    e.g.:
        shard(R) -> S0
    For the S dimension, we need to make sure the shard_list after sharding still keep rising order.
    e.g:
        shard(S0) -> S01

    Argument:
        target_pair(Tuple[int, List[int]]): The first element is the dimension of tensor to be sharded,
        and the second element describes which logical axis will be sharded in that dimension.
    """
    _, shard_list = target_pair
    shard_list_list = []
    for dim in legal_sharding_dims:
        if len(shard_list) != 0 and dim <= shard_list[-1]:
            continue
        new_shard_list = shard_list + [dim]
        shard_list_list.append(new_shard_list)

    return shard_list_list


def mix_gather_simulator(f_target_pair, b_target_pair):
    """
    Assume index of f and b target pairs are 'f' and 'b'
    S0S1 => Input: (f, [0]), (b, [1]) Output: [b, f], (1, 0)
    S1S0 => Input: (f, [1]), (b, [0]) Output: [b, f], (0, 1)
    S01R => Input: (f, [0, 1]), (b, []) Output: [f], (1, 1)
    RS01 => Input: (f, []), (b, [0, 1]) Output: [b], (1, 1)
    S10R => Input: (f, [0, 1]), (b, []) Output: [f], (0, 0)
    RS10 => Input: (f, []), (b, [0, 1]) Output: [b], (0, 0)
    """
    if f_target_pair[1] and b_target_pair[1]:
        leading_dim = b_target_pair[1] > f_target_pair[1]
        return [b_target_pair[0], f_target_pair[0]], [int(leading_dim), int(leading_dim ^ 1)]
    if f_target_pair[1]:
        leading_dim = f_target_pair[1][0] < f_target_pair[1][1]
        return [
            f_target_pair[0],
        ], [int(leading_dim), int(leading_dim)]
    if b_target_pair[1]:
        leading_dim = b_target_pair[1][0] < b_target_pair[1][1]
        return [
            b_target_pair[0],
        ], [int(leading_dim), int(leading_dim)]


# The function is credited to PyTorch Team
def named_params_with_colotensor(
    module: nn.Module,
    prefix: str = "",
    recurse: bool = True,
) -> Iterator[Tuple[str, Union[nn.Parameter, ColoTensor]]]:
    r"""Returns an iterator over module parameters (together with the
    ColoTensor parameters), yielding both the name of the parameter
    as well as the parameter itself. This is typically passed to a
    :class:torchshard._shard.sharded_optim.ShardedOptimizer

    Args:
        prefix (str): prefix to prepend to all parameter names.
        recurse (bool): if True, then yields parameters of this module
            and all submodules. Otherwise, yields only parameters that
            are direct members of this module.

    Yields:
        (string, Union[Tensor, ColoTensor]): Tuple containing
            the name and parameter (or ColoTensor parameter)

    Example:

        >>> model = torch.nn.Linear(*linear_size)
        >>> delattr(model.weight)
        >>> setattr(model.weight, ColoTensor(...))
        >>> for name, param in named_params_with_colotensor(model):
        >>>    if name in ['weight']:
        >>>        print(param.size())

    """
    modules = module.named_modules(prefix=prefix) if recurse else [(prefix, module)]

    memo = set()
    for mod_prefix, mod in modules:
        # find all sharded tensor params
        for name, val in vars(mod).items():
            if isinstance(val, ColoTensor) and val not in memo:
                memo.add(val)
                name = mod_prefix + ("." if mod_prefix else "") + name
                yield name, val

    # find all nn.Parameters
    for name, val in module.named_parameters():
        yield name, val


def _convert_tensor(tensor: torch.Tensor) -> ColoTensor:
    return ColoTensor(tensor)


def convert_parameter(module: torch.nn.Module, param_name: str):
    # Perform some validation first.
    if not hasattr(module, param_name):
        raise ValueError(f"module: {module} does not have parameter with name: {param_name}")

    tensor = getattr(module, param_name)
    if not isinstance(tensor, torch.Tensor):
        raise ValueError(
            f"Expected {type(module).__name__}.{param_name} to be a Tensor, but found {type(tensor).__name__}"
        )

    if not tensor.is_contiguous():
        raise ValueError(f"param: {param_name} is not a contiguous Tensor")

    st = _convert_tensor(tensor)

    # Replace param with ColoTensor.

    # Need to delete the attribute first since param_name might be
    # torch.nn.Parameter and can't be replaced with ColoTensor which is
    # not torch.nn.Parameter.
    delattr(module, param_name)

    # Now we can set the attribute appropriately.
    setattr(module, param_name, st)


def convert_dim_partition_dict(dim_size: int, dim_partition_dict: Dict[int, List[int]]) -> Dict[int, List[int]]:
    """
    This method is used to convert the negative dim value to positive.
    """
    dims_to_convert = []
    for dim, mesh_list in dim_partition_dict.items():
        if dim < 0:
            dims_to_convert.append(dim)
    for dim in dims_to_convert:
        dim_partition_dict.pop(dim)
        dim_partition_dict[dim_size + dim] = mesh_list
    return dim_partition_dict


def merge_same_dim_mesh_list(dim_size: int, dim_partition_dict: Dict[int, List[int]]) -> Dict[int, List[int]]:
    """
    This method is used to merge the different key value which points to same physical position.

    For example:
        dim_partition_dict: {1 :[0], -1: [1]} or {1: [0], 1: [1]} for a 2d tensor, the dim 1 and -1 point same physical position.
        In this method, above dim_partition_dict will be converted to {1: [0, 1]}
    """
    converted_dim_partition_dict = {}
    for dim, mesh_list in dim_partition_dict.items():
        if dim < 0:
            dim = dim_size + dim
        if dim not in converted_dim_partition_dict:
            converted_dim_partition_dict[dim] = mesh_list
        else:
            converted_dim_partition_dict[dim].extend(mesh_list)

    return converted_dim_partition_dict