Show HN: Aiopandas – 为 Pandas 提供异步 .apply() 和 .map(),加速 API/LLMs 调用

🚀 Async-Powered Pandas:轻量级的 Pandas monkey-patch,为 mapapplyapplymapaggregatetransform 添加了异步支持,从而能够无缝处理具有可控并行执行 (max_parallel) 的异步函数。

✨ 特性

🚀 快速开始

import aiopandas as pd # 使用 async 方法 monkey-patch Pandas
import asyncio

# 创建一个示例 DataFrame
df = pd.DataFrame({'x': range(10)})

# 定义一个 async 函数 (模拟 API 调用,I/O 等)
async def f(x):
  await asyncio.sleep(0.1 * x) # 模拟 async 处理
  return x * 2 # 示例转换

# 将 async 函数应用于 DataFrame 列
df['y'] = await df.x.amap(f, max_parallel=5) # 默认 max_parallel=16
print(df)

⚠️ 优雅地处理错误

aiopandas 包含内置的错误处理,允许你管理失败,而不会中断整个操作。

  1. 默认行为 (raise) – 在第一个错误时停止
async def f(x):
  if x > 50 and x % 3:
    raise Exception('exception example')
  await asyncio.sleep(0.1 * x)
  return x

df['y'] = await df.x.amap(f, max_parallel=100) # 引发一个 exception

输出 (Error traceback):

Exception: exception example
  1. 忽略错误 (on_error='ignore')
df['y'] = await df.x.amap(f, max_parallel=100, on_error='ignore') # 轻松忽略 exception

现在,触发 exception 的行将返回 NaN,而不是崩溃:

print(df['y'])
0   0.0
1   1.0
2   2.0
...
95   NaN
96  96.0
97   NaN
98   NaN
99  99.0
Name: y, Length: 100, dtype: float64
  1. 自定义错误处理 (on_error=print)

你可以使用自定义函数 (或协程) 记录或处理错误:

df['y'] = await df.x.amap(f, max_parallel=100, on_error=print) # 打印错误而不是失败

输出:

exception example
exception example
exception example

📊 使用 tqdm 跟踪进度

要可视化进度,请将 tqdm 作为参数传递:

from tqdm import tqdm
df['y'] = await df.x.amap(f, max_parallel=5, tqdm=tqdm)

示例输出:

 69%|█████████████████████████████████████████████████████        | 69/100 [00:06<00:03, 9.99it/s]

🎯 为什么使用 aiopandas?

📦 安装

pip install aiopandas

或者,如果手动使用:

git clone https://github.com/your-repo/aiopandas.git
cd aiopandas
pip install .

💡 贡献

欢迎提交 Pull Request! 如果你发现问题或有建议,请随时提出 issue。 🚀

🙌 鸣谢

aiopandas 中的 monkey patching 在很大程度上受到 tqdm.pandas() 方法的启发(基本上是复制粘贴的)并进行了调整。 特别感谢 tqdm 维护人员在将进度条与 Pandas 集成方面所做的出色工作。