Merge branch 'main' of github.com:hpcaitech/ColossalAI into prefetch

pull/5751/head
hxwang 2024-05-24 04:05:07 +00:00
commit ff507b755e
16 changed files with 192 additions and 82 deletions

View File

@ -362,6 +362,7 @@ class GeminiPlugin(DPPluginBase):
enable_sequence_parallelism: bool = False,
enable_jit_fused: bool = False,
enable_sequence_overlap: bool = False,
enable_async_reduce: bool = True,
verbose: bool = False,
) -> None:
super().__init__()
@ -388,6 +389,7 @@ class GeminiPlugin(DPPluginBase):
mixed_precision=PRECISION_STR_TO_DTYPE[precision],
master_weights=master_weights,
max_prefetch=max_prefetch,
enable_async_reduce=enable_async_reduce,
)
self.zero_optim_config = dict(
gpu_margin_mem_ratio=gpu_margin_mem_ratio,
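For context, a minimal sketch of toggling the new flag from user code (assuming the standard Booster setup from the ColossalAI examples; model and optimizer wiring omitted):

```python
# A minimal sketch, not part of this diff: enabling/disabling async reduce.
import colossalai
from colossalai.booster import Booster
from colossalai.booster.plugin import GeminiPlugin

colossalai.launch_from_torch()
# enable_async_reduce=True (the new default) lets gradient reduction overlap
# with backward computation; pass False to force synchronous reduction.
plugin = GeminiPlugin(max_prefetch=4, enable_async_reduce=True)
booster = Booster(plugin=plugin)
```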

View File

@ -207,13 +207,13 @@ Learnt from [PagedAttention](https://arxiv.org/abs/2309.06180) by [vLLM](https:/
The request handler is responsible for managing requests and scheduling a proper batch from existing requests. Based on [Orca's](https://www.usenix.org/conference/osdi22/presentation/yu) and [vLLM's](https://github.com/vllm-project/vllm) research and work on batching requests, we applied continuous batching with unpadded sequences, which enables varying numbers of sequences to pass projections (i.e. Q, K, and V) together in different steps by hiding the number-of-sequences dimension, and reduces the latency of incoming sequences by inserting a prefill batch during a decoding step and then decoding together.
<p align="center">
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/inference/continuous_batching.png" width="800"/>
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/inference/naive_batching.png" width="800"/>
<br/>
<em>Naive Batching: decode until each sequence encounters eos in a batch</em>
</p>
<p align="center">
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/inference/naive_batching.png" width="800"/>
<img src="https://raw.githubusercontent.com/hpcaitech/public_assets/main/colossalai/img/inference/continuous_batching.png" width="800"/>
<br/>
<em>Continuous Batching: dynamically adjust the batch size by popping out finished sequences and inserting prefill batch</em>
</p>
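To make the scheduling concrete, here is an illustrative sketch of one serving step (hypothetical names; the real logic lives in the request handler):

```python
# Illustrative sketch of continuous batching; `engine`, `seq.finished`, and
# the prefill/decode methods are hypothetical stand-ins for the scheduler.
def serve_step(running, waiting, engine, max_prefill=4):
    # Insert a prefill batch for newly arrived sequences during a decoding step.
    if waiting:
        prefill = [waiting.pop(0) for _ in range(min(max_prefill, len(waiting)))]
        engine.prefill(prefill)
        running.extend(prefill)
    # One decoding step over all running sequences together (unpadded).
    engine.decode(running)
    # Dynamically shrink the batch by popping out finished sequences.
    running[:] = [seq for seq in running if not seq.finished]
```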
@ -222,6 +222,54 @@ Request handler is responsible for managing requests and scheduling a proper bat
Modeling contains models, layers, and policies, which are hand-crafted for better performance and easier usage. Integrated with `shardformer`, users can define their own policy or use our preset policies for specific models. Our modeling files are aligned with [Transformers](https://github.com/huggingface/transformers). For more details about the usage of modeling and policy, please check `colossalai/shardformer`.
## Online Service
Colossal-Inference supports a FastAPI-based online service. Simple completion and chat are both supported. Follow the commands below to construct a server with both completion and chat functionalities. For now we support the `Llama2`, `Llama3`, and `Baichuan2` models; support for more models is coming soon.
### API
- GET '/ping':
Ping is used to check if the server can receive and send information.
- GET '/engine_check':
Check whether the background engine is working.
- POST '/completion':
The completion API is used for single-sequence requests, such as answering a question or completing text.
- POST '/chat':
The chat API is used for conversation-style requests, which often include dialogue participants (i.e. roles) and the corresponding utterances. Since this input format differs substantially from plain prompts, we introduce a chat template to match the data format used by chat models.
#### chat-template
Following `transformers`, we add the chat-template argument. Chat models are trained with very different formats for converting conversations into a single tokenizable string, and using a format that matches a model's training data is extremely important. This attribute (`chat_template`) is included in HuggingFace tokenizers, containing a Jinja template that converts conversation histories into a correctly formatted string. You can refer to the [HuggingFace-blog](https://huggingface.co/blog/chat-templates) for more information. We also provide a simple example template below. Both string and file-style chat templates are supported.
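As a quick sanity check, a template can be tried out with the standard HuggingFace tokenizer API before passing it to the server (a sketch; the model path below is a placeholder for any chat model):

```python
# Sanity-check a Jinja chat template locally before giving it to the server.
from transformers import AutoTokenizer

template = (
    "{% for message in messages %}"
    r"{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}"
    "{% endfor %}"
)
# Placeholder: substitute the path or model card of your chat model.
tokenizer = AutoTokenizer.from_pretrained("path/or/name-of-your-chat-model")
messages = [
    {"role": "system", "content": "you are a helpful assistant"},
    {"role": "user", "content": "what is 1+1?"},
]
# tokenize=False returns the formatted prompt string instead of token ids.
print(tokenizer.apply_chat_template(messages, chat_template=template, tokenize=False))
```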
### Usage
#### Args for customizing your server
The configuration for the API server covers both the serving interface and the engine backend.
For Interface:
- `--host`: The host URL for the server on your device.
- `--port`: The port for the service.
- `--model`: The model the backend engine uses; both a local path and a Transformers model card name are supported.
- `--chat-template`: The file path of the chat template, or the template string itself.
- `--response-role`: The role that colossal-inference plays in the chat.
For Engine Backend:
- `--block_size`: The token block size for the KV cache.
- `--max_batch_size`: The maximum batch size for the engine to infer with; this affects inference speed.
- `--max_input_len`: The maximum input length of a request.
- `--max_output_len`: The maximum output length of a response.
- `--dtype` and `--use_cuda_kernel`: Control the precision and whether the CUDA kernels are used.
For more detailed arguments, please refer to the source code.
### Examples
```bash
# First, launch an API server locally.
python3 -m colossalai.inference.server.api_server --model path of your model --chat-template "{% for message in messages %}{{'<|im_start|>'+message['role']+'\n'+message['content']+'<|im_end|>'+'\n'}}{% endfor %}"
# Second, open the page `http://127.0.0.1:8000/docs` to check the API
# For the completion service, you can invoke it
curl -X POST http://127.0.0.1:8000/completion -H 'Content-Type: application/json' -d '{"prompt":"hello, who are you? "}'
# For the chat service, you can invoke it
curl -X POST http://127.0.0.1:8000/chat -H 'Content-Type: application/json' -d '{"messages":[{"role":"system","content":"you are a helpful assistant"},{"role":"user","content":"what is 1+1?"}]}'
# You can check the engine status now
curl http://localhost:8000/engine_check
```
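The same calls can be made from Python; a minimal sketch using `requests`, with the field names taken from the curl examples above:

```python
# Minimal Python client sketch; payload fields mirror the curl examples above.
import requests

base = "http://127.0.0.1:8000"
print(requests.post(f"{base}/completion", json={"prompt": "hello, who are you? "}).json())
print(requests.post(f"{base}/chat", json={
    "messages": [
        {"role": "system", "content": "you are a helpful assistant"},
        {"role": "user", "content": "what is 1+1?"},
    ],
}).json())
print(requests.get(f"{base}/engine_check").json())  # check the engine status
```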
## 🌟 Acknowledgement
@ -229,7 +277,7 @@ This project was written from scratch but we learned a lot from several other gr
- [vLLM](https://github.com/vllm-project/vllm)
- [flash-attention](https://github.com/Dao-AILab/flash-attention)
- [HuggingFace](https://huggingface.co)
If you wish to cite relevant research papers, you can find the references below.
```bibtex

View File

@ -1,27 +0,0 @@
# Online Service
Colossal-Inference supports fast-api based online service. Simple completion and chat are both supported. Follow the commands below and
you can simply construct a server with both completion and chat functionalities. For now we only support `Llama` model, we will fullfill
the blank quickly.
# Usage
```bash
# First, Lauch an API locally.
python3 -m colossalai.inference.server.api_server --model path of your llama2 model --chat_template "{% for message in messages %}
{{'<|im_start|>' + message['role'] + '\n' + message['content'] + '<|im_end|>' + '\n'}}{% endfor %}"
# Second, you can turn to the page `http://127.0.0.1:8000/docs` to check the api
# For completion service, you can invoke it
curl -X POST http://127.0.0.1:8000/completion -H 'Content-Type: application/json' -d '{"prompt":"hello, who are you? ","stream":"False"}'
# For chat service, you can invoke it
curl -X POST http://127.0.0.1:8000/completion -H 'Content-Type: application/json' -d '{"converation":
[{"role": "system", "content": "you are a helpful assistant"},
{"role": "user", "content": "what is 1+1?"},],
"stream": "False",}'
# If you just want to test a simple generation, turn to generate api
curl -X POST http://127.0.0.1:8000/generate -H 'Content-Type: application/json' -d '{"prompt":"hello, who are you? ","stream":"False"}'
```
We also support streaming output, simply change the `stream` to `True` in the request body.

View File

@ -30,7 +30,6 @@ from colossalai.inference.utils import find_available_ports
from colossalai.inference.core.async_engine import AsyncInferenceEngine, InferenceEngine # noqa
TIMEOUT_KEEP_ALIVE = 5 # seconds.
supported_models_dict = {"Llama_Models": ("llama2-7b",)}
prompt_template_choices = ["llama", "vicuna"]
async_engine = None
chat_serving = None
@ -39,15 +38,25 @@ completion_serving = None
app = FastAPI()
# NOTE: (CjhHa1) models are still under development, need to be updated
@app.get("/models")
def get_available_models() -> Response:
return JSONResponse(supported_models_dict)
@app.get("/ping")
def health_check() -> JSONResponse:
"""Health Check for server."""
return JSONResponse({"status": "Healthy"})
@app.get("/engine_check")
def engine_check() -> JSONResponse:
"""Check if the background loop is running."""
loop_status = async_engine.background_loop_status
if not loop_status:
return JSONResponse({"status": "Error"})
return JSONResponse({"status": "Running"})
@app.post("/generate")
async def generate(request: Request) -> Response:
"""Generate completion for the request.
NOTE: THIS API IS FOR TESTING ONLY, DO NOT USE IT IN AN ACTUAL APPLICATION.
A request should be a JSON object with the following fields:
- prompts: the prompts to use for the generation.
@ -133,7 +142,7 @@ def add_engine_config(parser):
# Parallel arguments not supported now
# KV cache arguments
parser.add_argument("--block-size", type=int, default=16, choices=[8, 16, 32], help="token block size")
parser.add_argument("--block_size", type=int, default=16, choices=[16, 32], help="token block size")
parser.add_argument("--max_batch_size", type=int, default=8, help="maximum number of batch size")

View File

@ -164,6 +164,8 @@ class Chunk:
self.l2_norm = None
self.grad_chunk = None
# the async all-reduce/reduce-scatter work of this grad chunk (None means sync)
self.grad_reduce_work = None
@property
def memory_usage(self) -> Dict[str, int]:
@ -244,7 +246,7 @@ class Chunk:
assert self.cuda_shard is not None # only check on CUDA
valid_tensor = self.cuda_shard[: self.valid_end]
return torch.isinf(valid_tensor).any().item() | torch.isnan(valid_tensor).any().item()
return torch.isinf(valid_tensor).any() | torch.isnan(valid_tensor).any()
def set_l2_norm(self) -> None:
"""Record l2 norm of this chunks on CUDA."""
@ -375,37 +377,49 @@ class Chunk:
if self.is_gathered:
self.__scatter()
def reduce(self):
def reduce(self, async_op: bool = False):
"""Reduce scatter all the gradients. It's an operation done in CUDA."""
# sanity check
assert self.is_gathered
assert self.grad_reduce_work is None
if self.pg_size == 1:
# tricky code here
# just move cuda_global_chunk to cuda_shard
# the communication is not necessary
self.__scatter()
if self.extra_dp_group is not None:
dist.all_reduce(self.cuda_shard, group=self.extra_dp_group)
self.grad_reduce_work = dist.all_reduce(self.cuda_shard, group=self.extra_dp_group, async_op=async_op)
elif self.keep_gathered:
# we use all-reduce here
dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg)
if self.extra_dp_group is not None:
dist.all_reduce(self.cuda_global_chunk, group=self.extra_dp_group)
self.grad_reduce_work = dist.all_reduce(self.cuda_global_chunk, group=self.torch_pg, async_op=async_op)
if self.extra_dp_group is not None:  # cannot guarantee the order of multiple all-reduces
self.wait_async_reduce()
self.grad_reduce_work = dist.all_reduce(
self.cuda_global_chunk, group=self.extra_dp_group, async_op=async_op
)
else:
self.cuda_shard = torch.empty(
self.shard_size, dtype=self.dtype, device=get_accelerator().get_current_device()
)
input_list = list(torch.chunk(self.cuda_global_chunk, chunks=self.pg_size, dim=0))
dist.reduce_scatter(self.cuda_shard, input_list, group=self.torch_pg)
self.grad_reduce_work = dist.reduce_scatter(
self.cuda_shard, input_list, group=self.torch_pg, async_op=async_op
)
if self.extra_dp_group is not None:
dist.all_reduce(self.cuda_shard, group=self.extra_dp_group)
self.wait_async_reduce()
self.grad_reduce_work = dist.all_reduce(self.cuda_shard, group=self.extra_dp_group, async_op=async_op)
free_storage(self.cuda_global_chunk)
self.is_gathered = False
self.__update_tensors_state(TensorState.HOLD)
def wait_async_reduce(self) -> None:
if self.grad_reduce_work is not None:
self.grad_reduce_work.wait()
self.grad_reduce_work = None
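A sketch of the intended usage of this asynchronous contract (hypothetical driver code; the real call sites are in `GeminiDDP.grad_handle` below):

```python
# Hypothetical driver code: launch the reduce without blocking, overlap other
# work, then wait before reading the reduced shard.
def reduce_and_overlap(chunk, other_backward_work):
    chunk.reduce(async_op=True)   # returns immediately; handle kept in grad_reduce_work
    other_backward_work()         # overlap computation with communication
    chunk.wait_async_reduce()     # block until the reduction has completed
    return chunk.cuda_shard       # only now is the reduced shard safe to read
```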
def tensor_trans_state(self, tensor: torch.Tensor, tensor_state: TensorState) -> None:
"""
Make a transition of the tensor into the next state.

View File

@ -41,7 +41,7 @@ class ChunkManager:
self.reuse_fp16_chunk = reuse_fp16_chunk
# Whether model is accumulating gradients,
self.accumulating_grads = False
self.overflow_counter = 0
self.overflow_counter = torch.tensor([0], dtype=torch.int, device=get_accelerator().get_current_device())
def register_tensor(
self,
@ -144,12 +144,12 @@ class ChunkManager:
chunk = self.tensor_chunk_map[tensor]
chunk.tensor_trans_state(tensor, state)
def reduce_chunk(self, chunk: Chunk) -> bool:
def reduce_chunk(self, chunk: Chunk, async_op: bool = False) -> bool:
"""Reduce or all reduce the chunk."""
if not chunk.can_reduce:
return False
self.__sub_memory_usage(chunk.memory_usage)
chunk.reduce()
chunk.reduce(async_op=async_op)
self.__sub_accessed_chunk(chunk)
self.__add_memory_usage(chunk.memory_usage)
return True
@ -274,7 +274,7 @@ class ChunkManager:
return grad_chunk
def rearrange_accumulated_grad_chunk(self, chunk: Chunk) -> Chunk:
"""Rearrange gradients accumulated in chunk.grad_chunk, and getP prepared for gradient reduction."""
"""Rearrange gradients accumulated in chunk.grad_chunk, and get prepared for gradient reduction."""
assert chunk.grad_chunk is not None
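The switch of `overflow_counter` from a Python int to a device tensor avoids a host-device synchronization per chunk, since `has_inf_or_nan` now also stays on the GPU; a minimal sketch of the pattern (assuming a CUDA device):

```python
# Why overflow_counter is now a device tensor: the overflow check stays on the
# GPU, so accumulating it triggers no per-chunk host-device sync.
import torch

overflow_counter = torch.tensor([0], dtype=torch.int, device="cuda")
grad = torch.randn(1024, device="cuda")
has_inf_or_nan = torch.isinf(grad).any() | torch.isnan(grad).any()  # bool tensor on GPU
overflow_counter += has_inf_or_nan  # no .item() here, hence no sync per chunk
# Only the final overflow check pays the one-off synchronization cost:
print(overflow_counter.item() > 0)
```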

View File

@ -97,6 +97,7 @@ class GeminiDDP(ModelWrapper):
master_weights: bool = True,
extra_dp_group: Optional[ProcessGroup] = None,
verbose: bool = False,
enable_async_reduce: bool = True,
) -> None:
assert mixed_precision in (torch.float16, torch.bfloat16)
reuse_fp16_chunk = master_weights if not enable_gradient_accumulation else False
@ -180,6 +181,7 @@ class GeminiDDP(ModelWrapper):
if is_ddp_ignored(p):
continue
if p.requires_grad:
assert not hasattr(p, "_grad_handle")
p._grad_handle = p.register_hook(
partial(
GeminiDDP.grad_handle,
@ -189,6 +191,7 @@ class GeminiDDP(ModelWrapper):
master_weights=self.master_weights,
enable_gradient_accumulation=self.enable_gradient_accumulation,
p=p,
async_reduce=enable_async_reduce,
)
)
@ -336,6 +339,11 @@ class GeminiDDP(ModelWrapper):
setattr(param, "_gemini_reduced", False)
def _post_backward(self):
for param in self.param2name:
if hasattr(param, "_release_grad_chunk_cb"):
param._release_grad_chunk_cb()
delattr(param, "_release_grad_chunk_cb")
if self.chunk_manager.accessed_mem != 0:
error_params = ["Reduction failed at followed parameters:"]
for param in self.param2name:
@ -373,6 +381,7 @@ class GeminiDDP(ModelWrapper):
master_weights: bool,
enable_gradient_accumulation: bool,
p: nn.Parameter,
async_reduce: bool,
):
setattr(p, "_gemini_reduced", True)
empty_grad = torch.empty_like(grad)
@ -408,31 +417,57 @@ class GeminiDDP(ModelWrapper):
grad_chunk.copy_tensor_to_chunk_slice(p, grad, update_ptr=chunk_manager.reuse_fp16_chunk)
else:
grad_chunk.add_tensor_to_chunk_slice(p, grad)
reduced = chunk_manager.reduce_chunk(grad_chunk)
if reduced:
if not chunk_manager.reuse_fp16_chunk:
if chunk.keep_gathered:
chunk_manager.fake_release_chunk(chunk)
else:
chunk_manager.release_chunk(chunk)
if grad_chunk.is_gathered:
grad_chunk.cuda_global_chunk.div_(chunk.pg_size)
if chunk.extra_dp_group is not None:
grad_chunk.cuda_global_chunk.div_(chunk.extra_dp_size)
reduced = chunk_manager.reduce_chunk(grad_chunk, async_op=async_reduce)
if reduced:  # if not async, we can release immediately; otherwise release once the work has finished
if async_reduce:
# dirty fix: install a callback that releases the grad chunk once the async reduce completes
assert not hasattr(p, "_release_grad_chunk_cb")
def _release_grad_chunk_cb():
grad_chunk.wait_async_reduce()
GeminiDDP.release_grad_chunk_handle(
chunk_manager,
grads_device,
master_weights,
enable_gradient_accumulation,
p,
chunk,
grad_chunk,
)
p._release_grad_chunk_cb = _release_grad_chunk_cb
else:
grad_chunk.cuda_shard.div_(chunk.pg_size)
if chunk.extra_dp_group is not None:
grad_chunk.cuda_shard.div_(chunk.extra_dp_size)
# check overflow elements
chunk_manager.overflow_counter += grad_chunk.has_inf_or_nan
# record l2 norm for gradient clipping. flag is bound to fp16 chunk
if chunk.l2_norm_flag:
grad_chunk.set_l2_norm()
chunk_manager.move_chunk(grad_chunk, grads_device[p], force_copy=True)
if not (master_weights) or (enable_gradient_accumulation):
chunk_manager.move_chunk(chunk, grads_device[p], force_copy=True)
GeminiDDP.release_grad_chunk_handle(
chunk_manager, grads_device, master_weights, enable_gradient_accumulation, p, chunk, grad_chunk
)
return empty_grad
@staticmethod
def release_grad_chunk_handle(
chunk_manager, grads_device, master_weights, enable_gradient_accumulation, p, chunk, grad_chunk
):
if not chunk_manager.reuse_fp16_chunk:
if chunk.keep_gathered:
chunk_manager.fake_release_chunk(chunk)
else:
chunk_manager.release_chunk(chunk)
if grad_chunk.is_gathered:
grad_chunk.cuda_global_chunk.div_(chunk.pg_size)
if chunk.extra_dp_group is not None:
grad_chunk.cuda_global_chunk.div_(chunk.extra_dp_size)
else:
grad_chunk.cuda_shard.div_(chunk.pg_size)
if chunk.extra_dp_group is not None:
grad_chunk.cuda_shard.div_(chunk.extra_dp_size)
# check overflow elements
chunk_manager.overflow_counter += grad_chunk.has_inf_or_nan
# record l2 norm for gradient clipping. flag is bound to fp16 chunk
if chunk.l2_norm_flag:
grad_chunk.set_l2_norm()
chunk_manager.move_chunk(grad_chunk, grads_device[p], force_copy=True)
if not (master_weights) or (enable_gradient_accumulation):
chunk_manager.move_chunk(chunk, grads_device[p], force_copy=True)
def zero_grad(self, set_to_none: bool = False) -> None:
self.module.zero_grad(set_to_none=True)
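The deferred-release mechanism above can be reduced to a small skeleton (hypothetical names, mirroring the installation of `_release_grad_chunk_cb` in the grad hook and its drain in `_post_backward`):

```python
# Skeleton of the deferred-release pattern; names are illustrative only.
class AsyncChunkReleaser:
    def __init__(self):
        self._callbacks = []

    def install(self, work, release_fn):
        # Defer releasing/moving the grad chunk until the async reduce completes.
        def _cb():
            work.wait()
            release_fn()
        self._callbacks.append(_cb)

    def post_backward(self):
        for cb in self._callbacks:  # mirrors _post_backward draining the hooks
            cb()
        self._callbacks.clear()
```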

View File

@ -62,10 +62,10 @@ class GeminiFP16MixedPrecisionMixin(FP16MixedPrecisionMixin):
self.module = module
def check_local_overflow(self) -> bool:
return self.module.chunk_manager.overflow_counter > 0
return self.module.chunk_manager.overflow_counter.item() > 0
def pre_zero_grad(self) -> None:
self.module.chunk_manager.overflow_counter = 0
self.module.chunk_manager.overflow_counter.zero_()
class GeminiOptimizer(OptimizerWrapper):

View File

@ -20,7 +20,7 @@ class QuickstartUser(HttpUser):
self.client.post(
"/chat",
json={
"converation": [
"messages": [
{"role": "system", "content": "you are a helpful assistant"},
{"role": "user", "content": "what is 1+1?"},
],
@ -34,7 +34,7 @@ class QuickstartUser(HttpUser):
self.client.post(
"/chat",
json={
"converation": [
"messages": [
{"role": "system", "content": "you are a helpful assistant"},
{"role": "user", "content": "what is 1+1?"},
],
@ -42,6 +42,7 @@ class QuickstartUser(HttpUser):
},
)
# offline-generation only demonstrates the usage; it will never be used in actual serving.
@tag("offline-generation")
@task(5)
def generate_streaming(self):
@ -54,5 +55,5 @@ class QuickstartUser(HttpUser):
@tag("online-generation", "offline-generation")
@task
def get_models(self):
self.client.get("/models")
def health_check(self):
self.client.get("/ping")

View File

@ -78,6 +78,8 @@ def main():
parser.add_argument("--zero", type=int, default=0, help="Zero Stage when hybrid plugin is enabled")
parser.add_argument("--custom-ckpt", action="store_true", help="Customize checkpoint", default=False)
parser.add_argument("--profile", action="store_true", help="Enable profiling", default=False)
parser.add_argument("--disable-async-reduce", action="store_true", help="Customize checkpoint", default=False)
args = parser.parse_args()
colossalai.launch_from_torch()
@ -113,6 +115,7 @@ def main():
enable_fused_normalization=torch.cuda.is_available(),
enable_flash_attention=args.xformers,
max_prefetch=10,
enable_async_reduce=not args.disable_async_reduce,
)
elif args.plugin == "gemini_auto":
plugin = GeminiPlugin(

View File

@ -20,4 +20,6 @@ transformers==4.36.2
peft>=0.7.1
bitsandbytes>=0.39.0
rpyc==6.0.0
fastapi
uvicorn==0.29.0
galore_torch

View File

@ -34,7 +34,8 @@ def check_equal(param, param_cp):
@parameterize("init_device", [None, torch.device("cpu")])
@parameterize("keep_gathered", [True, False])
@parameterize("pin_memory", [True, False])
def exam_chunk_basic(init_device, keep_gathered, pin_memory):
@parameterize("async_op", [True, False])
def exam_chunk_basic(init_device, keep_gathered, pin_memory, async_op):
world_size = torch.distributed.get_world_size()
pg = _get_default_group()
my_chunk = Chunk(
@ -94,9 +95,12 @@ def exam_chunk_basic(init_device, keep_gathered, pin_memory):
assert my_chunk.tensor_state_cnter[TensorState.READY_FOR_REDUCE] == 4
assert my_chunk.can_reduce
my_chunk.reduce()
my_chunk.reduce(async_op)
assert my_chunk.tensor_state_cnter[TensorState.HOLD] == 4
if async_op:
my_chunk.wait_async_reduce()
if keep_gathered is False:
assert my_chunk.cuda_shard.size(0) == 1024 // world_size
assert my_chunk.device_type == "cuda"

View File

@ -41,6 +41,7 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module):
@parameterize("use_grad_checkpoint", [False, True])
@parameterize("master_weights", [False, True])
@parameterize("max_prefetch", [0, 1, 4])
@parameterize("enable_async_reduce", [False, True])
def exam_gpt_fwd_bwd(
placement_config,
keep_gather,
@ -48,6 +49,7 @@ def exam_gpt_fwd_bwd(
use_grad_checkpoint: bool = False,
master_weights: bool = True,
max_prefetch: int = 0,
enable_async_reduce=True,
):
init_device = get_accelerator().get_current_device()
model_builder, data_gen_fn, output_transform_fn, loss_fn, *_ = next(
@ -78,6 +80,7 @@ def exam_gpt_fwd_bwd(
**placement_config,
master_weights=master_weights,
max_prefetch=max_prefetch,
enable_async_reduce=enable_async_reduce,
)
optimizer = HybridAdam(model.parameters(), lr=1e-3)
zero_optim = GeminiOptimizer(optimizer, model, initial_scale=1)

View File

@ -51,6 +51,7 @@ def check_grad(model: GeminiDDP, torch_model: torch.nn.Module):
@parameterize("master_weights", [False, True])
@parameterize("use_grad_checkpoint", [False, True])
@parameterize("max_prefetch", [0, 1, 4])
@parameterize("enable_async_reduce", [False, True])
def exam_gemini_grad_acc(
placement_config,
keep_gathered: bool,
@ -58,6 +59,7 @@ def exam_gemini_grad_acc(
master_weights: bool,
use_grad_checkpoint: bool,
max_prefetch: int,
enable_async_reduce: bool,
):
init_device = get_accelerator().get_current_device()
model_builder, data_gen_fn, output_transform_fn, loss_fn, *_ = next(
@ -88,10 +90,13 @@ def exam_gemini_grad_acc(
enable_gradient_accumulation=True,
master_weights=master_weights,
max_prefetch=max_prefetch,
enable_async_reduce=enable_async_reduce,
**placement_config,
)
optimizer = HybridAdam(gemini_model.parameters(), lr=1e-3)
gemini_optim = GeminiOptimizer(optimizer, gemini_model, initial_scale=1, max_norm=1.0)
gemini_optim = GeminiOptimizer(
optimizer, gemini_model, initial_scale=1, max_norm=1.0, enable_async_reduce=enable_async_reduce
)
rank = dist.get_rank()

View File

@ -53,7 +53,10 @@ def check_param(model: GeminiDDP, torch_model: torch.nn.Module):
@parameterize("model_name", ["transformers_gpt_lm"])
@parameterize("master_weights", [True, False])
@parameterize("max_prefetch", [0, 1, 4])
def exam_grad_clipping(placement_config, model_name: str, master_weights: bool, max_prefetch: int):
@parameterize("enable_async_reduce", [False, True])
def exam_grad_clipping(
placement_config, model_name: str, master_weights: bool, max_prefetch: int, enable_async_reduce: bool
):
set_seed(1912)
model_builder, data_gen_fn, output_transform_fn, loss_fn, *_ = next(
iter(model_zoo.get_sub_registry(model_name).values())
@ -86,6 +89,7 @@ def exam_grad_clipping(placement_config, model_name: str, master_weights: bool,
pin_memory=True,
master_weights=master_weights,
max_prefetch=max_prefetch,
enable_async_reduce=enable_async_reduce,
**placement_config,
)

View File

@ -72,8 +72,14 @@ def check_param(model: GeminiDDP, torch_model: torch.nn.Module, dtype: torch.dty
@parameterize("mixed_precision", [torch.half, torch.bfloat16])
@parameterize("master_weights", [True, False])
@parameterize("max_prefetch", [0, 1, 4])
@parameterize("enable_async_reduce", [False, True])
def exam_model_step(
placement_config, model_name: str, mixed_precision: torch.dtype, master_weights: bool, max_prefetch: int
placement_config,
model_name: str,
mixed_precision: torch.dtype,
master_weights: bool,
max_prefetch: int,
enable_async_reduce=True,
):
set_seed(42)
model_builder, data_gen_fn, output_transform_fn, loss_fn, *_ = next(
@ -103,6 +109,7 @@ def exam_model_step(
mixed_precision=mixed_precision,
master_weights=master_weights,
max_prefetch=max_prefetch,
enable_async_reduce=enable_async_reduce,
)
optimizer = HybridAdam(model.parameters(), lr=1e-3)