0. 写在前面
这不是一篇“我用了什么技术栈”的流水账。这篇文章的目标是:
我将从最底层的字节解析、数据结构反序列化、异构链的归一化处理、以及 AI 工程化落地四个维度,拆解这套系统真正的技术骨架。如果你对链上数据的原始形态没有概念,建议先补课再来。
全文约 10000 字,请预留充足的阅读时间。
web3
第一部分:EVM 监控——从 RPC 字节流到结构化事件的炼狱
EVM 链的监控看似成熟,市面上有无数“监听某地址转账”的脚本。但要做到零漏报、低延迟、高容错、可持久化,需要直面以下几个地狱级难题。我将逐一拆解每个难题的底层实现。
1.1 手动 ABI 解码:为什么不用现成库?
市面上有web3.py等成熟的库可以解析事件,但我选择手写 ABI 解码。这不是为了炫技,而是基于两个非常现实的工程考量:
- 体积与依赖控制:
web3.py及其依赖链会为打包后的 EXE 增加约 15-20MB 的体积。对于一个追求极致轻量的桌面软件,这是不可接受的代价。 - 降级控制力:我需要精确控制解码失败时的降级逻辑。使用第三方库时,一旦抛出异常,整个解析流程就会中断;而手写解码可以让我在单个字段解析失败时,继续尝试提取其他可用信息。
核心的 ERC20Transfer事件解析逻辑如下:
# 主题哈希——ERC20 Transfer 事件的 keccak256 签名 topic_transfer = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef" if log["topics"][0] == topic_transfer: # 地址编码在 topic 中:前 12 字节是填充的 0,后 20 字节才是真实地址 sender = "0x" + log["topics"][1][-40:] # 从第 26 个字符开始截取 receiver = "0x" + log["topics"][2][-40:] # 数量编码在 data 字段中,是 uint256 类型,直接按 16 进制解析 raw_amount = int(log.get("data", "0x0"), 16)这看起来简单,但真正的噩梦是ERC1155 的TransferBatch事件——它是整个 EVM 监控模块中解析复杂度最高的部分。
1.2 地狱级:ERC1155 批量转账的 ABI 解码
ERC1155 与 ERC20/721 的最大区别在于:它支持批量转账。一笔交易可以同时转移多种不同 ID 的代币,每种代币的数量也可以不同。这种灵活性在 ABI 编码层面表现为动态数组的嵌套。
TransferBatch事件的data字段结构如下(ABI 编码规范):
| 偏移量 | 内容 | 说明 |
|---|---|---|
| 0x00 - 0x1F | ids数组的偏移量指针 | 指向ids数组实际数据的起始位置 |
| 0x20 - 0x3F | values数组的偏移量指针 | 指向values数组实际数据的起始位置 |
| ... | ... | 后续是实际数据区 |
解析这个结构需要手动管理所有偏移量,任何字节对齐错误都会导致完全错误的结果。我的解析逻辑如下:
# 去掉 0x 前缀,转换为字节数组 data = bytes.fromhex(log["data"][2:]) # 1. 读取两个数组的偏移量(各占 32 字节) ids_offset = int.from_bytes(data[0:32], 'big') # ids 数组起始位置 values_offset = int.from_bytes(data[32:64], 'big') # values 数组起始位置 # 2. 读取 ids 数组长度(在 ids_offset 位置,占 32 字节) ids_len = int.from_bytes(data[ids_offset:ids_offset+32], 'big') # 3. 逐个解析 ids(每个 id 是 uint256,占 32 字节) ids = [] for i in range(ids_len): start = ids_offset + 32 + i * 32 token_id = int.from_bytes(data[start:start+32], 'big') ids.append(token_id) # 4. 同理解析 values 数组 values_len = int.from_bytes(data[values_offset:values_offset+32], 'big') values = [] for i in range(values_len): start = values_offset + 32 + i * 32 value = int.from_bytes(data[start:start+32], 'big') values.append(value) # 5. 理论上 ids_len == values_len,但实际中可能存在不匹配的畸形交易 # 我的处理是:只处理较短的那个长度,保证不越界 process_len = min(ids_len, values_len) for i in range(process_len): # 对每一对 (token_id, value) 分别生成一条通知 await send_notification(token_id=ids[i], amount=values[i])这里有一个容易被忽略的细节:ABI 编码中的偏移量是从 data 字段的起始位置(即0x后的第一个字节)开始计算的,而不是从整个log对象的起始位置。很多初学者的解析错误都源于这个细节。
此外,TransferBatch可能在一笔交易中包含数十个不同的 NFT。如果我直接在主循环中逐个处理,会导致推送延迟和前端日志堆积。我的优化是:将批量解析结果拆分为独立的通知,但通过asyncio.gather并发推送,将总耗时压缩到与单笔转账相近的水平。
1.3 ERC1155 的TransferSingle:看似简单实则暗藏杀机
ERC1155 还有另一个事件TransferSingle,它只转移单种代币,结构相对简单。但它有一个极易被忽略的坑:value字段的位置与TransferBatch不同。
TransferSingle的data编码为:token_id(32 字节)+value(32 字节),紧凑排列,没有偏移量指针。这要求我必须根据topic[0]的不同,走完全不同的解析分支。很多监控脚本只处理TransferBatch而忽略了TransferSingle,导致漏报。
1.4 代币元数据:eth_call的深坑与填坑指南
获取代币的符号(symbol)和精度(decimals)是推送可读信息的前提。标准方法是调用代币合约的symbol()和decimals()函数。但现实世界中的代币合约千奇百怪:
symbol()返回bytes32而非string:标准规定返回string,但早期很多代币(包括一些知名项目)返回的是固定 32 字节的bytes32。直接按字符串解码会得到一堆补零的乱码,例如"USDC\x00\x00\x00\x00\x00..."。decimals()返回异常值:某些代币的精度是 0,但返回的是空数据;有些代币甚至没有实现这个函数。
我的处理逻辑是多级尝试:
# 调用 symbol() result = await rpc_call({"to": token, "data": "0x95d89b41"}) if result and len(result) > 66: # 尝试按 bytes32 解析 data = result[130:] # 去掉函数签名和偏移量 symbol = bytes.fromhex(data).decode('utf-8').strip('\x00') # 如果解析出来的全是乱码,再尝试按 string 解析 if not symbol or not symbol.isprintable(): # 回退到 string 解码逻辑...这类“边角料”问题,每一个都需要单独编写处理逻辑。它们单个看起来微不足道,但累积起来占据了开发时间的 30% 以上,也是区分“玩具脚本”和“生产级系统”的关键分水岭。
1.5 WSS 连接的高可用设计:节点池与指数退避
公共 RPC/WSS 节点的稳定性众所周知地差。我的系统为每条链配置了多个节点,并实现了一套完整的容灾逻辑:
nodes = config["ETHEREUM_WSS_NODES"] # 至少配置 3 个 current_index = 0 retry_delay = 5 # 初始重试间隔 5 秒 while True: current_url = nodes[current_index] try: async with websockets.connect(current_url) as ws: # 订阅逻辑... retry_delay = 5 # 连接成功后重置 except Exception as e: # 切换到下一个节点(环形轮换) current_index = (current_index + 1) % len(nodes) # 指数退避,但上限 60 秒 await asyncio.sleep(retry_delay) retry_delay = min(retry_delay * 2, 60)这套机制的关键在于:
- 节点池:至少配置 3 个来源不同的节点(官方、社区、付费),避免单点故障。
- 指数退避:避免在节点恢复前对其造成重连风暴。
- 环形轮换:不会因为第一个节点永久故障而卡死。
1.6 原生币的特殊处理:轮询补偿
如前所述,原生币没有事件日志。我的方案是:每 60 秒调用eth_getBalance,与内存快照对比:
current = await rpc_call("eth_getBalance", [addr, "latest"]) if abs(current - last_balance[addr]) > 1e-18: # 考虑浮点误差 change = current - last_balance[addr] await send_notification("收到原生币" if change > 0 else "转出原生币", amount=abs(change)) last_balance[addr] = current这里有一个细节:轮询间隔内可能发生多笔小额转账,余额变化只能体现净额。对于大多数监控场景(关注大额异动),这已经足够;如果未来需要更精确的追踪,可以结合newHeads订阅逐区块扫描。
第二部分:Solana 监控——在数据洪流中精准捕猎
Solana 的监控难度与 EVM 完全不同。它的交易结构是异构指令集合,且原生 RPC 能力极弱。以下是我在 Solana 端踩过的坑和对应的解决方案。
2.1 交易数据的“俄罗斯套娃”:多层级解析
一笔 Solana 交易经过 Helius API 增强后,可能同时包含以下数据层:
nativeTransfers:SOL 转账数组,每个元素包含from、to、amount(lamports)。tokenTransfers:SPL 代币转账数组,包含from、to、mint、tokenAmount。events.nft:NFT 买卖事件,包含seller、buyer、mint、price。accountData:所有涉及账户的状态变更(包括 lamports 余额变化)。
我的解析流水线必须按优先级逐层剥开:
# 第一优先级:原生 SOL 转账(最准确) if tx.get('nativeTransfers'): for transfer in tx['nativeTransfers']: if transfer['to'] in watch_addrs: await handle_sol_received(transfer) elif transfer['from'] in watch_addrs: await handle_sol_sent(transfer) # 第二优先级:SPL 代币转账 elif tx.get('tokenTransfers'): for transfer in tx['tokenTransfers']: # 需要额外调用 getAsset 获取代币符号和价格 ... # 第三优先级:NFT 事件 elif tx.get('events', {}).get('nft'): for nft_event in tx['events']['nft']: ... # 第四优先级:降级到余额变化反推(最后防线) elif tx.get('preBalances') and tx.get('postBalances'): await parse_balance_changes(tx)这种优先级设计的原因是:nativeTransfers是 Helius 直接解析好的结构化数据,最准确;余额变化反推是原始数据,误差最大,仅作兜底。
2.2 降级兜底:余额变化反推算法的完整实现
当 Helius 返回的数据不包含nativeTransfers时(可能是 API 版本问题或该交易确实没有 SOL 转账),我需要用最原始的方法——比对preBalances和postBalances。
算法核心逻辑:
def parse_balance_changes(tx): pre = tx['preBalances'] post = tx['postBalances'] accounts = tx['transaction']['message']['accountKeys'] changes = [] for i, (pre_bal, post_bal) in enumerate(zip(pre, post)): change = post_bal - pre_bal if change != 0: changes.append({ 'account': accounts[i], 'change_lamports': change, 'change_sol': change / 1e9 }) # 寻找与监控地址配对的对手方 for watch_addr in watch_addrs: watch_change = next((c for c in changes if c['account'] == watch_addr), None) if watch_change: # 寻找变化符号相反的账户作为对手方 counterparty = next( (c['account'] for c in changes if c['account'] != watch_addr and c['change_lamports'] * watch_change['change_lamports'] < 0), None ) yield watch_addr, watch_change, counterparty这个算法的局限性在于:
- 当交易涉及多个账户同时转入转出时(如 DEX 聚合交易),对手方的推断可能不准确。
- 它无法区分 SOL 转账和合约交互中的 lamports 变化(如 rent 支付)。
但作为最后一道防线,它至少保证了不会漏掉大额转账。在实际运行中,nativeTransfers的覆盖率超过 99%,余额反推仅在极少数边缘情况下被触发。
2.3 SPL 代币的元数据迷宫
Solana 上的代币信息完全依赖 Helius 的getAssetRPC 调用。但这个接口的返回数据并不总是完整:
- 某些新兴代币的
symbol字段可能为空。 price_info字段不是所有代币都有(需要 Jupiter 等聚合器提供喂价)。
我的策略是多级降级:
# 1. 调用 Helius getAsset asset = await helius_get_asset(mint) symbol = asset.get('content', {}).get('metadata', {}).get('symbol', 'UNKNOWN') # 2. 获取价格——优先使用 Helius 自带 price_info = asset.get('token_info', {}).get('price_info') if price_info: price = price_info.get('price_per_token', 0) else: # 3. 降级到 DexScreener API price = await get_dex_price(mint)DexScreener 的降级逻辑也很关键:它返回的是该代币在所有 DEX 上的交易对,我需要筛选出流动性最大的那个作为参考价格。
2.4 滑动窗口去重的工程实现
Solana 交易没有log_index概念,且同一笔交易可能因 Helius 解析产生多条推送。我的去重方案是一个有限容量的内存滑动窗口:
processed_sigs = set() MAX_SIGS = 1000 def is_duplicate(sig): if sig in processed_sigs: return True processed_sigs.add(sig) # 超过容量时,截断保留最近 500 条 if len(processed_sigs) > MAX_SIGS: processed_sigs = set(list(processed_sigs)[-500:]) return False这个方案在内存占用和去重准确性之间取得了平衡。有人可能会问:为什么不用 Redis 或 SQLite 做持久化去重?因为对于实时监控系统,重启后重新处理少量重复是可接受的,没必要引入外部依赖。
2.5 异步日志写入队列:避免 I/O 阻塞
Solana 的详细交易日志需要写入 JSON 文件供前端查询。如果直接在解析协程中写文件,高并发下会导致 I/O 阻塞和文件损坏。我的方案是生产者-消费者队列:
_write_queue = asyncio.Queue() async def file_writer(): while True: path, data = await _write_queue.get() # 读取已有数据 existing = [] if os.path.exists(path): with open(path, 'r') as f: existing = json.load(f) existing.append(data) # 保留最近 1000 条 if len(existing) > 1000: existing = existing[-1000:] with open(path, 'w') as f: json.dump(existing, f) # 在解析协程中 await _write_queue.put((log_path, transaction_data))一个专门的协程负责所有文件 I/O,彻底杜绝了并发写问题。
第三部分:AI 解读——从“玩具”到“生产级”的工程鸿沟
调用 AI API 是容易的,任何一个初学者都能在 10 分钟内写出调用 OpenAI 的代码。但要让 AI 输出稳定、可解析、对用户有价值的内容,中间隔着巨大的工程鸿沟。以下是我在 AI 模块中的核心设计。
3.1 多提供商特征调度
不同 AI 提供商的能力侧重点不同:
- DeepSeek:代码能力强,适合结构化输出,但不支持联网。
- Moonshot:支持内置
$web_search,适合需要实时信息的项目解读。 - Gemma:Google 出品,作为备用兜底。
我的调度器基于特征标志而非简单的轮询:
class MultiAIClient: def __init__(self): self.providers = [ {'name': 'deepseek', 'features': {'transaction_insight': True, 'daily_report': True}}, {'name': 'moonshot', 'features': {'transaction_insight': True, 'web_search': True}}, {'name': 'gemma', 'features': {'transaction_insight': True}}, ] async def get_insight(self, prompt, required_feature=None): # 筛选出支持所需特征的提供商 candidates = [p for p in self.providers if (not required_feature or p['features'].get(required_feature))] for p in candidates: result = await self._call_provider(p, prompt) if result: return result, p['name'] return None, None当用户在前端选择“联网搜索”时,系统自动跳过 DeepSeek,优先调用 Moonshot;如果 Moonshot 超时,则降级到 Gemma(此时联网功能丢失,但至少能给出基础解读)。
3.2 防御性 JSON 解析的三层护甲
大模型返回 JSON 的稳定性堪比“抽盲盒”。我设计了三层解析护甲,确保即使 AI 返回了“半成品垃圾”,前端依然能展示有效信息。
第一层:标准 JSON 解析
try: return json.loads(response) except json.JSONDecodeError: pass第二层:清洗 Markdown 代码块标记
这是最常见的污染形式——AI 会把 JSON 包裹在json ...中。
cleaned = response.strip() if cleaned.startswith('```json'): cleaned = cleaned[7:] if cleaned.endswith('```'): cleaned = cleaned[:-3] cleaned = cleaned.strip() try: return json.loads(cleaned) except: pass第三层:正则暴力提取
如果前两层都失败,说明 AI 返回的可能是纯文本或格式完全损坏的 JSON。此时我用正则直接提取关键字段:
import re insight_match = re.search(r'"insight"\s*:\s*"([^"]*)"', response, re.DOTALL) risk_match = re.search(r'"risk"\s*:\s*"([^"]*)"', response) risk_reason_match = re.search(r'"risk_reason"\s*:\s*"([^"]*)"', response) return { "insight": insight_match.group(1) if insight_match else response[:200], "risk": risk_match.group(1) if risk_match else "未知", "risk_reason": risk_reason_match.group(1) if risk_reason_match else "" }这三层护甲确保了:AI 返回什么垃圾,前端都能展示出可读的内容,而不是白屏报错让用户一脸茫然。
3.3 联网搜索的工具调用循环(Moonshot 专用)
Moonshot 的$web_search是一个内置工具,但 OpenAI SDK 不会自动处理多轮工具调用。我需要手动实现循环:
messages = [{"role": "user", "content": prompt}] tools = [{"type": "builtin_function", "function": {"name": "$web_search"}}] max_iterations = 3 for _ in range(max_iterations): response = await client.chat.completions.create( model="kimi-k2.5", messages=messages, tools=tools ) choice = response.choices[0] if choice.finish_reason == "tool_calls": # 将助手消息加入历史 messages.append(choice.message) # 为每个工具调用构造 tool 响应 for tool_call in choice.message.tool_calls: if tool_call.function.name == "$web_search": messages.append({ "role": "tool", "tool_call_id": tool_call.id, "name": tool_call.function.name, "content": "搜索已完成" # Moonshot 服务端已执行搜索,客户端只需回传确认 }) else: # 最终回复 return choice.message.content return "工具调用超过最大迭代次数"这个循环的关键在于:服务端已经执行了搜索,客户端只需要按协议回传 tool 消息。许多开发者误以为需要在客户端执行搜索,导致实现复杂且容易出错。
3.4 提示词工程的防御性设计
AI 解读的提示词不是我随意写的,而是经过多次迭代的“防御性设计”:
请严格按照以下JSON格式返回,键名必须为英文: { "insight": "详细解读内容(可包含换行)", "risk": "低/中/高", "risk_reason": "风险原因描述", "suggestion": "操作建议(可选)" }为什么要强调“键名必须为英文”和“只返回 JSON”?因为 AI 在面对中文用户时,有时会“自作聪明”地用中文键名(如{"解读": "...", "风险": "高"}),导致前端解析失败。明确约束后,成功率从约 70% 提升到 95% 以上。
4. 结语:为什么我认为这套系统“很难复现”?
不是因为用了多么高深的算法——事实上,本文拆解的每一个技术点,单独拿出来都不算“黑科技”。真正的难度在于:
- 异构链的适配成本:EVM 和 Solana 的底层差异巨大,你必须同时精通两套技术栈。市场上能同时驾驭两条链的开发者本就稀缺。
- 边缘情况的处理密度:ERC1155 的批量解析、Solana 的降级反推、代币元数据的异常返回值、WSS 静默断线的检测……每一个“边角料”问题都需要单独编写处理逻辑。这些代码占据了整个项目 60% 以上的行数,但在功能演示中完全看不见。
- AI 的工程化落地:把 AI 从“玩具”变成“可靠的生产力工具”,需要大量的防御性设计——多提供商容错、JSON 多层解析、工具调用循环、提示词迭代。这些工作没有现成教程,全靠踩坑积累。
- 系统的长期稳定性:节点切换、指数退避、异步队列、WAL 模式……这些运维层面的设计,是系统能 7×24 小时无人值守运行的基石。
如果你读完这篇文章觉得“信息量过大”或者“有些地方没完全看懂”,那我的目的就达到了。真正的技术深度,从来不是让人一眼望穿的。它隐藏在那些你甚至没有意识到需要处理的问题里。
技术关键词:ABI 解码、ERC1155 批量解析、Solana 异构交易、多级降级、WSS 节点容灾、AI 工具调用循环、防御性 JSON 解析、异步队列
作者:潇楠Web3哨兵,全栈 & Web3 独立开发者。 GITHUB:github.com/pingdj/Web3