跳转到主要内容

Documentation Index

Fetch the complete documentation index at: https://ahvn.top/llms.txt

Use this file to discover all available pages before exploring further.

函数工具处理那些容易写散的运行时动作:可复现随机数据、有界并行、以及带明确输出契约的命令执行。
Callable 持久化和 function-schema 元数据会放到 Capsule 与 prompt 能力中设计;它们还不是当前通用 utility API 的一部分。

1. 用 StableRNG 生成可复现数据

测试、demo 和 benchmark 需要 mock data 时使用 StableRNG。相同 seed 和 step 路径会生成相同数据。
from heavenbase.utils import StableRNG

rng = StableRNG(seed=42).step("bench", "rows")
ids = rng.choice(["p1", "p2", "p3", "p4"], k=2, replace=False)
vec = rng.rnd_vec(dim=4)
step(...) 从同一个 seed 派生独立子随机流,并且不会修改父 generator:
base = StableRNG(seed=42)
rows_rng = base.step("rows")
sample_rng = base.step("samples")
HeavenBase benchmark 默认使用 StableRNG(seed=42),因此生成的行、样本和向量都可以复现。

2. 有界并行

需要并发执行但保持结果顺序时使用 pmap
from heavenbase.utils import pmap

tasks = [{"row": row} for row in rows]
scores = pmap(lambda row: row["score"] * 2, tasks, max_workers=4)
并行 helper 会把每个 item 当成关键字参数传给函数,也就是 fn(**item)。因此 batch item 应该是 dict-like mapping;函数本身可以保留清晰的参数签名。 副作用任务使用 pforeach。调试或测试时可以设 max_workers=1,保留同样的调用形状但走串行路径。max_workers=None 会读取 heavenbase.parallel.max_workers;如果配置未设置,HeavenBase 使用基于 CPU 的有界默认值,并受 heavenbase.parallel.fallback_max_workers 限制。 需要逐项状态、异常、耗时或进度回调时使用 batch。它返回 TaskResult,包含 index, item, ok, value, error, elapsed_ms
from heavenbase.utils import batch

results = batch(lambda x: 10 / x, [{"x": 2}, {"x": 0}, {"x": 5}], max_workers=2, on_error="capture")
assert [result.ok for result in results] == [True, False, True]
assert results[1].error is not None
batch_stream 会在任务完成时流式产出 TaskResult。默认 ordered=False,也就是完成顺序;如果消费者需要输入顺序,传 ordered=True。有序流仍然并发执行,只是较晚的 index 可能会等前面的 index 就绪后再产出。
from heavenbase.utils import batch_stream

for result in batch_stream(work, [{"payload": item} for item in items], max_workers=8, ordered=False):
    if result.ok:
        handle(result.value)
错误策略是显式的:
  • on_error="capture":失败任务保留为 TaskResult(ok=False, error=...)
  • on_error="raise":遇到第一个已观察到的错误就抛出,并停止提交新任务;已经在线程池里运行的任务会在 executor 清理期间自然结束。
  • on_error="skip":输出中省略失败任务。
异步代码使用同样契约:abatchabatch_stream。异步 helper 可以直接接收 async callable;同步 callable 会通过 event loop executor 执行。
from heavenbase.utils import abatch

async def enrich(row):
    return {**row, "score": row["score"] * 2}

results = await abatch(enrich, [{"row": row} for row in rows], max_workers=8, ordered=True)
payloads = [result.value for result in results if result.ok]

3. 命令执行

脚本需要调用本地命令并检查 stdout、stderr 和退出码时使用 cmd
from heavenbase.utils import cmd

result = cmd(["python", "--version"])
if not result.ok:
    raise RuntimeError(result.err)
print(result.out)
如果只需要具体字段,可以使用 include=["stdout", "stderr", "returncode"] 返回字典;单个 include="out" 只返回该字段。新代码优先使用 CmdResult 对象。

Further Exploration

相关资源:
  • 通用工具 - 命令失败时的日志和 debug helper。
  • 工具概览 - 选择合适的 utils 分组。