import enum from dataclasses import dataclass from typing import Any, List from colossalai.logging import get_dist_logger logger = get_dist_logger(__name__) """ The abstraction of request and sequence are defined here. """ class RequestStatus(enum.Enum): """ The status of Sentences """ # running status WAITING = enum.auto() RUNNING = enum.auto() ABORTED = enum.auto() # completion status OVERLENGTH = enum.auto() COMPLETED = enum.auto() LENGTH_CAPPED = enum.auto() # recycle status RECYCLED = enum.auto() @staticmethod def is_finished(status: "RequestStatus") -> bool: return status in [ RequestStatus.OVERLENGTH, RequestStatus.COMPLETED, RequestStatus.LENGTH_CAPPED, ] @staticmethod def is_running(status: "RequestStatus") -> bool: return status == RequestStatus.RUNNING @staticmethod def is_waiting(status: "RequestStatus") -> bool: return status == RequestStatus.WAITING @dataclass class Sequence: """Store information of input sequence. Args: request_id (int): The ID of input sequence. prompt (str): The prompt of input sequence. input_token_id (List[int]): The tokens ID of input sequence. block_size (int): The block size of input sequence. sample_params (SampleParams): The sample_params of input sequence. block_table (torch.Tensor): The index of input sequence in block_table. eos_token_id (int): The eos token id for this inference process. pad_token_id (int): The pad token id for this inference process. max_output_len (int): Maximum output length. ignore_eos(bool): Whether to ignore the EOS token and continue generating tokens when encountering the EOS token. output(str): The output of sequence """ request_id: int prompt: str input_token_id: List[int] block_size: int sample_params: Any # SampleParams needs to be imported later. eos_token_id: int pad_token_id: int max_output_len: int = 256 # NOTE(caidi) This is a temporary solution. It's better to move the logic to turn on or off the flag in sampling module in future. ignore_eos: bool = False output: str = None def __post_init__(self): self.output_token_id = [] self.status = RequestStatus.WAITING @property def sentence_len(self) -> int: """ Get length of current sentence. """ return len(self.input_token_id) + len(self.output_token_id) @property def input_len(self) -> int: """ Get length of input sentence. """ return len(self.input_token_id) @property def output_len(self) -> int: """ Get length of output sentence. """ return len(self.output_token_id) def check_finish(self) -> bool: """ Check whether the inference is finished. Returns: bool: Whether the inference is finished. """ if RequestStatus.is_finished(self.status): return True if self.output_token_id: if ( self.output_token_id[-1] == self.eos_token_id and not self.ignore_eos ) or self.output_len >= self.max_output_len: self.status = RequestStatus.COMPLETED return True return False def revoke_finished_status(self) -> None: """ Revoke the finished status of the sequence. This is only used by speculative decoding for now. """ if RequestStatus.is_finished(self.status): self.status = RequestStatus.RUNNING def __hash__(self): return hash(self.request_id) def mark_running(self) -> None: """ Set status for prefill reqs. """ assert ( self.status == RequestStatus.WAITING or RequestStatus.RECYCLED ), "Sequence is not in WAITTING/RECYCLED STATUS" self.status = RequestStatus.RUNNING def mark_finished(self) -> None: """ Set status for finished reqs. """ self.status = RequestStatus.COMPLETED def mark_aborted(self) -> None: """ Set status for aborted reqs. """ self.status = RequestStatus.ABORTED def recycle(self) -> None: """ Recycle a running sequnce to waiitting list """ assert ( not self.check_finish() and not self.status == RequestStatus.ABORTED ), "The running sequence \ is already done but it still in running list" self.status = RequestStatus.RECYCLED def __repr__(self) -> str: return ( f"(request_id={self.request_id}, " f"prompt={self.prompt},\n" f"output_token_id={self.output_token_id},\n" f"output={self.output},\n" f"status={self.status.name},\n" f"sample_params={self.sample_params},\n" f"input_len={self.input_len},\n" f"output_len={self.output_len})\n" ) def _pad_to_max(x: List[int], max_len: int, pad: int) -> List[int]: assert len(x) <= max_len return [pad] * (max_len - len(x)) + x