From 1ed7c24c023a58dca575c526057a843d65892172 Mon Sep 17 00:00:00 2001
From: HELSON <72907851+1SAA@users.noreply.github.com>
Date: Thu, 10 Mar 2022 16:24:57 +0800
Subject: [PATCH] Added PCIE profiler to detect data transmission (#373)

---
 colossalai/utils/profiler/__init__.py      |   1 +
 colossalai/utils/profiler/comm_profiler.py |  40 +------
 colossalai/utils/profiler/pcie_profiler.py | 129 +++++++++++++++++++++
 colossalai/utils/profiler/prof_utils.py    |  38 ++++++
 4 files changed, 169 insertions(+), 39 deletions(-)
 create mode 100644 colossalai/utils/profiler/pcie_profiler.py

diff --git a/colossalai/utils/profiler/__init__.py b/colossalai/utils/profiler/__init__.py
index 6ec3a29c3..0223e732f 100644
--- a/colossalai/utils/profiler/__init__.py
+++ b/colossalai/utils/profiler/__init__.py
@@ -1,2 +1,3 @@
 from .comm_profiler import CommProfiler
+from .pcie_profiler import PcieProfiler
 from .prof_utils import ProfilerContext
diff --git a/colossalai/utils/profiler/comm_profiler.py b/colossalai/utils/profiler/comm_profiler.py
index b2356e54e..80f496c47 100644
--- a/colossalai/utils/profiler/comm_profiler.py
+++ b/colossalai/utils/profiler/comm_profiler.py
@@ -6,7 +6,7 @@ from torch.autograd.profiler import profile
 import torch.distributed as dist
 from torch.distributed import ReduceOp
 from colossalai.utils import get_current_device
-from .prof_utils import BaseProfiler
+from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwith
 from typing import List, Optional
 
 
@@ -22,44 +22,6 @@ def _get_code_location(depth: int):
     return ret
 
 
-# copied from high version pytorch to support low version
-def _format_time(time_us):
-    """Defines how to format time in FunctionEvent"""
-    US_IN_SECOND = 1000.0 * 1000.0
-    US_IN_MS = 1000.0
-    if time_us >= US_IN_SECOND:
-        return '{:.3f}s'.format(time_us / US_IN_SECOND)
-    if time_us >= US_IN_MS:
-        return '{:.3f}ms'.format(time_us / US_IN_MS)
-    return '{:.3f}us'.format(time_us)
-
-
-# copied from high version pytorch to support low version
-def _format_memory(nbytes):
-    """Returns a formatted memory size string"""
-    KB = 1024
-    MB = 1024 * KB
-    GB = 1024 * MB
-    if (abs(nbytes) >= GB):
-        return '{:.2f} GB'.format(nbytes * 1.0 / GB)
-    elif (abs(nbytes) >= MB):
-        return '{:.2f} MB'.format(nbytes * 1.0 / MB)
-    elif (abs(nbytes) >= KB):
-        return '{:.2f} KB'.format(nbytes * 1.0 / KB)
-    else:
-        return str(nbytes) + ' b'
-
-
-def _format_bandwith(volme: float, time_us: int):
-    sec_div_mb = (1000.0 / 1024.0)**2
-    mb_per_sec = volme / time_us * sec_div_mb
-
-    if mb_per_sec >= 1024.0:
-        return '{:.3f} GB/s'.format(mb_per_sec / 1024.0)
-    else:
-        return '{:.3f} MB/s'.format(mb_per_sec)
-
-
 torch_all_reduce = dist.all_reduce
 torch_all_gather = dist.all_gather
 torch_reduce_scatter = dist.reduce_scatter
diff --git a/colossalai/utils/profiler/pcie_profiler.py b/colossalai/utils/profiler/pcie_profiler.py
new file mode 100644
index 000000000..0724dc10e
--- /dev/null
+++ b/colossalai/utils/profiler/pcie_profiler.py
@@ -0,0 +1,129 @@
+from pathlib import Path
+from torch.autograd.profiler import profile
+from .prof_utils import BaseProfiler, _format_time, _format_memory, _format_bandwith
+from typing import List
+
+
+def _get_size(dtype: str):
+    if dtype == "fp16":
+        return 2
+    elif dtype == "fp32":
+        return 4
+    else:
+        raise NotImplementedError
+
+
+def _get_numel(my_list: List[int]) -> int:
+    from functools import reduce
+    from operator import mul
+    return reduce(mul, my_list)
+
+
+def _reduce_location(locations: List[str]) -> str:
+    ret = []
+    for lo in locations:
+        ret.append(lo)
+        ret.append("\n")
+    return ''.join(ret)
+
+
+class PcieEvent(object):
+    """Pcie Event.
+    """
+
+    def __init__(self, count: int = 0, pcie_vol: int = 0, cuda_time: int = 0):
+        self.count = count
+        self.pcie_vol = pcie_vol
+        self.cuda_time = cuda_time
+
+    def add(self, rhs):
+        self.count += rhs.count
+        self.pcie_vol += rhs.pcie_vol
+        self.cuda_time += rhs.cuda_time
+
+
+class PcieProfiler(BaseProfiler):
+    """Pcie profiler. Records all data transmission between CPU and GPU.
+
+    TODO: Merge pcie profiler into communication profiler
+    """
+
+    def __init__(self,
+                 dtype: str = "fp32",
+                 depth: int = 1,
+                 total_count: int = 0,
+                 total_pcie_vol: int = 0,
+                 total_cuda_time: int = 0):
+        super().__init__(profiler_name="Pcie", priority=10)
+        self.depth = depth
+        self.data_size = _get_size(dtype)
+        self.total_count = total_count
+        self.total_pcie_vol = total_pcie_vol
+        self.total_cuda_time = total_cuda_time
+
+        self.ops_record = dict()
+        self.profiler = None
+
+    def enable(self):
+        self.profiler = profile(enabled=True,
+                                use_cuda=True,
+                                use_cpu=True,
+                                use_kineto=True,
+                                record_shapes=True,
+                                with_stack=True)
+        self.profiler.__enter__()
+
+    def disable(self):
+        self.profiler.__exit__(None, None, None)
+
+        if self.profiler.enabled:
+            events = self.profiler.function_events
+            for event in events:
+                if event.name == "aten::_to_copy":
+                    current_comm_event = PcieEvent(1, self.data_size * _get_numel(event.input_shapes[0]),
+                                                   event.cuda_time_total)
+                    self.total_count += current_comm_event.count
+                    self.total_pcie_vol += current_comm_event.pcie_vol
+                    self.total_cuda_time += current_comm_event.cuda_time
+                    code_location = _reduce_location(event.stack[:self.depth])
+                    if code_location in self.ops_record:
+                        self.ops_record[code_location].add(current_comm_event)
+                    else:
+                        self.ops_record[code_location] = current_comm_event
+
+        self.profiler = None
+
+    def to_tensorboard(self, writer):
+        writer.add_text(tag="Data Transmission", text_string=self.result_list("\n\n"))
+
+    def to_file(self, filename: Path):
+        with open(filename, "w") as f:
+            f.write(self.result_list())
+
+    def show(self):
+        print(self.result_list())
+
+    def result_list(self, sep: str = "\n"):
+        res = []
+
+        def append(s: str):
+            res.append(s)
+            res.append(sep)
+
+        append("Pcie profiling result:")
+        append("total cuda time: {}".format(_format_time(self.total_cuda_time)))
+        append("average bandwidth: {}".format(_format_bandwith(self.total_pcie_vol, self.total_cuda_time)))
+        append("total number of calls: {}".format(self.total_count))
+        append("All events:\n----------------------------------------")
+
+        show_list = sorted(self.ops_record.items(), key=lambda kv: -kv[1].cuda_time)
+        for location, event in show_list:
+            append(location)
+            append("cuda time: {}".format(_format_time(event.cuda_time)))
+            append("{:.1f}% of total pcie time".format(event.cuda_time / self.total_cuda_time * 100.0))
+            append("pcie volume: {}".format(_format_memory(event.pcie_vol)))
+            append("average bandwidth: {}".format(_format_bandwith(event.pcie_vol, event.cuda_time)))
+            append("number of calls: {}".format(event.count))
+            append("----------------------------------------")
+
+        return ''.join(res)
diff --git a/colossalai/utils/profiler/prof_utils.py b/colossalai/utils/profiler/prof_utils.py
index 01a08d483..d71906868 100644
--- a/colossalai/utils/profiler/prof_utils.py
+++ b/colossalai/utils/profiler/prof_utils.py
@@ -4,6 +4,44 @@ from typing import Union, List
 from colossalai.core import global_context as gpc
 
 
+# copied from high version pytorch to support low version
+def _format_time(time_us):
+    """Defines how to format time in FunctionEvent"""
+    US_IN_SECOND = 1000.0 * 1000.0
+    US_IN_MS = 1000.0
+    if time_us >= US_IN_SECOND:
+        return '{:.3f}s'.format(time_us / US_IN_SECOND)
+    if time_us >= US_IN_MS:
+        return '{:.3f}ms'.format(time_us / US_IN_MS)
+    return '{:.3f}us'.format(time_us)
+
+
+# copied from high version pytorch to support low version
+def _format_memory(nbytes):
+    """Returns a formatted memory size string"""
+    KB = 1024
+    MB = 1024 * KB
+    GB = 1024 * MB
+    if (abs(nbytes) >= GB):
+        return '{:.2f} GB'.format(nbytes * 1.0 / GB)
+    elif (abs(nbytes) >= MB):
+        return '{:.2f} MB'.format(nbytes * 1.0 / MB)
+    elif (abs(nbytes) >= KB):
+        return '{:.2f} KB'.format(nbytes * 1.0 / KB)
+    else:
+        return str(nbytes) + ' B'
+
+
+def _format_bandwith(volme: Union[float, int], time_us: int):
+    sec_div_mb = (1000.0 / 1024.0)**2
+    mb_per_sec = volme / time_us * sec_div_mb
+
+    if mb_per_sec >= 1024.0:
+        return '{:.3f} GB/s'.format(mb_per_sec / 1024.0)
+    else:
+        return '{:.3f} MB/s'.format(mb_per_sec)
+
+
 class BaseProfiler(ABC):
 
     def __init__(self, profiler_name: str, priority: int):
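
Usage sketch (reviewer note, not part of the patch): the new profiler can be driven directly through the enable()/disable() hooks added above; wiring it through ProfilerContext is omitted here. The workload below is hypothetical, a CUDA device is assumed to be available, and the captured op name (aten::_to_copy) depends on the PyTorch build, so older versions may report no events.

    import torch
    from colossalai.utils.profiler import PcieProfiler

    # A PcieProfiler sized for fp32 tensors, keeping one stack frame per callsite.
    prof = PcieProfiler(dtype="fp32", depth=1)

    prof.enable()
    x = torch.rand(1024, 1024)    # allocated on the CPU
    y = x.to("cuda")              # host-to-GPU copy; recorded as an aten::_to_copy event
    prof.disable()                # aggregates events into per-callsite PcieEvent records

    prof.show()                   # prints totals, average bandwidth, and the per-callsite breakdown

The same report can be written to disk with prof.to_file(Path("pcie_prof.txt")) or attached to a TensorBoard SummaryWriter via prof.to_tensorboard(writer).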