AI AgentTechnical Deep Dive
Token 与成本优化:省钱技巧大全
发布时间2025/12/27
分类AI Agent
预计阅读8 分钟
作者吴长龙
*
LLM API 调用的成本主要取决于 Token 消耗。本文系统介绍 Token 的概念、计费方式以及从 Prompt 设计到架构层面的全面优化策略。
01.内容
# Token 与成本优化:省钱技巧大全
对于生产环境的 AI Agent 来说,API 成本是一个不可忽视的问题。一个每天处理上万次请求的 Agent,如果每个请求能节省 50% 的 Token,一年下来节省的成本可能是数万甚至数十万。
本文将从 Token 的基本概念出发,系统介绍成本优化的全面策略。
02.1. 理解 Token
1.1 什么是 Token?
Token 是 LLM 处理文本的基本单位。对于英文,1 Token ≈ 0.75 个单词;对于中文,1 Token ≈ 1-2 个字符。
python snippetpython
# 示例
"Hello, world!" → ["Hello", ", world", "!"] → 3 tokens
"你好世界" → ["你好", "世界"] → 2 tokens1.2 Token 如何计费?
API 费用通常按输入 Token 和输出 Token 分别计费:
code snippetcode
总费用 = 输入Token数 × 输入单价 + 输出Token数 × 输出单价不同模型的价格差异巨大:
| 模型 | 输入价格 | 输出价格 |
|---|---|---|
| GPT-4o | $2.50/1M | $10.00/1M |
| GPT-4o mini | $0.15/1M | $0.60/1M |
| Claude 4 Sonnet | $3.00/1M | $15.00/1M |
| Gemini 2.0 Flash | 免费 | 免费 |
1.3 Token 数量估算
python snippetpython
import tiktoken
def count_tokens(text, model="gpt-4"):
encoder = tiktoken.encoding_for_model(model)
return len(encoder.encode(text))
# 示例
text = "这是一个测试句子"
print(count_tokens(text)) # 约 6-8 tokens03.2. Prompt 层面的优化
2.1 精简 Prompt
原则:只传递必要信息
python snippetpython
# ❌ 冗余
prompt = """
你是一个专业的 AI 助手。我需要你帮我处理一些任务。
请你认真阅读以下内容,仔细思考,然后给出准确的回答。
谢谢!
任务:总结以下文章:
{article}
"""
# ✅ 精简
prompt = "总结以下文章:\n{article}"2.2 压缩示例(Few-shot)
python snippetpython
# ❌ 冗余示例
examples = [
{
"input": "用户:你好",
"output": "客服:您好!有什么可以帮助您的?"
},
{
"input": "用户:我想查询订单",
"output": "客服:好的,请问您的订单号是多少?"
},
{
"input": "用户:我的订单还没到",
"output": "客服:抱歉给您带来不便,我帮您查一下物流信息。"
}
]
# ✅ 精简示例(只保留最能体现格式的1个)
examples = [
{
"input": "用户:你好",
"output": "客服:您好!有什么可以帮助您的?"
}
]2.3 使用参考而非重复
python snippetpython
# ❌ 每次都传完整规则
prompt = """
规则:
1. 用户问候要回复问候
2. 用户询问价格要告知价格
3. 用户投诉要道歉并处理
...
用户:你们产品多少钱?
"""
# ✅ 用引用代替
prompt = """
参考规则文档(见附件):rule_v2.pdf
用户:你们产品多少钱?
"""04.3. 上下文管理优化
3.1 选择性保留上下文
python snippetpython
def selective_context(messages, max_tokens=8000):
"""只保留最近的关键消息"""
encoded = tokenizer.encode(messages)
if len(encoded) <= max_tokens:
return messages
# 保留系统提示 + 最近消息
system = messages[0] # 系统提示
recent = messages[-max_tokens_tokens:] # 最近的消息
return [system] + recent3.2 摘要压缩
python snippetpython
def compress_with_summary(conversation, summary_model):
"""定期压缩对话历史"""
if len(conversation) > 10:
# 用小模型生成摘要
summary = summary_model.invoke(
f"简要总结以下对话要点:{conversation}"
)
return [{"role": "system", "content": "之前对话摘要:" + summary}]
return conversation3.3 滑动窗口
python snippetpython
class SlidingWindowBuffer:
def __init__(self, max_tokens=4000):
self.max_tokens = max_tokens
self.messages = []
def add(self, message):
self.messages.append(message)
self._trim()
def _trim(self):
while self.total_tokens() > self.max_tokens:
# 移除最旧的消息(保留系统提示)
if len(self.messages) > 1:
self.messages.pop(1)
def total_tokens(self):
return sum(count_tokens(m["content"]) for m in self.messages)05.4. 缓存策略
4.1 相同请求缓存
python snippetpython
import hashlib
import json
from redis import Redis
class LLMCache:
def __init__(self, redis_url=None, ttl=3600):
self.cache = Redis.from_url(redis_url) if redis_url else {}
self.ttl = ttl
def _hash(self, prompt):
return hashlib.sha256(prompt.encode()).hexdigest()
def get(self, prompt):
key = self._hash(prompt)
return self.cache.get(key)
def set(self, prompt, response):
key = self._hash(prompt)
self.cache.setex(key, self.ttl, response)
# 使用
cache = LLMCache(redis_url="redis://localhost:6379")
def cached_completion(prompt):
# 先检查缓存
cached = cache.get(prompt)
if cached:
return cached
# 没有缓存,调用 API
response = llm.invoke(prompt)
# 存入缓存
cache.set(prompt, response)
return response4.2 向量缓存
python snippetpython
from langchain.vectorstores import Chroma
from langchain.embeddings import OpenAIEmbeddings
class SemanticCache:
def __init__(self, threshold=0.9):
self.embeddings = OpenAIEmbeddings()
self.vectorstore = Chroma("semantic_cache", self.embeddings)
self.threshold = threshold
def get(self, prompt):
prompt_embedding = self.embeddings.embed_query(prompt)
# 搜索相似缓存
results = self.vectorstore.similarity_search_by_vector(
prompt_embedding, k=1
)
if results and results[0].metadata.get("score", 0) > self.threshold:
return results[0].metadata["response"]
return None
def set(self, prompt, response, score):
# 存入向量数据库
self.vectorstore.add_texts(
texts=[prompt],
metadatas=[{"response": response, "score": score}]
)4.3 分层缓存
python snippetpython
class TieredCache:
"""本地内存 + Redis + 数据库 多层缓存"""
def __init__(self):
self.memory = {} # L1: 内存,最快
self.redis = Redis() # L2: Redis
self.db = Database() # L3: 数据库,最慢
def get(self, key):
# L1 检查
if key in self.memory:
return self.memory[key]
# L2 检查
val = self.redis.get(key)
if val:
self.memory[key] = val
return val
# L3 检查
val = self.db.query(key)
if val:
self.redis.setex(key, 3600, val)
self.memory[key] = val
return val
return None06.5. 模型路由优化
5.1 简单任务用小模型
python snippetpython
def route_task(task):
"""根据任务复杂度选择模型"""
# 简单任务:小模型即可
simple_patterns = [
"翻译",
"格式转换",
"简单问答",
"问候"
]
if any(p in task for p in simple_patterns):
return "gpt-4o-mini"
# 中等任务:标准模型
medium_patterns = [
"写代码",
"分析",
"总结"
]
if any(p in task for p in medium_patterns):
return "gpt-4o"
# 复杂任务:大模型
return "gpt-4"
# 使用
model_name = route_task(user_input)
response = llm(model_name).invoke(prompt)5.2 问题分类路由
python snippetpython
from sklearn.linear_model import LogisticRegression
import numpy as np
class TaskClassifier:
def __init__(self):
# 训练好的分类器
self.classifier = LogisticRegression()
# 特征提取器
self.vectorizer = TfidfVectorizer(max_features=100)
def classify(self, text):
features = self.vectorizer.transform([text])
return self.classifier.predict(features)[0]
def get_model(self, task_type):
routing = {
"translation": "gpt-4o-mini",
"coding": "gpt-4o",
"reasoning": "claude-4-sonnet",
"creative": "gpt-4o",
"analysis": "claude-4-opus",
}
return routing.get(task_type, "gpt-4o")
# 使用
classifier = TaskClassifier()
task_type = classifier.classify(user_input)
model = classifier.get_model(task_type)07.6. 输出优化
6.1 限制输出长度
python snippetpython
# 在 Prompt 中限制
prompt = """
用不超过 100 字回答以下问题:
{question}
"""
# 或用参数限制
response = llm.invoke(
prompt,
max_tokens=100 # 限制输出最多 100 tokens
)6.2 流式输出
python snippetpython
# 不用等完整响应,边生成边处理
collected = ""
for chunk in llm.stream(prompt):
collected += chunk.content
# 实时处理(如打字机效果、进度显示)
print(chunk.content, end="", flush=True)6.3 结构化输出
python snippetpython
# 用 JSON 模式限制输出格式
response = llm.invoke(
prompt,
response_format={"type": "json_object"},
# 或用 Function Calling 限制
tools=[{
"type": "function",
"function": {
"name": "extract_info",
"parameters": {
"type": "object",
"properties": {
"sentiment": {"type": "string", "enum": ["positive", "negative", "neutral"]},
"confidence": {"type": "number"}
}
}
}
}]
)08.7. 架构层面优化
7.1 批量处理
python snippetpython
# ❌ 逐个处理
results = []
for item in items:
results.append(llm.invoke(process(item)))
# ✅ 批量处理
batch_prompt = "处理以下列表:\n" + "\n".join(items)
results = llm.invoke(batch_prompt)
# 解析结果7.2 异步调用
python snippetpython
import asyncio
async def async_invoke(prompt):
return await llm.ainvoke(prompt)
async def main():
# 并发调用
tasks = [async_invoke(p) for p in prompts]
results = await asyncio.gather(*tasks)
return results7.3 请求合并
python snippetpython
class RequestBatcher:
"""合并相似请求,减少 API 调用"""
def __init__(self, batch_size=10, wait_time=0.1):
self.batch_size = batch_size
self.wait_time = wait_time
self.pending = []
async def add(self, prompt):
self.pending.append(prompt)
if len(self.pending) >= self.batch_size:
return await self._process_batch()
# 等待一段时间,合并更多请求
await asyncio.sleep(self.wait_time)
if self.pending:
return await self._process_batch()
async def _process_batch(self):
# 批量调用
combined = "\n---\n".join(self.pending)
response = await llm.ainvoke(combined)
# 拆分结果
results = response.split("\n---\n")
self.pending = []
return results09.8. 成本监控
8.1 实时监控
python snippetpython
class CostTracker:
def __init__(self):
self.total_input = 0
self.total_output = 0
self.costs = []
def track(self, model, input_tokens, output_tokens):
prices = {
"gpt-4o": (2.5, 10.0),
"gpt-4o-mini": (0.15, 0.6),
"claude-4-sonnet": (3.0, 15.0),
}
input_price, output_price = prices.get(model, (0, 0))
input_cost = input_tokens * input_price / 1_000_000
output_cost = output_tokens * output_price / 1_000_000
total = input_cost + output_cost
self.total_input += input_tokens
self.total_output += output_tokens
self.costs.append(total)
return {
"input_cost": input_cost,
"output_cost": output_cost,
"total_cost": total,
"cumulative": sum(self.costs)
}
def get_report(self):
return {
"total_input_tokens": self.total_input,
"total_output_tokens:": self.total_output,
"total_cost": sum(self.costs),
"avg_cost_per_call": sum(self.costs) / len(self.costs) if self.costs else 0
}8.2 成本告警
python snippetpython
class CostAlert:
def __init__(self, threshold=100):
self.threshold = threshold
self.daily_cost = 0
def check(self, cost):
self.daily_cost += cost
if self.daily_cost > self.threshold:
send_alert(
f"今日 API 成本 ${self.daily_cost:.2f} 超过阈值 ${self.threshold}"
)10.9. 总结
| 优化手段 | 效果 | 难度 | 适用场景 |
|---|---|---|---|
| 精简 Prompt | 20-40% | 低 | 所有场景 |
| 缓存 | 30-70% | 中 | 重复性任务 |
| 小模型路由 | 50-80% | 中 | 任务可分类 |
| 上下文压缩 | 30-50% | 中 | 长对话 |
| 批量处理 | 20-30% | 低 | 离线任务 |
| 输出限制 | 20-50% | 低 | 可预测输出 |
最佳实践组合:
- •基础优化:精简 Prompt + 输出限制
- •中级优化:缓存 + 模型路由
- •高级优化:上下文压缩 + 批量处理 + 成本监控
通过系统性的优化,通常可以将 API 成本降低 50-80%,同时不影响服务质量。下一篇文章我们将进入实战,用 LangGraph 搭建你的第一个 Agent。