Title here
Summary here
阅读时间: 约 15 分钟
适用人群: 需要调试存储相关问题的开发者
本文提供存储层的调试入口点、常用技巧和常见问题解决方案。
存储层调试入口
==============
1. 存储工厂创建
└── ucm/store/factory.py:50
UcmConnectorFactory.create_connector()
2. Pipeline 存储初始化
└── ucm/store/pipeline/connector.py:56
UcmPipelineStore.__init__()
3. Block 查找
└── ucm/store/ucmstore.py:72
UcmKVStoreBase.lookup()
4. KV 加载
└── ucm/store/ucmstore.py:95
UcmKVStoreBase.load()
5. KV 保存
└── ucm/store/ucmstore.py:115
UcmKVStoreBase.dump()
6. 任务等待
└── ucm/store/ucmstore.py:135
UcmKVStoreBase.wait()
7. POSIX 后端
└── ucm/store/posix/connector.py:30
UcmPosixStore.__init__()
8. Cache 后端
└── ucm/store/cache/connector.py:25
UcmCacheStore.__init__()
export UNIFIED_CACHE_LOG_LEVEL=DEBUG
python your_script.py

[UCM.Store] # 存储层通用日志
[UCM.Posix] # POSIX 后端日志
[UCM.Cache] # Cache 后端日志
[UCM.Pipeline] # Pipeline 日志
[UCM.Factory] # 工厂日志

[UCM.Factory] Creating connector: UcmPipelineStore
[UCM.Pipeline] Pipeline stages: ['Cache', 'Posix']
[UCM.Cache] Buffer pool initialized: 1024 buffers
[UCM.Posix] Storage path: /tmp/ucm_cache
[UCM.Store] lookup: 32 blocks, hit: 28
[UCM.Store] load: task_id=1, blocks=4
[UCM.Store] dump: task_id=2, blocks=32
from ucm.store.factory import UcmConnectorFactory
print("Registered backends:")
for name in UcmConnectorFactory._registry:
print(f" - {name}")
store = UcmConnectorFactory.create_connector(
"UcmPipelineStore",
config
)
print(f"Store type: {type(store)}")
print(f"CC store ptr: {store.cc_store()}")

import hashlib
# 生成测试 Block ID
test_block_ids = [
hashlib.md5(f"test_block_{i}".encode()).digest()
for i in range(10)
]
# 测试 lookup
results = store.lookup(test_block_ids)
print(f"Lookup results: {results}")
print(f"Hit count: {sum(results)}")
print(f"Miss count: {len(results) - sum(results)}")

import torch
# 准备测试张量
block_size = 16
num_layers = 32
head_dim = 128
num_heads = 32
# 模拟 KV Cache 张量
kv_tensor = torch.zeros(
(num_layers * 2, block_size, num_heads, head_dim),
dtype=torch.float16,
device='cuda'
)
block_ids = [hashlib.md5(f"block_{i}".encode()).digest() for i in range(1)]
task = store.dump(block_ids, offset=0, src_tensor=kv_tensor)
print(f"Dump task: {task}")
status = store.wait(task)
print(f"Dump status: {status}")
# 测试 load
load_tensor = torch.zeros_like(kv_tensor)
task = store.load(block_ids, offset=0, dst_tensor=load_tensor)
status = store.wait(task)
print(f"Load status: {status}")
# 验证数据
if torch.allclose(kv_tensor, load_tensor):
print("Data verification: PASSED")
else:
print("Data verification: FAILED")

# ucm/store/factory.py
def create_connector(cls, connector_name: str, config: dict):
# 断点 1: 检查 connector_name 和 config
if connector_name not in cls._registry:
raise ValueError(f"Unknown connector: {connector_name}")
# 断点 2: 检查类加载
connector_cls = cls._get_connector_class(connector_name)
# 断点 3: 检查实例化
return connector_cls(config)

# ucm/store/pipeline/connector.py
def __init__(self, config: dict):
# 断点 1: 解析 pipeline 配置
pipeline_spec = config.get("store_pipeline", "Cache|Posix")
# 断点 2: 检查各阶段创建
for stage_name in pipeline_spec.split("|"):
stage = self._create_stage(stage_name, config)
self.stages.append(stage)

# ucm/store/ucmstore.py
def load(self, block_ids, offset, dst_tensor):
# 断点 1: 检查参数
print(f"block_ids: {len(block_ids)}")
print(f"offset: {offset}")
print(f"tensor shape: {dst_tensor.shape}")
print(f"tensor device: {dst_tensor.device}")
# 断点 2: 检查任务创建
task = self._create_load_task(block_ids, offset, dst_tensor)
return task

症状:
ValueError: Unknown connector: UcmPipelineStore
排查步骤:
from ucm.store.factory import UcmConnectorFactory
print(list(UcmConnectorFactory._registry.keys()))
import ucm.store.pipeline.connector # 触发注册

症状:
排查步骤:
print(f"Storage path: {store.storage_path}")
import os
print(f"Path exists: {os.path.exists(store.storage_path)}")
import subprocess
result = subprocess.run(['ls', '-la', store.storage_path], capture_output=True)
print(result.stdout.decode())
# 相同的 token 序列应该产生相同的哈希

症状:
wait(task) 长时间不返回
排查步骤:
import time
task = store.load(block_ids, offset, tensor)
for i in range(100):
status, done = store.check(task)
print(f"Check {i}: status={status}, done={done}")
if done:
break
time.sleep(0.1)
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"Current device: {torch.cuda.current_device()}")
# 查看 ucm/shared/trans/ 相关日志

症状:
排查步骤:
original = torch.randn(100, dtype=torch.float16, device='cuda')
block_id = hashlib.md5(b"test").digest()
task = store.dump([block_id], 0, original)
store.wait(task)
store.commit([block_id], [True])
loaded = torch.zeros_like(original)
task = store.load([block_id], 0, loaded)
store.wait(task)
diff = (original - loaded).abs().max()
print(f"Max diff: {diff}")
if diff > 1e-5:
print("WARNING: Data mismatch!")
print(f"Original ptr: {original.data_ptr()}")
print(f"Loaded ptr: {loaded.data_ptr()}")
print(f"Aligned: {original.data_ptr() % 512 == 0}")

import time
block_ids = generate_block_ids(1000)
start = time.time()
for _ in range(100):
results = store.lookup(block_ids)
elapsed = time.time() - start
print(f"Lookup 1000 blocks x 100 times: {elapsed:.3f}s")
print(f"Average per lookup: {elapsed / 100 * 1000:.2f}ms")

import time
import torch
# 准备大块数据
size_mb = 100
data = torch.randn(size_mb * 1024 * 1024 // 2, dtype=torch.float16, device='cuda')
block_ids = [hashlib.md5(f"block_{i}".encode()).digest() for i in range(100)]
start = time.time()
task = store.dump(block_ids, 0, data)
store.wait(task)
elapsed = time.time() - start
speed = size_mb / elapsed
print(f"Dump speed: {speed:.2f} MB/s")
loaded = torch.zeros_like(data)
start = time.time()
task = store.load(block_ids, 0, loaded)
store.wait(task)
elapsed = time.time() - start
speed = size_mb / elapsed
print(f"Load speed: {speed:.2f} MB/s")