Title here
Summary here
特点:
| 特性 | TP | PP |
|---|---|---|
| 切分方式 | 层内 | 层间 |
| 通信模式 | AllReduce | Point-to-Point |
| 内存效率 | 需同步所有 GPU | 可流水线 |
| 适用规模 | 单机 | 跨机 |
问题:GPU 大量空闲等待。
# PP=2
python -m sglang.launch_server \
--model meta-llama/Llama-3.1-70B-Instruct \
--pipeline-parallel-size 2
# TP=2, PP=2
python -m sglang.launch_server \
--model meta-llama/Llama-3.1-70B-Instruct \
--tensor-parallel-size 2 \
--pipeline-parallel-size 2def init_pipeline_parallel(self):
"""初始化流水线并行"""
# 创建 PP 通信组
self.pp_group = create_pipeline_parallel_group(self.pp_size)
# 计算本 rank 负责的层
layers_per_rank = self.num_layers // self.pp_size
self.layer_start = self.pp_rank * layers_per_rank
self.layer_end = self.layer_start + layers_per_rank
# 是否为首/尾 rank
self.is_first_rank = (self.pp_rank == 0)
self.is_last_rank = (self.pp_rank == self.pp_size - 1)def forward_pp(self, input_ids, positions, forward_batch):
"""流水线并行前向"""
if self.is_first_rank:
# 首 rank:处理输入
hidden = self.embed_tokens(input_ids)
else:
# 非首 rank:接收上一 rank 的激活
hidden = recv_tensor(src=self.pp_rank - 1, group=self.pp_group)
# 执行本 rank 的层
for layer_idx in range(self.layer_start, self.layer_end):
hidden = self.layers[layer_idx](hidden, positions, forward_batch)
if self.is_last_rank:
# 尾 rank:计算输出
logits = self.lm_head(hidden)
return logits
else:
# 非尾 rank:发送到下一 rank
send_tensor(hidden, dst=self.pp_rank + 1, group=self.pp_group)
return None推理场景下,主要关注前向调度:
def get_optimal_microbatch_size(batch_size, pp_size):
"""计算最优微批次大小"""
# 微批次数 >= PP size 可减少气泡
num_microbatches = max(pp_size, batch_size // optimal_size)
# 平衡通信和计算
microbatch_size = batch_size // num_microbatches
return microbatch_sizedef send_tensor(tensor, dst, group):
"""发送张量"""
# 发送形状
shape = torch.tensor(tensor.shape, device=tensor.device)
dist.send(shape, dst=dst, group=group)
# 发送数据
dist.send(tensor.contiguous(), dst=dst, group=group)
def recv_tensor(src, group, dtype, device):
"""接收张量"""
# 接收形状
shape = torch.zeros(4, dtype=torch.long, device=device)
dist.recv(shape, src=src, group=group)
# 接收数据
tensor = torch.zeros(*shape.tolist(), dtype=dtype, device=device)
dist.recv(tensor, src=src, group=group)
return tensorasync def forward_pp_async(self, batches):
"""异步流水线前向"""
pending_recv = None
pending_send = None
for batch in batches:
# 等待上一个接收
if pending_recv:
hidden = await pending_recv
# 异步接收下一个
if not self.is_first_rank:
pending_recv = async_recv(src=self.pp_rank - 1)
# 计算
hidden = self.forward_layers(hidden)
# 等待上一个发送
if pending_send:
await pending_send
# 异步发送
if not self.is_last_rank:
pending_send = async_send(hidden, dst=self.pp_rank + 1)# 4 GPU: TP=2, PP=2
# GPU 0,1 -> Stage 0
# GPU 2,3 -> Stage 1
python -m sglang.launch_server \
--model meta-llama/Llama-3.1-70B-Instruct \
--tensor-parallel-size 2 \
--pipeline-parallel-size 2气泡比例 = (PP_Size - 1) / (PP_Size - 1 + Num_Microbatches)
例: PP=4, 微批次=8
气泡比例 = 3 / 11 ≈ 27%
例: PP=4, 微批次=16
气泡比例 = 3 / 19 ≈ 16%
每 Stage 内存 = 模型内存 / PP_Size + 激活内存
激活内存 = 微批次数 × 激活大小
| 场景 | 推荐配置 |
|---|---|
| 单机多卡 | 优先 TP |
| 跨机部署 | TP 机内, PP 跨机 |
| 超大模型 | TP + PP 组合 |
# 70B on 8 GPU
# TP=4 (NVLink 连接), PP=2
python -m sglang.launch_server \
--model meta-llama/Llama-3.1-70B-Instruct \
--tensor-parallel-size 4 \
--pipeline-parallel-size 2
# 405B on 16 GPU (2 nodes)
# 每节点 TP=8, 跨节点 PP=2
python -m sglang.launch_server \
--model meta-llama/Llama-3.1-405B-Instruct \
--tensor-parallel-size 8 \
--pipeline-parallel-size 2| 特性 | TP | PP |
|---|---|---|
| 通信开销 | 高(AllReduce) | 低(P2P) |
| 气泡 | 无 | 有(可优化) |
| 扩展性 | 受限于 NVLink | 可跨机 |
在下一章《专家并行》中,我们将: