Agent ETL:先把数据契约、质量校验和回填审批串起来,再谈智能管道
ETL 场景最容易被写成一个会自动发现异常、自动补转换逻辑和自动重跑任务的智能数据管道,但真实落地首先取决于数据契约、质量规则和写入边界是否清楚。
01.ETL 最容易被误写成“自愈型智能数据管道”
很多关于 ETL Agent 的文章,一上来就会讲:
- •自动发现数据质量问题
- •自动生成转换逻辑
- •自动修复 schema drift
- •自动重跑失败任务
这些方向听起来很美好,但真实数据工程里更难的通常不是“多写几段转换代码”,而是:
- •数据源契约到底是什么
- •哪些异常只是数据波动,哪些会影响下游指标和财务口径
- •回填、重算和覆盖写入的影响范围有没有被评估
- •谁对生产数据的最终写入结果负责
所以 ETL Agent 更适合先做“数据链路与修复协作者”,而不是直接替代调度系统和数据 owner 的自愈型管道中枢。
02.先从排查和评审切入,而不是一上来碰自动修复
如果第一版就想覆盖自动建表、自动修 schema、自动 backfill 和自动重写转换逻辑,项目很快就会在口径、回滚和责任边界上失控。
更现实的切入点通常是:
- •schema drift 分诊
- •数据质量告警解释
- •backfill 影响分析
- •失败任务排查摘要
以“数据质量评审助手”为例,一个可交付的第一版通常已经很有价值:
- •汇总失败任务、样本数据、上游 schema 和下游影响
- •判断当前更像 schema 漂移、空值异常还是业务规则变化
- •检索数据契约、字段字典和历史处理记录
- •生成修复计划草稿和待确认项
- •不直接改生产 SQL 或对表做覆盖写入
这类链路比“让 Agent 自己修好整条管道”更符合真实数据工程治理。
03.一条稳定的 ETL 链路,通常由四层组成
1. 数据契约与元数据层
这一层决定系统有没有资格理解当前问题:
- •源表 schema
- •字段语义和数据字典
- •数据更新频率
- •分区策略和主键
- •上下游依赖关系
没有这层元数据,后面的异常解释基本都只能靠猜。
2. 质量与血缘层
ETL 的复杂性,大量来自“异常到底会影响哪里”:
- •空值率和分布漂移
- •去重、唯一性和主外键约束
- •行数突变
- •下游指标和看板的影响范围
Agent 在这里更适合做证据整理和影响说明,而不是直接判定“可以忽略”。
3. 转换与修复计划层
这一层最适合模型和受限代码工具结合:
- •对样本数据做临时剖析
- •生成字段映射和修复建议草稿
- •解释 schema drift 更可能来自哪里
- •生成 backfill、replay 和补数据方案说明
4. 写入、回填与审批层
下面这些动作通常都不适合让 Agent 直接拍板:
- •修改生产转换逻辑
- •回填历史分区
- •覆盖写入事实表
- •修正财务或核心指标数据
这些动作必须保留数据 owner、平台规则和人工审批链。
04.模型负责诊断与说明,系统负责执行与回滚
在 ETL 场景里,一个更稳的职责拆分通常是:
更适合模型处理的部分
- •总结失败任务和异常样本
- •解释 schema drift 和字段映射问题
- •生成修复计划和 backfill 说明草稿
- •整理下游影响和待确认风险
更适合系统处理的部分
- •维护数据契约和元数据
- •执行调度、重跑和回填
- •控制写权限、分区覆盖和回滚
- •记录正式审计日志和数据版本
如果让模型同时负责“诊断 + 改 SQL + 跑回填 + 判定成功”,最终最容易出问题的是数据正确性和可追责性。
05.先让 Agent 输出结构化 ETL 计划,再决定要不要进入生产变更
更稳的方式,是先把 ETL 任务收敛成结构化计划。
from typing import Literal
from pydantic import BaseModel, Field
class EtlPlan(BaseModel):
lane: Literal["schema_drift", "quality_triage", "backfill_review", "handoff"]
dataset: str
partitions: list[str] = Field(default_factory=list)
suspected_causes: list[str] = Field(default_factory=list)
required_owners: list[str] = Field(default_factory=list)
requires_data_owner_approval: bool = True
def run_etl_workflow(plan: EtlPlan, tools):
metadata = tools.load_dataset_metadata(plan.dataset)
lineage = tools.load_lineage(plan.dataset)
draft = tools.generate_etl_summary(plan.lane, metadata, lineage, plan.partitions)
checks = tools.validate_write_guardrails(draft, metadata, plan.required_owners)
if plan.requires_data_owner_approval or checks.has_blockers:
return tools.route_to_data_owner(draft=draft, checks=checks)
return tools.prepare_low_risk_followup(draft=draft, checks=checks)这个模式的价值在于:
- •先区分当前是在处理 schema drift、质量告警还是 backfill 评审
- •分区、疑似原因和 owner 可以显式记录
- •高风险写入默认进入人工审批
06.文件检索和受限代码工具,很适合 ETL 的排查阶段
数据工程团队经常会积累大量规则性资料:
- •字段字典
- •数据契约
- •调度说明
- •历史故障复盘
- •示例 CSV 和抽样文件
文件检索和受限代码工具很适合用于:
- •样本数据剖析
- •字段映射建议
- •质量异常说明
- •回填影响摘要
但这些能力不等于 Agent 可以直接拥有生产写权限。
07.评估不要只看“是不是更聪明”,还要看错误修复率
ETL Agent 的评估,更应该围绕这些问题:
- •异常归因是否接近真实原因
- •修复计划是否忠于数据契约和血缘关系
- •人工 review 最常补充哪些信息
- •是否出现错误回填或错误忽略异常
- •故障排查时间是否明显缩短
这些指标比“AI 写的转换代码多快”更能反映真实价值。
08.三个常见误区
1. 让模型直接改生产转换逻辑
没有数据 owner 审批和回滚准备的自动修复,往往会把一个故障扩大成更大的数据事故。
2. 不看血缘和下游影响,只盯单表异常
ETL 的很多风险不在异常本身,而在它会把什么下游一起带坏。
3. 把一次性样本分析和生产规则混为一谈
临时探索适合用代码容器,稳定生产逻辑则必须回到受控管道和版本管理。
09.总结
ETL Agent 真正可交付的价值,不是做一个“自动修好所有数据问题”的黑盒,而是把原本散落在元数据、质量告警、修复讨论和回填评审里的工作整理成更稳的数据链路:
- •先把契约、血缘和异常事实拉齐
- •让模型承担归因、说明和修复草稿
- •把生产写入、回填和重跑留给平台与人工审批
- •用错误修复率和回滚风险,而不是口号来评估系统
只要把这些边界守住,Agent 就能真正帮助数据工程团队减少排查成本,而不是制造新的数据事故。