1. 今日目标(same with day5)
以一条请求的生命周期为切入点,找到经典设计的代码入口。行业共识主要是三个设计:
- Continuous batching(连续批处理)
- KV cache(以存代算)
- Memory-aware Scheduling(内存感知调度)
根据 Day5 概述,今日先学习这两步:
HTTP Request(OpenAI Protocol)│ ▼ ① API Layer(FastAPI+OpenAI Serving)│ ▼ ② AsyncLLM Engine(Request Enqueuing)2. Phase1:进入 entrypoints
Entrypoints 这个文件夹下,其实就是 API Layer (请求入口) ,它的目的是将外部世界的请求或者命令等转换成内部 engine 调用。这一层会对接不同的服务,比如 openai、anthropic 等。
2.1 entrypoints/openai:兼容 openai 的核心文件:
- vllm/entrypoints/openai/api_server.py - FastAPI 应用入口
- vllm/entrypoints/openai/chat_completion/serving.py - OpenAIServingChat 类,处理 openai 格式的请求
- vllm/entrypoints/openai/chat_completion/api_router.py - 路由处理
什么是 openAI 协议,其实就是 HTTP + JSON,请求格式长得像 OpenAI API。某种程度上就是 openAI 的影响力定义了规则。
形如这种:
POST /v1/chat/completions { "model": "Qwen/Qwen2.5-3B-Instruct", "messages": [ {"role": "user", "content": "你好"} ] }2.2 兼容 Anthropic-style API
vllm/vllm/entrypoints/anthropic2.3 entrypoints/cli
命令行入口。像下面这些命令:
vllm serve vllm bench serve vllm bench throughput // 前面看的跑 benchmark vllm chat vllm complete vllm run-batch都会从 entrypoints/cli 这里进。
2.4 处理流程
以 openai 为例的处理流程:
- HTTP Request (POST /v1/chat/completions)
↓ - api_server.py: build_app() 创建 FastAPI 应用
↓ - api_router.py: create_chat_completion()
↓ - serving.py: openai_serving.render_chat() → Engine Inputs
↓ - engine_client.generate(prompt, sampling_params) → 异步提交请求
可以看下 create_chat_completion 代码:
@with_cancellation @load_aware_call async def create_chat_completion(request: ChatCompletionRequest, raw_request: Request): metrics_header_format = raw_request.headers.get( ENDPOINT_LOAD_METRICS_FORMAT_HEADER_LABEL, "" ) handler = chat(raw_request) if handler is None: raise NotImplementedError("The model does not support Chat Completions API") generator = await handler.create_chat_completion(request, raw_request) // 这里 handler 就是一个 OpenAIServingChat 对象 if isinstance(generator, ErrorResponse): return JSONResponse( content=generator.model_dump(), status_code=generator.error.code ) elif isinstance(generator, ChatCompletionResponse): return JSONResponse( content=generator.model_dump(), headers=metrics_header(metrics_header_format), ) return StreamingResponse(content=generator, media_type="text/event-stream")具体到 openai 的 create_chat_completion 实现:
把 openai 格式的输入处理成 vllm engine 识别的格式代码路径:
result = await self.render_chat_request(request) 对 conversation, engine_inputs = result 进行枚举,然后交给 engine: generator = self.engine_client.generate(engine_inputs, XX) // 这里返回的是一个异步生成器,后面流式/非流式都围绕着他消费。遗留问题:
1、注意到 create_chat_completion 函数里面有一个 logger,可以后续进行观察传递给 engine 的参数:
self._log_inputs( sub_request_id, engine_input, // 模型输入,已经由 chat messages 渲染而来 params=sampling_params, // 控制生成行为,比如 temperature、top_p、max_tokens、stop 等。 lora_request=lora_request, )2、观察 # stream/full generator 内部:看首 token、总生成时间
if request.stream: return self.chat_completion_stream_generator(3. Phase2: 进入 AsyncLLM Engine
接上面的例子,engine_client 就是一个 class EngineClient 类型。EngineClient(ABC) 只是个抽象基类,实际功能实现 class AsyncLLM(EngineClient)。这一步就是把请求放进输入队列中,等待 EngineCore(scheduler) 主循环取走,scheduler 的核心设计就是决定如何取。
3.1 基础流程
代码路径(这里做了版本管理):
- vllm/engine
- vllm-main/vllm/v1/engine
核心文件: /data1/lixizhang/AI/vllm-main/vllm/v1/engine/async_llm.py
class AsyncLLM: async def add_request( self, request_id: str, prompt_token_ids: list[int], params: SamplingParams, ) -> None: # 1. InputProcessor 处理输入 # 2. 添加到 EngineCoreRequest # 3. 通过 IPC 发送到 EngineCore async def generate( self, request: Union[str, list[int], PromptType], sampling_params: SamplingParams = None, request_id: str = None, multi_modal_data: MultiModalData = None, ) -> AsyncIterator[RequestOutput]: # 流式输出的生成器generate 函数核心:
1、input_processor:把外部请求加工成 EngineCoreRequest,输出 EngineCoreRequest 2、queue / client:把 EngineCoreRequest 发送到 EngineCore 进程 3、scheduler loop:EngineCore 取请求,scheduler.schedule() 组 batch 4、output_processor:把 EngineCoreOutputs 还原成 RequestOutput / streaming 输出,每个请求有自己的输出队列/stream注意这代码里面有个控制逻辑:is_pooling 这个是算向量,一次前项传播就结束了,处理更简单。我们关注生成模型。另外还有个控制关于生成多个回答的,用的一个 parent 收集多 child 的输出,每个 child 的输出都会被加入到 queue 中,也都会被作为输入加入到引擎中。
# Add the EngineCoreRequest to EngineCore (separate process). await self.engine_core.add_request_async(request)这里的 engine_core,在 engine/core_client.py 下
# EngineCore (starts the engine in background process). self.engine_core = EngineCoreClient.make_async_mp_client( vllm_config=vllm_config, executor_class=executor_class, log_stats=self.log_stats, client_addresses=client_addresses, client_count=client_count, client_index=client_index, )把 EngineCoreRequest 发送到 EngineCore 走的是 IPC(Inter-process Communication,进程间通信)。API 层和 EngineCore(调度+推理)是两个进程。为什么要区分成两个进程主要有这些原因:
1)稳定性:如果推理崩了:只挂 EngineCore,API server 还能活
2)并发 & 调度:EngineCore 专门做 batching、scheduling、KV cache 管理
3)多 GPU / 多节点:EngineCore 可以:分布式、多 worker
3.2 IPC 模块
常见的 IPC 有:
- multiprocessing Queue
- shared memory
- pipe
- socket
vllm 选择的是 zero message queue(ZMQ)+ 序列化(msgpack),比较适合这种流式的处理,因为 EngineCore 是一边算一边把 token 又推回来的。
3.3 并行 EngineCore
这里其实有三个类:
- AsyncMPClient (所有 api 的请求就对接到同一个 EngineCore)
- DPAsyncMPClient (Data Parallel EngineCore, 外部决定发给哪个 EngineCore)
chosen_engine = self.get_core_engine_for_request(request) - DPLBAsyncMPClient (LB:load balancing,内部自己决定发送给哪个 core)
def make_async_mp_client( vllm_config: VllmConfig, executor_class: type[Executor], log_stats: bool, client_addresses: dict[str, str] | None = None, client_count: int = 1, client_index: int = 0, ) -> "AsyncMPClient": parallel_config = vllm_config.parallel_config client_args = ( vllm_config, executor_class, log_stats, client_addresses, client_count, client_index, ) if parallel_config.data_parallel_size > 1: if parallel_config.data_parallel_external_lb: # External load balancer - client per DP rank. return DPAsyncMPClient(*client_args) # Internal load balancer - client balances to all DP ranks. return DPLBAsyncMPClient(*client_args) return AsyncMPClient(*client_args)我的疑问是,为什么要起多个 EngineCore 进程:
ChatGpt:Data Parallel:模型复制 N 份,每个 GPU 一份。每个 engine 独立的 GPU,独立的 KV-cache,独立的 scheduler不会相互影响,一个 engine 崩了其他不会蹦,有容错。这里先持保留意见,感觉大有学问,后续再来研究,Mark。
如何配置:
1、 启动参数可以指定
python -m vllm.entrypoints.openai.api_server \ --model xxx \ --tensor-parallel-size 1 \ --pipeline-parallel-size 1 \ --data-parallel-size 2data-parallel-size = 2
就会起 2 个 EngineCore
2、默认
–data-parallel-size = GPU 数量
3.4 load balance 设计
DPLBAsyncMPClient 具体是怎么算“最空闲”的(源码级分析),先略。