news 2026/1/2 8:13:29

Apache Doris 4.0.1 集群部署与 Paimon 数据湖集成实战文档

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Apache Doris 4.0.1 集群部署与 Paimon 数据湖集成实战文档

目录

1. 架构规划

1.1 硬件与系统信息

1.2节点分配

1.3 依赖组件 (CDH)

2. 操作系统基础配置 (所有节点)

2.1 检查 CPU AVX2 指令集

2.2 操作系统参数优化 (核心稳定性保障)

2.3 配置 Hosts 映射

2.4 创建目录与授权

3. Doris 安装与环境集成 (Bigdata 用户)

3.1 解决 JDK 版本冲突 (关键)

3.2 解压 Doris 安装包

3.3 集成 CDH 配置文件

4. FE (Frontend) 部署

4.1 修改 fe.conf

4.2 启动与组网

5. BE (Backend) 部署

5.1 修改 be.conf

5.2 启动与注册

6. Paimon 数据湖集成 (最终验证方案)

6.1 创建 Catalog SQL (生产推荐)

6.2 验证查询

6.2.1 切换 Catalog 与 数据库

6.2.2 查看表列表

6.2.3 数据查询验证

6.3 方案优势总结 (Why HMS?)

7. 部署过程问题排查记录 (Troubleshooting)

8. 基础测试(增删改查)

8.1 插入数据

8.2 更新数据

8.3 删除数据

9. Paimon外表数据写入Doris内表基础测试

9.1 Paimon数据准备

9.2 测试代码

9.3 验证数据


1. 项目背景与技术升级随着业务数据量的激增及对实时性要求的提高,现有Paimon + OLAP架构面临着更高的性能挑战。尽管早期引入的 Doris 2.x 版本成功解决了大字段存储痛点,但为了进一步挖掘数据价值,追求极致的查询响应速度与更低的资源消耗,项目组计划探索Apache Doris 4.0.1(官方最新版本)的架构潜力。

新版本在异步物化视图、全新的查询优化器(Nereids)以及湖仓读取管线上进行了颠覆性升级。本次选型调整旨在验证新一代引擎在处理超大规模复杂数据时的性能边界,评估其是否能作为下一代核心计算单元,进一步提升数仓的流批处理能力。

2. 测试目标本次测试旨在对Doris 4.0.1 + Paimon 1.1.1进行前瞻性的深度集成验证,重点评估新版本特性带来的收益,具体包括:

  • 极速读取性能:验证新版本针对 Paimon 格式的 Native Reader 优化效果,评估在海量数据扫描场景下的 IO 吞吐提升幅度。

  • 复杂计算与回写效能:在保留大字段处理优势的前提下,对比测试新优化器在执行复杂 ETL 逻辑时的计算加速比,以及高并发场景下的数据回写稳定性。

3.补充说明

Paimon官网支持Doris 2.0.6 及以上版本。对应网址:Doris | Apache Paimon

Doirs官网支持对Paimon的查询,使用 Doris 的分布式计算引擎直接访问 Paimon 数据以实现查询加速;数据集成,读取Paimon数据并将其写入Doris内部表,或使用Doris计算引擎执行ZeroETL;不支持数据写回Paimon。对应网址:Paimon Catalog - Apache Doris

支持Paimon版本为1.0.0

调研架构图如下:

1. 架构规划

1.1 硬件与系统信息

  • 操作系统: CentOS 7 (CDH 6.3.2 环境混合部署)

  • 节点配置:

    • CPU: 10核

    • 内存: 14GB (资源紧缺,需精细调优)

    • 存储: 400GB SSD

  • 部署用户:bigdata

  • Java 环境: Doris 4.0 需独立安装 JDK 17,这里我安装部署之后的路径为/home/bigdata/doris/jdk-17.0.2

Doris官方文档:软硬件环境检查 - Apache Doris

1.2节点分配

前置组件分配

IP主机名角色版本
10.x.xx.201-10.x.xx.205 10.x.xx.215 10.x.xx.149 10.x.xx.151 10.x.xx.156 10.x.xx.157 10.x.xx.167 10.x.xx.206nd1-nd5 nd6 nd11 nd12 nd13 nd14 nd15 nd16CDH6.3.2
10.x.xx.201-10.x.xx.205nd1-nd5Paimon1.1.1

采用 3 节点混合部署 (FE + BE 同节点) 的高可用架构。

IP 地址主机名角色规划操作系统用户备注
10.x.xx.157nd14FE (Master)+BEbigdata引导节点
10.x.xx.167nd15FE (Follower)+BEbigdata高可用节点
10.x.xx.206nd16FE (Follower)+BEbigdata高可用节点

1.3 依赖组件 (CDH)

  • Hive Metastore:nd1 (10.x.xx.201),nd3 (10.x.xx.203)

  • HDFS NameNode: HA 模式 (Active 节点假设为 nd1)

  • CDH Java 版本: JDK 1.8 (与 Doris 4.0 不兼容,需独立安装 JDK 17)

2. 操作系统基础配置 (所有节点)

执行对象:所有 3 台机器,nd14、nd15、nd16上执行以下操作。

2.1 检查 CPU AVX2 指令集

Doris 4.0+ 强依赖 AVX2。

cat /proc/cpuinfo | grep avx2

  • 有输出:继续下一步。

  • 无输出:您需要下载 Doris 的x64-noavx2版本安装包,否则 BE 启动会报错Illegal instruction

2.2 操作系统参数优化 (核心稳定性保障)

针对 14GB 小内存环境,必须关闭 Swap 并调大文件句柄。

# 1. 永久关闭 Swap (防止内存吃紧时拖死机器) # 临时关闭,这里我使用的临时关闭,可根据实际环境选择是否永久关闭 swapoff -a # 永久关闭 sed -i '/swap/s/^/#/' /etc/fstab ​ # 2. 修改内核参数 sudo vi /etc/sysctl.conf # 添加: vm.max_map_count=2000000 vm.swappiness=0 # 生效: sysctl -p

# 3. 修改资源限制 vi /etc/security/limits.conf # 添加: * soft nofile 65536 * hard nofile 65536 * soft nproc 65536 * hard nproc 65536

注意:修改 limits 后,必须退出 SSH 重新登录bigdata用户才能生效。

注意:由于原始的配置均为65535,Doris 官方推荐 65536 是为了取个整(2的16次方),但实际上 65535 对于 Doris 来说没有任何区别。只要这个数值大于 60000,Doris 就能非常稳定地运行。因此上述配置可以不用进行配置。

2.3 配置 Hosts 映射

确保 Doris 节点能解析自身以及 CDH 的节点。由于这里我是在CDH集群节点上选择的节点搭建,相应的配置均有,因此也可不用进行配置。

vi /etc/hosts ​ # Doris 集群 10.x.xx.157 nd14 10.x.xx.167 nd15 10.x.xx.206 nd16 ​ # CDH 依赖 (必须包含 hive-site.xml 中配置的主机名) 10.x.xx.201 nd1 10.x.xx.203 nd3

2.4 创建目录与授权

mkdir -p /home/bigdata/doris mkdir -p /home/bigdata/doris/doris-meta # FE 元数据 mkdir -p /home/bigdata/doris/doris-storage # BE 数据存储 ​ # 移交权限给 bigdata chown -R bigdata:bigdata /home/bigdata/doris

3. Doris 安装与环境集成 (Bigdata 用户)

执行对象:所有 3 台机器执行用户:bigdata

3.1 解决 JDK 版本冲突 (关键)

痛点:CDH 环境是 JDK 8,但 Doris 4.0.1 强制要求 JDK 17。方案:下载免安装版 JDK 17,仅供 Doris 内部使用,不影响系统环境变量。

# 1. 上传或下载 JDK 17 到 /home/bigdata/doris/ 目录 cd /home/bigdata/doris/ # 假设已下载 openjdk-17.0.2_linux-x64_bin.tar.gz # wget https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz tar -zxvf openjdk-17.0.2_linux-x64_bin.tar.gz ​ # 2. 记录路径 (后续配置要用) # 路径为: /home/bigdata/doris/jdk-17.0.2

也可使用网址下载然后上传:https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/openjdk-17.0.2_linux-x64_bin.tar.gz

下载压缩包成功的截图如下:

解压之后的结果如下:

3.2 解压 Doris 安装包

cd /home/bigdata/doris tar -zxvf apache-doris-4.0.1-bin-x64.tar.gz cd apache-doris-4.0.1-bin-x64 mv fe /home/bigdata/doris/ mv be /home/bigdata/doris/

3.3 集成 CDH 配置文件

nd14上操作,将 nd1上CDH 集群的配置文件拉取到 Doris 配置目录。

mkdir -p /home/bigdata/doris/conf/cdh_conf/ ​ # 从 CDH 节点 (201) 拷贝 # 拷贝 Hadoop 配置文件 (core-site.xml 和 hdfs-site.xml) scp root@10.x.xx.201:/etc/hadoop/conf/core-site.xml /home/bigdata/doris/conf/cdh_conf/ scp root@10.x.xx.201:/etc/hadoop/conf/hdfs-site.xml /home/bigdata/doris/conf/cdh_conf/ # 拷贝 Hive 配置文件 (hive-site.xml) scp root@10.x.xx.201:/etc/hive/conf/hive-site.xml /home/bigdata/doris/conf/cdh_conf/

验证文件

ls -l /home/bigdata/doris/conf/cdh_conf/

分发到其他节点 (nd15, nd16)

将这 3 个文件放到 Doris 2台机器(另外2台)的统一目录,例如/home/bigdata/doris/conf/cdh_conf/

# 1. 确保目标机器也有这个目录 ssh bigdata@nd15 "mkdir -p /home/bigdata/doris/conf/cdh_conf/" ssh bigdata@nd16 "mkdir -p /home/bigdata/doris/conf/cdh_conf/"

# 2. 发送文件给 nd15 scp /home/bigdata/doris/conf/cdh_conf/* bigdata@nd15:/home/bigdata/doris/conf/cdh_conf/

# 3. 发送文件给 nd16 scp /home/bigdata/doris/conf/cdh_conf/* bigdata@nd16:/home/bigdata/doris/conf/cdh_conf/

4. FE (Frontend) 部署

4.1 修改fe.conf

操作对象:3 台机器。

vi /home/bigdata/doris/fe/conf/fe.conf
# 1. 元数据目录 meta_dir = /home/bigdata/doris/doris-meta # 2. 绑定网段 priority_networks = 10.8.15.0/24 # 3. 【关键】指定 JDK 17 路径 (解决 The jdk_version is 8, must be 17 报错) JAVA_HOME = /home/bigdata/doris/jdk-17.0.2 # 4. 内存优化 (G1GC + 4G堆内存) # 14G 内存机器,给 FE 分配 4G,预留资源给 BE JAVA_OPTS = "-Xmx4096m -Xms4096m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -Xlog:gc*:$DORIS_HOME/log/fe.gc.log:time,uptimemillis,level,tags"

4.2 启动与组网

1. 启动 Master (10.x.xx.157)

/home/bigdata/doris/fe/bin/start_fe.sh --daemon # 停止服务 # /home/bigdata/doris/fe/bin/stop_fe.sh --daemon

2. 启动 Followers (10.x.xx.167 和 206)首次启动需指定 Helper

/home/bigdata/doris/fe/bin/start_fe.sh --helper 10.x.xx.157:9010 --daemon

查看日志确认是否启动成功:

tail -f /home/bigdata/doris/fe/log/fe.log

由上述截图可以得出:

  1. receive report from be ...:

    • 这是最关键的信号。说明 FE 收到了 BE 节点发来的心跳汇报。

    • type: DISK: BE 正在汇报磁盘使用情况。

    • type: CPU: BE 正在汇报 CPU 负载。

    • 这意味着FE 和 BE 之间的网络是通的,且 BE 进程已存活

  2. BeLoadRebalancer ... get number of low load paths ...:

    • 这是 Doris 的均衡调度器在工作。它正在检查是否需要在节点间迁移数据副本。

    • isUrgent false: 表示当前没有紧急的均衡任务(正常,因为是新集群,没有数据倾斜)。

  3. BinlogManager.gc() ... no gc binlog:

    • 这是清理过期的 Binlog 日志,属于正常的后台维护任务。

3. 注册节点在 157 上使用 MySQL 客户端连接:

mysql -h 10.x.xx.157 -P 9030 -u root

执行 SQL:

ALTER SYSTEM ADD FOLLOWER "10.x.xx.167:9010"; ALTER SYSTEM ADD FOLLOWER "10.x.xx.206:9010";

SHOW PROC '/frontends';

验证:SHOW PROC '/frontends'; 确保 3 节点 Alive=true

5. BE (Backend) 部署

5.1 修改be.conf

操作对象:3 台机器。

vi /home/bigdata/doris/be/conf/be.conf
# 1. 绑定网段 priority_networks = 10.8.15.0/24 # 2. 数据目录 storage_root_path = /home/bigdata/doris/doris-storage # 3. 【关键】指定 JDK 17 (Paimon 插件依赖) JAVA_HOME = /home/bigdata/doris/jdk-17.0.2 # 4. 【关键】端口避让 (CDH NodeManager 占用 8040,改为 18040) webserver_port = 18040

5.2 启动与注册

1. 启动所有 BE

/home/bigdata/doris/be/bin/start_be.sh --daemon # 停止服务 # /home/bigdata/doris/be/bin/stop_be.sh --daemon

查看日志确认是否启动成功:

tail -f /home/bigdata/doris/be/log/be.log

由上面截图可以得出:

  1. success to build all report tablets info:

    • 这是最重要的信号。说明 BE 已经成功扫描了本地的数据分片(Tablets),并准备好向 FE 汇报状态。

    • tablet_count=15: 说明它已经管理了一些内部表(通常是 Doris 的系统表)。

  2. Scheduled(every 10s) WAL info:

    • 预写日志(WAL)管理器正在例行检查,用于保障数据写入的可靠性。状态正常。

  3. query for dictionary status, return 0 rows:

    • 这是 BE 内部的字典缓存查询,没有报错,属于正常的心跳或检测机制。

集群状态总结

  • FE 日志: 正常接收 BE 汇报,无报错。

  • BE 日志: 正常维护存储,定期汇报,无报错。

  • 进程: FE 和 BE 均已存活。

2. 注册 BE在 157 的 MySQL 客户端执行:

ALTER SYSTEM ADD BACKEND "10.x.xx.157:9050"; ALTER SYSTEM ADD BACKEND "10.x.xx.167:9050"; ALTER SYSTEM ADD BACKEND "10.x.xx.206:9050";

SHOW PROC '/backends';

验证:SHOW PROC '/backends'; 确保 3 节点 Alive=true

6. Paimon 数据湖集成 (最终验证方案)

背景: 在对接 CDH 6.3.2 (Hive 2.1.1) 时,Doris 默认的 Hive 3 客户端协议会触发Invalid method name报错。 在 Doris 2.x 版本中,为了规避此问题通常被迫使用filesystem模式,或者在配置了HA模式下,"hive.metastore.uris"参数只设置一个地址,当其中一个节点挂掉之后,需要手动进行切换。但在 Doris 4.0.1 版本中,内核优化了对多版本协议的支持,完全修复了 HA 模式下版本参数失效的问题

解决方案: 采用HMS (Hive Metastore) 模式,配置双节点 HA,并通过显式指定hive.version进行协议降级。这是兼顾高性能(支持 CBO 优化)高可用数据一致性(支持 ACID)的最佳实践方案。

6.1 创建 Catalog SQL (生产推荐)

在 Doris 4.0.1 中,可以放心使用 HA 配置。在 Doris MySQL 客户端执行:

DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( -- 1. 指定 Catalog 类型为 Paimon "type" = "paimon", -- 2. 【核心配置】使用 hms 模式,通过 Hive Metastore 管理元数据 -- 相比 filesystem 模式,支持权限控制、ACID 事务读取和查询优化 "paimon.catalog.type" = "hms", -- 3. 【高可用配置】配置多个 Metastore 地址 (Doris 4.0 已修复 HA 兼容性问题) "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083", -- 4. 【关键兼容修复】 -- 强制指定 Hive 版本为 2.1.1 (对应 CDH 6.3.2),禁用高版本不兼容的 API 调用 "hive.version" = "2.1.1", -- 5. 指定数仓物理路径 "warehouse" = "hdfs://nd1:8020/user/hive/warehouse", -- 6. 加载 Hadoop 配置文件 (用于读取 HDFS HA、Kerberos 等配置) "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username" = "hdfs" );

注意:此处paimon.catalog.type必须填写为"hms",不可使用"hive"(旧版写法)或"filesystem"

6.2 验证查询

6.2.1 切换 Catalog 与 数据库

SWITCH paimon_catalog; SHOW DATABASES;

结果如下:

6.2.2 查看表列表

USE ods; SHOW TABLES;

结果如下:

6.2.3 数据查询验证

SELECT * FROM t_admin_division_code LIMIT 5;

结果如下:

6.3 方案优势总结 (Why HMS?)

相比之前的 Filesystem 方案,当前方案具有显著优势:

  1. 高可用性 (HA):配置了nd1nd3双 Metastore 节点,任意单点故障不影响 Doris 业务查询。

  2. 性能优化 (CBO):Doris 可以从 HMS 获取表的行数、文件大小等统计信息,生成更优的 Join 执行计划。

  3. 数据准确性:HMS 模式能正确识别 Paimon/Hive 的 ACID 事务状态,避免读取到未提交或已删除的脏数据。

  4. 运维规范:统一通过 Metastore 管理元数据,符合数仓建设标准。

对于Doris 2.1.10的所有方案均可以在Doris 4.0.1里面进行查看,相应的截图如下:

方案一【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( "type" = "paimon", "paimon.catalog.type" = "hms", "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083", "warehouse" = "hdfs://nd1:8020/user/hive/warehouse", "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username" = "hdfs", -- 【关键修复】显式指定 Hive 版本,禁止调用 Hive 3 的新 API "hive.version" = "2.1.1" ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;

方案二【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( "type" = "paimon", "paimon.catalog.type" = "hms", "hive.metastore.uris" = "thrift://nd1:9083,thrift://nd3:9083", "warehouse" = "hdfs://nd1:8020/user/hive/warehouse", "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username" = "hdfs", -- 【关键修复】显式指定 Hive 版本,禁止调用 Hive 3 的新 API "hive.version" = "1.1.0" ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;

方案三【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( "type" = "paimon", "paimon.catalog.type" = "filesystem", -- 直接指向 HDFS 上的数仓根目录 (注意:如果 nameservice 未解析,直接写 active namenode 地址) "warehouse" = "hdfs://nd1:8020/user/hive/warehouse", "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username" = "hdfs" ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;

方案四【Doris 2.1.10】:

DROP CATALOG IF EXISTS paimon_catalog; CREATE CATALOG paimon_catalog PROPERTIES ( "type" = "paimon", -- 【这里是修改点】:将 'hive' 改为 'hms' "paimon.catalog.type" = "hms", -- 指定 HMS 地址 "hive.metastore.uris" = "thrift://nd1:9083", -- 【核心兼容配置】保持不变,解决 CDH 兼容性 "hive.version" = "2.1.1", -- 数仓路径 "warehouse" = "hdfs://nd1:8020/user/hive/warehouse", -- 复用配置文件 "hadoop.conf.dir" = "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username" = "hdfs" ); SWITCH paimon_catalog; USE ods; SELECT * FROM t_admin_division_code LIMIT 5;

7. 部署过程问题排查记录 (Troubleshooting)

在本次部署中,我们遇到了以下关键问题并已解决:

#问题现象报错关键信息根本原因解决方案
1FE 启动失败The jdk_version is 8, must be 17CDH 环境默认是 JDK 8,SelectDB 4.0 强制要求 JDK 17。下载解压 JDK 17,在fe.confbe.conf中显式配置JAVA_HOME指向新 JDK 路径。
2权限拒绝fe.out: Permission denied曾使用 root 用户启动过 Doris,导致日志文件归属变为 root,切回 bigdata 后无法写入。使用 root 执行chown -R bigdata:bigdata /home/bigdata/doris修复所有权,并杀掉残留的 root 进程。
3端口冲突tcp listen failed, errno=98(8040)CDH 的 YARN NodeManager 占用了 8040 端口。修改be.conf,设置webserver_port = 18040
5内存隐患(无报错,但在高负载下可能死机)14GB 内存下,BE 默认尝试占用 90% 内存,会挤压 FE 和 OS 空间。be.conf中强制设置mem_limit = 60%

8. 基础测试(增删改查)

将下述测试代码保存为doris4_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*- import pymysql import random import time import logging import functools from sshtunnel import SSHTunnelForwarder # ================= 配置信息 ================= # 1. SSH 连接配置 (连接到 Doris FE 所在的服务器) # 您提供的 IP 列表: 10.x.xx.157, 10.x.xx.167, 10.x.xx.206 SSH_HOST = '10.x.xx.157' # 选一个 FE 节点 (Master) SSH_PORT = 22 SSH_USER = 'xxxxxx' # CDH 环境通常使用 bigdata 用户 SSH_PASSWORD = 'xxxxxxxxxxxxxxxxxxxxx' # 您提供的密码 # 2. Doris 数据库连接配置 DORIS_LOCAL_HOST = '127.0.0.1' # 隧道建立后,对本地而言就是连本机 DORIS_QUERY_PORT = 9030 # Doris MySQL 协议查询端口 DORIS_DB_USER = 'root' DORIS_DB_PWD = '' # 默认无密码 DB_NAME = 'doris4_test_db' TABLE_NAME = 'user_profile_v4' LOG_FILE = 'doris4_test.log' # ================= 日志与工具模块 ================= def setup_logger(): logger = logging.getLogger("Doris4Tester") logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') # 文件日志 file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8') file_handler.setFormatter(formatter) # 控制台日志 console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger = setup_logger() def measure_time(func): """装饰器:用于记录函数执行耗时""" @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.time() logger.info(f"正在执行: [{func.__name__}] ...") try: result = func(*args, **kwargs) duration = time.time() - start_time logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒") return result except Exception as e: duration = time.time() - start_time logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}") raise e return wrapper # ================= Doris 4.0 核心业务逻辑 ================= @measure_time def init_db_and_table(cursor): """初始化数据库和表 (适配 Doris 4.0 Unique Key 模型)""" cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DB_NAME}") cursor.execute(f"USE {DB_NAME}") # Doris 4.0 建表最佳实践: # 1. 使用 Unique Key 模型 # 2. 开启 enable_unique_key_merge_on_write (MoW) 实现类 MySQL 的高性能更新 # 3. replication_num = 1 (测试环境节省资源,生产建议 3) create_sql = f""" CREATE TABLE IF NOT EXISTS {TABLE_NAME} ( user_id INT COMMENT "用户ID", username VARCHAR(50) COMMENT "用户名", age INT COMMENT "年龄", city VARCHAR(20) COMMENT "城市", balance DECIMAL(10, 2) COMMENT "余额", create_time DATETIME(3) COMMENT "创建时间(毫秒精度)" ) UNIQUE KEY(user_id) DISTRIBUTED BY HASH(user_id) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true" ); """ # store_row_column="true" 是 4.0 特性,优化部分列更新和点查性能 cursor.execute(create_sql) # 每次测试前清空表,保证环境纯净 cursor.execute(f"TRUNCATE TABLE {TABLE_NAME}") logger.info(f"数据库 {DB_NAME} 和表 {TABLE_NAME} (Unique Key MoW) 已初始化") @measure_time def insert_data(cursor, count=10): """批量插入数据""" logger.info(f"准备插入 {count} 条数据...") data_list = [] cities = ['Beijing', 'Shanghai', 'Guangzhou', 'Shenzhen', 'Chengdu'] for i in range(1, count + 1): user_id = i username = f"User_{i:03d}" age = random.randint(20, 60) city = random.choice(cities) balance = round(random.uniform(100.0, 5000.0), 2) # 模拟当前时间 create_time = time.strftime('%Y-%m-%d %H:%M:%S') data_list.append((user_id, username, age, city, balance, create_time)) sql = f"INSERT INTO {TABLE_NAME} VALUES (%s, %s, %s, %s, %s, %s)" cursor.executemany(sql, data_list) logger.info(f"成功插入 {cursor.rowcount} 条数据") @measure_time def update_data(cursor): """随机更新数据 (Doris 4.0 MoW 模型支持高效 UPDATE)""" # 1. 先查出所有 ID cursor.execute(f"SELECT user_id FROM {TABLE_NAME}") all_ids = [row['user_id'] for row in cursor.fetchall()] if not all_ids: logger.warning("表中无数据,跳过更新") return # 2. 随机选 3 个 ID 更新 target_ids = random.sample(all_ids, min(len(all_ids), 3)) logger.info(f"随机选中 ID 进行更新: {target_ids}") for uid in target_ids: new_balance = 9999.99 new_city = "Updated_City" # 标准 MySQL 更新语法 sql = f"UPDATE {TABLE_NAME} SET balance = %s, city = %s WHERE user_id = %s" cursor.execute(sql, (new_balance, new_city, uid)) logger.info(f" -> 已更新 ID={uid}: Balance设为 {new_balance}, City设为 {new_city}") @measure_time def delete_data(cursor): """随机删除数据""" # 1. 先查出所有 ID cursor.execute(f"SELECT user_id FROM {TABLE_NAME}") all_ids = [row['user_id'] for row in cursor.fetchall()] if not all_ids: logger.warning("表中无数据,跳过删除") return # 2. 随机选 2 个 ID 删除 target_ids = random.sample(all_ids, min(len(all_ids), 2)) logger.info(f"随机选中 ID 进行删除: {target_ids}") # 使用 IN 语法批量删除 format_strings = ','.join(['%s'] * len(target_ids)) sql = f"DELETE FROM {TABLE_NAME} WHERE user_id IN ({format_strings})" cursor.execute(sql, tuple(target_ids)) logger.info(f" -> 删除操作完成,受影响行数: {cursor.rowcount}") @measure_time def query_final_result(cursor): """查询并展示最终结果""" sql = f"SELECT * FROM {TABLE_NAME} ORDER BY user_id" cursor.execute(sql) results = cursor.fetchall() logger.info("-" * 50) logger.info(f"最终表数据 (总行数: {len(results)}):") logger.info(f"{'ID':<5} {'Name':<10} {'Age':<5} {'City':<15} {'Balance':<10}") logger.info("-" * 50) for row in results: # 这里的 row 是字典,因为 connect 时指定了 DictCursor logger.info(f"{row['user_id']:<5} {row['username']:<10} {row['age']:<5} {row['city']:<15} {row['balance']:<10}") logger.info("-" * 50) # ================= 主程序入口 ================= def main(): ssh_tunnel = None db_conn = None try: logger.info(">>> 1. 正在建立 SSH 隧道 ...") # 配置 SSH 隧道:本地随机端口 -> SSH(10.8.15.157) -> Doris FE(127.0.0.1:9030) ssh_tunnel = SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_username=SSH_USER, ssh_password=SSH_PASSWORD, remote_bind_address=(DORIS_LOCAL_HOST, DORIS_QUERY_PORT) ) ssh_tunnel.start() logger.info(f">>> SSH 隧道建立成功! 本地映射端口: {ssh_tunnel.local_bind_port}") logger.info(">>> 2. 正在连接 Doris 4.0.1 数据库 ...") db_conn = pymysql.connect( host='127.0.0.1', # 连接本机 port=ssh_tunnel.local_bind_port, # 隧道端口 user=DORIS_DB_USER, password=DORIS_DB_PWD, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, # 返回字典格式数据方便读取 autocommit=True # 开启自动提交,这对 Doris 很重要 ) cursor = db_conn.cursor() # --- 执行测试步骤 --- logger.info(">>> 3. 开始执行 CRUD 测试流程") # 1. 建库建表 init_db_and_table(cursor) # 2. 插入 10 条数据 insert_data(cursor, count=10) time.sleep(2) # 稍作等待,确保数据版本提交(虽然4.0很快,但为了演示效果) # 3. 随机更新 update_data(cursor) time.sleep(1) # 4. 随机删除 delete_data(cursor) time.sleep(1) # 5. 查询结果 query_final_result(cursor) logger.info(">>> 4. 测试全部通过!Doris 4.0.1 运行正常。") except Exception as e: logger.error(f"❌ 测试过程中发生错误: {e}") import traceback logger.error(traceback.format_exc()) finally: # 资源清理 if db_conn: db_conn.close() logger.info("数据库连接已关闭") if ssh_tunnel: ssh_tunnel.stop() logger.info("SSH 隧道已关闭") if __name__ == "__main__": main()

对应log文件记录如下:

2025-12-09 11:46:53,544 - INFO - >>> 1. 正在建立 SSH 隧道 ... 2025-12-09 11:46:53,830 - INFO - >>> SSH 隧道建立成功! 本地映射端口: 64441 2025-12-09 11:46:53,831 - INFO - >>> 2. 正在连接 Doris 4.0.1 数据库 ... 2025-12-09 11:46:54,028 - INFO - >>> 3. 开始执行 CRUD 测试流程 2025-12-09 11:46:54,028 - INFO - 正在执行: [init_db_and_table] ... 2025-12-09 11:46:54,044 - INFO - 数据库 doris4_test_db 和表 user_profile_v4 (Unique Key MoW) 已初始化 2025-12-09 11:46:54,044 - INFO - 执行完成: [init_db_and_table] | 耗时: 0.0165 秒 2025-12-09 11:46:54,045 - INFO - 正在执行: [insert_data] ... 2025-12-09 11:46:54,045 - INFO - 准备插入 100 条数据... 2025-12-09 11:46:54,188 - INFO - 成功插入 100 条数据 2025-12-09 11:46:54,188 - INFO - 执行完成: [insert_data] | 耗时: 0.1437 秒 2025-12-09 11:46:56,189 - INFO - 正在执行: [update_data] ... 2025-12-09 11:46:56,224 - INFO - 随机选中 ID 进行更新: [46, 71, 41] 2025-12-09 11:46:56,275 - INFO - -> 已更新 ID=46: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,326 - INFO - -> 已更新 ID=71: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,387 - INFO - -> 已更新 ID=41: Balance设为 9999.99, City设为 Updated_City 2025-12-09 11:46:56,388 - INFO - 执行完成: [update_data] | 耗时: 0.1991 秒 2025-12-09 11:46:57,388 - INFO - 正在执行: [delete_data] ... 2025-12-09 11:46:57,405 - INFO - 随机选中 ID 进行删除: [58, 50] 2025-12-09 11:46:57,457 - INFO - -> 删除操作完成,受影响行数: 2 2025-12-09 11:46:57,457 - INFO - 执行完成: [delete_data] | 耗时: 0.0691 秒 2025-12-09 11:46:58,458 - INFO - 正在执行: [query_final_result] ... 2025-12-09 11:46:58,483 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,483 - INFO - 最终表数据 (总行数: 98): 2025-12-09 11:46:58,484 - INFO - ID Name Age City Balance 2025-12-09 11:46:58,484 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,484 - INFO - 1 User_001 32 Guangzhou 4987.09 2025-12-09 11:46:58,484 - INFO - 2 User_002 55 Shanghai 4799.23 2025-12-09 11:46:58,484 - INFO - 3 User_003 29 Guangzhou 3795.66 2025-12-09 11:46:58,484 - INFO - 4 User_004 21 Shanghai 1065.80 2025-12-09 11:46:58,484 - INFO - 5 User_005 48 Beijing 3250.84 2025-12-09 11:46:58,484 - INFO - 6 User_006 33 Shenzhen 2121.31 2025-12-09 11:46:58,484 - INFO - 7 User_007 39 Guangzhou 1792.82 2025-12-09 11:46:58,485 - INFO - 8 User_008 21 Chengdu 350.23 2025-12-09 11:46:58,485 - INFO - 9 User_009 59 Chengdu 4108.27 2025-12-09 11:46:58,485 - INFO - 10 User_010 57 Shanghai 2674.18 2025-12-09 11:46:58,485 - INFO - 11 User_011 43 Guangzhou 4362.53 2025-12-09 11:46:58,485 - INFO - 12 User_012 24 Chengdu 845.92 2025-12-09 11:46:58,485 - INFO - 13 User_013 47 Shenzhen 3597.69 2025-12-09 11:46:58,485 - INFO - 14 User_014 53 Shenzhen 1159.92 2025-12-09 11:46:58,485 - INFO - 15 User_015 48 Shanghai 4159.49 2025-12-09 11:46:58,485 - INFO - 16 User_016 30 Guangzhou 3863.11 2025-12-09 11:46:58,485 - INFO - 17 User_017 30 Guangzhou 2356.23 2025-12-09 11:46:58,485 - INFO - 18 User_018 43 Beijing 4038.47 2025-12-09 11:46:58,485 - INFO - 19 User_019 33 Shenzhen 3465.97 2025-12-09 11:46:58,486 - INFO - 20 User_020 33 Guangzhou 2548.67 2025-12-09 11:46:58,486 - INFO - 21 User_021 47 Shanghai 2104.14 2025-12-09 11:46:58,486 - INFO - 22 User_022 44 Beijing 4932.08 2025-12-09 11:46:58,486 - INFO - 23 User_023 60 Guangzhou 2993.93 2025-12-09 11:46:58,486 - INFO - 24 User_024 57 Chengdu 1831.64 2025-12-09 11:46:58,486 - INFO - 25 User_025 26 Shenzhen 2478.94 2025-12-09 11:46:58,486 - INFO - 26 User_026 31 Shanghai 4901.88 2025-12-09 11:46:58,486 - INFO - 27 User_027 42 Shanghai 1422.68 2025-12-09 11:46:58,486 - INFO - 28 User_028 46 Shenzhen 2586.16 2025-12-09 11:46:58,486 - INFO - 29 User_029 31 Guangzhou 3395.99 2025-12-09 11:46:58,486 - INFO - 30 User_030 50 Shanghai 657.19 2025-12-09 11:46:58,486 - INFO - 31 User_031 51 Shenzhen 4879.95 2025-12-09 11:46:58,487 - INFO - 32 User_032 58 Guangzhou 1523.34 2025-12-09 11:46:58,487 - INFO - 33 User_033 48 Shanghai 2711.63 2025-12-09 11:46:58,487 - INFO - 34 User_034 38 Shanghai 1920.85 2025-12-09 11:46:58,487 - INFO - 35 User_035 31 Shanghai 1700.61 2025-12-09 11:46:58,487 - INFO - 36 User_036 56 Chengdu 2682.61 2025-12-09 11:46:58,487 - INFO - 37 User_037 55 Shenzhen 1431.78 2025-12-09 11:46:58,487 - INFO - 38 User_038 31 Shenzhen 4727.04 2025-12-09 11:46:58,487 - INFO - 39 User_039 32 Chengdu 3227.39 2025-12-09 11:46:58,487 - INFO - 40 User_040 43 Shenzhen 3663.79 2025-12-09 11:46:58,487 - INFO - 41 User_041 50 Updated_City 9999.99 2025-12-09 11:46:58,487 - INFO - 42 User_042 43 Guangzhou 1648.04 2025-12-09 11:46:58,487 - INFO - 43 User_043 35 Shanghai 3318.13 2025-12-09 11:46:58,487 - INFO - 44 User_044 48 Chengdu 2464.68 2025-12-09 11:46:58,488 - INFO - 45 User_045 28 Shenzhen 4477.58 2025-12-09 11:46:58,488 - INFO - 46 User_046 31 Updated_City 9999.99 2025-12-09 11:46:58,488 - INFO - 47 User_047 59 Shenzhen 3873.40 2025-12-09 11:46:58,488 - INFO - 48 User_048 55 Beijing 4772.47 2025-12-09 11:46:58,488 - INFO - 49 User_049 50 Shenzhen 1199.26 2025-12-09 11:46:58,488 - INFO - 51 User_051 59 Beijing 1975.62 2025-12-09 11:46:58,488 - INFO - 52 User_052 52 Beijing 309.98 2025-12-09 11:46:58,488 - INFO - 53 User_053 34 Shenzhen 1315.21 2025-12-09 11:46:58,488 - INFO - 54 User_054 40 Guangzhou 4976.19 2025-12-09 11:46:58,488 - INFO - 55 User_055 59 Shenzhen 2495.20 2025-12-09 11:46:58,488 - INFO - 56 User_056 38 Shanghai 2183.50 2025-12-09 11:46:58,489 - INFO - 57 User_057 47 Shanghai 3532.53 2025-12-09 11:46:58,489 - INFO - 59 User_059 29 Guangzhou 3959.38 2025-12-09 11:46:58,489 - INFO - 60 User_060 57 Shenzhen 2794.60 2025-12-09 11:46:58,489 - INFO - 61 User_061 44 Guangzhou 1043.38 2025-12-09 11:46:58,489 - INFO - 62 User_062 44 Beijing 1445.02 2025-12-09 11:46:58,489 - INFO - 63 User_063 34 Chengdu 2018.03 2025-12-09 11:46:58,489 - INFO - 64 User_064 30 Beijing 1325.72 2025-12-09 11:46:58,489 - INFO - 65 User_065 60 Shenzhen 2405.65 2025-12-09 11:46:58,489 - INFO - 66 User_066 24 Shanghai 1521.32 2025-12-09 11:46:58,489 - INFO - 67 User_067 47 Beijing 3320.86 2025-12-09 11:46:58,489 - INFO - 68 User_068 39 Shenzhen 2205.58 2025-12-09 11:46:58,490 - INFO - 69 User_069 43 Beijing 4372.42 2025-12-09 11:46:58,490 - INFO - 70 User_070 48 Guangzhou 1719.70 2025-12-09 11:46:58,490 - INFO - 71 User_071 55 Updated_City 9999.99 2025-12-09 11:46:58,490 - INFO - 72 User_072 58 Shanghai 4531.46 2025-12-09 11:46:58,490 - INFO - 73 User_073 47 Shanghai 1505.61 2025-12-09 11:46:58,490 - INFO - 74 User_074 30 Beijing 1342.90 2025-12-09 11:46:58,490 - INFO - 75 User_075 31 Shanghai 1321.63 2025-12-09 11:46:58,490 - INFO - 76 User_076 60 Chengdu 2761.57 2025-12-09 11:46:58,490 - INFO - 77 User_077 33 Shanghai 3493.49 2025-12-09 11:46:58,490 - INFO - 78 User_078 37 Shanghai 254.65 2025-12-09 11:46:58,490 - INFO - 79 User_079 29 Beijing 2223.87 2025-12-09 11:46:58,491 - INFO - 80 User_080 46 Chengdu 4061.74 2025-12-09 11:46:58,491 - INFO - 81 User_081 26 Shanghai 1116.46 2025-12-09 11:46:58,491 - INFO - 82 User_082 22 Guangzhou 3541.36 2025-12-09 11:46:58,491 - INFO - 83 User_083 43 Beijing 1379.45 2025-12-09 11:46:58,491 - INFO - 84 User_084 47 Beijing 4405.15 2025-12-09 11:46:58,491 - INFO - 85 User_085 22 Guangzhou 328.10 2025-12-09 11:46:58,491 - INFO - 86 User_086 37 Shanghai 1957.66 2025-12-09 11:46:58,491 - INFO - 87 User_087 44 Shenzhen 4303.43 2025-12-09 11:46:58,491 - INFO - 88 User_088 24 Shenzhen 2125.09 2025-12-09 11:46:58,491 - INFO - 89 User_089 57 Shenzhen 3759.54 2025-12-09 11:46:58,491 - INFO - 90 User_090 45 Guangzhou 4788.86 2025-12-09 11:46:58,492 - INFO - 91 User_091 46 Beijing 683.87 2025-12-09 11:46:58,492 - INFO - 92 User_092 56 Chengdu 1321.36 2025-12-09 11:46:58,492 - INFO - 93 User_093 36 Shanghai 4819.07 2025-12-09 11:46:58,492 - INFO - 94 User_094 28 Beijing 4583.63 2025-12-09 11:46:58,492 - INFO - 95 User_095 29 Shenzhen 2064.74 2025-12-09 11:46:58,492 - INFO - 96 User_096 52 Beijing 3397.31 2025-12-09 11:46:58,492 - INFO - 97 User_097 33 Shanghai 3227.49 2025-12-09 11:46:58,492 - INFO - 98 User_098 27 Beijing 1809.42 2025-12-09 11:46:58,492 - INFO - 99 User_099 41 Chengdu 2890.17 2025-12-09 11:46:58,492 - INFO - 100 User_100 41 Chengdu 3228.25 2025-12-09 11:46:58,493 - INFO - -------------------------------------------------- 2025-12-09 11:46:58,493 - INFO - 执行完成: [query_final_result] | 耗时: 0.0348 秒 2025-12-09 11:46:58,493 - INFO - >>> 4. 测试全部通过!Doris 4.0.1 运行正常。 2025-12-09 11:46:58,493 - INFO - 数据库连接已关闭 2025-12-09 11:46:58,512 - INFO - SSH 隧道已关闭

去nd14执行下述语句进行查看:

mysql -h 10.x.xx.157 -P 9030 -uroot
show databases; use doris4_test_db; show tables;

select * from user_profile_v4;

8.1 插入数据

这里展示部分数据,可以看出插入了100条测试数据:

8.2 更新数据

控制台打印结果:

终端查看结果:

8.3 删除数据

控制台打印结果:

终端验证结果:

9. Paimon外表数据写入Doris内表基础测试

9.1 Paimon数据准备

对于Paimon外表数据写入Doris内表基础测试,需要提前在Flink SQL会话里面创建Paimon表,并插入测试数据

-- 1. 创建 Flink 端的 Paimon Catalog CREATE CATALOG paimon_catalog WITH ( 'type' = 'paimon', 'warehouse' = 'hdfs:///user/hive/warehouse', 'metastore' = 'hive', 'hive-conf-dir' = '/etc/hive/conf.cloudera.hive' ); -- 2. 切换 Catalog 和 Database USE CATALOG my_paimon; CREATE DATABASE IF NOT EXISTS ods; USE ods; -- 3. 创建 Paimon 表 (源表) -- 这是一个记录用户行为的日志表 CREATE TABLE IF NOT EXISTS paimon_source_event ( user_id INT, item_id INT, behavior STRING, dt STRING, ts TIMESTAMP(3), PRIMARY KEY (dt, user_id, item_id) NOT ENFORCED ) PARTITIONED BY (dt) WITH ( 'bucket' = '1', 'file.format' = 'parquet' ); -- 4. 写入测试数据 (Batch 模式写入) INSERT INTO paimon_source_event VALUES (1001, 501, 'click', '2025-12-12', TIMESTAMP '2025-12-12 10:00:00.123'), (1002, 502, 'view', '2025-12-12', TIMESTAMP '2025-12-12 10:05:00.456'), (1003, 501, 'buy', '2025-12-12', TIMESTAMP '2025-12-12 10:10:00.789'), (1001, 503, 'view', '2025-12-13', TIMESTAMP '2025-12-13 11:00:00.000'), (1004, 501, 'click', '2025-12-13', TIMESTAMP '2025-12-13 11:05:00.000');

9.2 测试代码

将下述测试代码保存为doris4_paimon_tel_test.py【python解释器:3.8.20、windows系统:11】

# -*- coding: utf-8 -*- import pymysql import time import logging import functools from sshtunnel import SSHTunnelForwarder # ================= 配置信息 ================= # 1. SSH 连接信息 (使用 Doris 4.0 所在的节点 IP) # 参考 doris4_test.py 的配置 SSH_HOST = '10.x.xx.157' # Doris 4.0 FE Master SSH_PORT = 22 SSH_USER = 'xxxxxx' SSH_PASSWORD = 'xxxxxxxxxxxxxxxxx' # 2. Doris 数据库连接信息 DORIS_LOCAL_HOST = '127.0.0.1' DORIS_QUERY_PORT = 9030 DORIS_DB_USER = 'root' DORIS_DB_PWD = '' # 3. Paimon Catalog 配置 # 【重要提示】: 请确保 Doris 4.0 的节点上,该路径下也有 core-site.xml/hdfs-site.xml CATALOG_PROPS = { "type": "paimon", "paimon.catalog.type": "hms", "hive.metastore.uris": "thrift://nd1:9083,thrift://nd3:9083", # HMS 地址保持不变 "hive.version": "2.1.1", # 保持兼容性配置 "warehouse": "hdfs://nd1:8020/user/hive/warehouse", "hadoop.conf.dir": "/home/bigdata/doris/conf/cdh_conf/", "hadoop.username": "hdfs" } # 4. 业务配置 PAIMON_CATALOG_NAME = 'paimon_catalog' PAIMON_DB = 'ods' PAIMON_TABLE = 'paimon_source_event' # 使用新的数据库名区分测试 DORIS_TEST_DB = 'doris4_paimon_test_db' DORIS_TARGET_TABLE = 'doris4_target_event_sink' LOG_FILE = 'doris4_paimon_etl_report.log' # ================= 日志与工具模块 ================= def setup_logger(): logger = logging.getLogger("Doris4PaimonTester") logger.setLevel(logging.INFO) if logger.hasHandlers(): logger.handlers.clear() formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler = logging.FileHandler(LOG_FILE, mode='w', encoding='utf-8') file_handler.setFormatter(formatter) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger logger = setup_logger() def measure_time(func): @functools.wraps(func) def wrapper(*args, **kwargs): start_time = time.time() logger.info(f"正在执行: [{func.__name__}] ...") try: result = func(*args, **kwargs) duration = time.time() - start_time logger.info(f"执行完成: [{func.__name__}] | 耗时: {duration:.4f} 秒") return result except Exception as e: duration = time.time() - start_time logger.error(f"执行失败: [{func.__name__}] | 耗时: {duration:.4f} 秒 | 错误: {e}") raise e return wrapper # ================= 核心测试逻辑 ================= @measure_time def init_doris_catalog(cursor): """在 Doris 4.0 中初始化 Paimon Catalog""" logger.info(f"正在初始化 Doris Catalog: {PAIMON_CATALOG_NAME} ...") # 删除旧的 Catalog (如果存在) cursor.execute(f"DROP CATALOG IF EXISTS {PAIMON_CATALOG_NAME}") # 拼接创建语句 props_str = ",\n".join([f'"{k}" = "{v}"' for k, v in CATALOG_PROPS.items()]) create_sql = f""" CREATE CATALOG {PAIMON_CATALOG_NAME} PROPERTIES ( {props_str} ); """ logger.info("发送 Create Catalog 请求...") cursor.execute(create_sql) logger.info("Catalog 创建成功!") time.sleep(2) # 稍微等待元数据同步 @measure_time def check_paimon_source(cursor): """验证 Doris 是否能通过 Catalog 读取 Paimon 数据""" logger.info(f"检查 Paimon 数据源: {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}") # 1. 切换到 Paimon Catalog cursor.execute(f"SWITCH {PAIMON_CATALOG_NAME}") # 2. 检查表是否存在 cursor.execute(f"USE {PAIMON_DB}") cursor.execute("SHOW TABLES") # Doris 不同版本 fetchall 返回结构可能微调,这里做通用处理 tables = [list(row.values())[0] for row in cursor.fetchall()] if PAIMON_TABLE not in tables: logger.warning(f"当前 Catalog 下的表: {tables}") raise Exception(f"Paimon 表 {PAIMON_TABLE} 未找到!") # 3. 预览数据 sql = f"SELECT * FROM {PAIMON_TABLE} ORDER BY dt, user_id LIMIT 5" cursor.execute(sql) results = cursor.fetchall() logger.info(f"Paimon 数据预览 (前5条):") for row in results: logger.info(row) if not results: raise Exception("Paimon 表为空,请先在 Flink 端写入数据!") return len(results) @measure_time def create_doris_target_table(cursor): """创建 Doris 4.0 内部表 (Unique Key MoW 模型)""" cursor.execute("SWITCH internal") cursor.execute(f"CREATE DATABASE IF NOT EXISTS {DORIS_TEST_DB}") cursor.execute(f"USE {DORIS_TEST_DB}") # 注意:Unique Key 模型要求 Key 字段必须排在 Value 字段前面 # Key: user_id, item_id, dt # Value: behavior, ts create_sql = f""" CREATE TABLE IF NOT EXISTS {DORIS_TARGET_TABLE} ( user_id INT COMMENT "用户ID", item_id INT COMMENT "商品ID", dt VARCHAR(20) COMMENT "日期分区", behavior VARCHAR(50) COMMENT "行为类型", ts DATETIME(3) COMMENT "时间戳" ) UNIQUE KEY(user_id, item_id, dt) PARTITION BY LIST(dt) ( PARTITION p20251212 VALUES IN ("2025-12-12"), PARTITION p20251213 VALUES IN ("2025-12-13") ) DISTRIBUTED BY HASH(user_id) BUCKETS 1 PROPERTIES ( "replication_num" = "1", "enable_unique_key_merge_on_write" = "true", "store_row_column" = "true" ); """ # "store_row_column" = "true" 是 4.0 的特性,用于优化点查 cursor.execute(create_sql) cursor.execute(f"TRUNCATE TABLE {DORIS_TARGET_TABLE}") logger.info(f"Doris 4.0 内表 {DORIS_TARGET_TABLE} 已准备就绪") @measure_time def execute_etl_paimon_to_doris(cursor): """执行 INSERT INTO ... SELECT ...""" logger.info(">>> 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) <<<") # 显式指定字段顺序,防止 select * 顺序不一致 etl_sql = f""" INSERT INTO internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE} (user_id, item_id, dt, behavior, ts) SELECT user_id, item_id, dt, behavior, ts FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE} """ cursor.execute(etl_sql) logger.info("ETL SQL 提交完毕") @measure_time def verify_data_consistency(cursor): """验证数据一致性""" logger.info(">>> 开始数据一致性校验 <<<") # 1. Paimon 源数据量 cursor.execute(f"SELECT count(*) as cnt FROM {PAIMON_CATALOG_NAME}.{PAIMON_DB}.{PAIMON_TABLE}") paimon_count = cursor.fetchone()['cnt'] # 2. Doris 目标数据量 cursor.execute(f"SELECT count(*) as cnt FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE}") doris_count = cursor.fetchone()['cnt'] logger.info(f"Paimon 源表行数: {paimon_count}") logger.info(f"Doris 目标表行数: {doris_count}") if paimon_count == doris_count: logger.info("✅ 测试通过:数据条数一致!") else: logger.error("❌ 测试失败:数据条数不一致!") # 3. 抽样展示 cursor.execute(f"SELECT * FROM internal.{DORIS_TEST_DB}.{DORIS_TARGET_TABLE} ORDER BY dt, user_id LIMIT 3") rows = cursor.fetchall() logger.info("Doris 内表数据抽样:") for row in rows: logger.info(row) # ================= 主流程 ================= def main_process(): server = None conn = None try: logger.info(">>> 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ...") # 建立隧道:本地 -> SSH(10.8.15.157) -> Doris FE(127.0.0.1:9030) server = SSHTunnelForwarder( (SSH_HOST, SSH_PORT), ssh_username=SSH_USER, ssh_password=SSH_PASSWORD, remote_bind_address=(DORIS_LOCAL_HOST, DORIS_QUERY_PORT) ) server.start() logger.info(f">>> SSH 隧道建立成功! 本地端口: {server.local_bind_port}") logger.info(">>> 2. 连接 Doris 数据库 ...") conn = pymysql.connect( host='127.0.0.1', port=server.local_bind_port, user=DORIS_DB_USER, password=DORIS_DB_PWD, charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor, autocommit=True ) cursor = conn.cursor() # --- 测试步骤 --- # 步骤 1: 初始化 Catalog init_doris_catalog(cursor) # 步骤 2: 确认源端 Paimon 可读 check_paimon_source(cursor) # 步骤 3: 准备 Doris 4.0 目标表 create_doris_target_table(cursor) # 步骤 4: 执行导入 execute_etl_paimon_to_doris(cursor) # Doris 4.0 导入通常非常快,但仍建议稍微sleep time.sleep(2) # 步骤 5: 校验结果 verify_data_consistency(cursor) logger.info(">>> Doris 4.0 与 Paimon 集成测试全部完成 <<<") except Exception as e: logger.error(f"主流程发生错误: {e}") import traceback logger.error(traceback.format_exc()) finally: if conn: conn.close() logger.info("数据库连接已关闭") if server: server.stop() logger.info("SSH 隧道已关闭") if __name__ == "__main__": main_process()

对应的log文件内容如下:

2025-12-12 15:55:44,709 - INFO - >>> 1. 正在建立 SSH 隧道 (连接至 Doris 4.0) ... 2025-12-12 15:55:45,144 - INFO - >>> SSH 隧道建立成功! 本地端口: 60651 2025-12-12 15:55:45,144 - INFO - >>> 2. 连接 Doris 数据库 ... 2025-12-12 15:55:45,353 - INFO - 正在执行: [init_doris_catalog] ... 2025-12-12 15:55:45,354 - INFO - 正在初始化 Doris Catalog: paimon_catalog ... 2025-12-12 15:55:45,359 - INFO - 发送 Create Catalog 请求... 2025-12-12 15:55:45,437 - INFO - Catalog 创建成功! 2025-12-12 15:55:47,438 - INFO - 执行完成: [init_doris_catalog] | 耗时: 2.0848 秒 2025-12-12 15:55:47,444 - INFO - 正在执行: [check_paimon_source] ... 2025-12-12 15:55:47,445 - INFO - 检查 Paimon 数据源: paimon_catalog.ods.paimon_source_event 2025-12-12 15:55:48,296 - INFO - Paimon 数据预览 (前5条): 2025-12-12 15:55:48,296 - INFO - {'user_id': 1001, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 15:55:48,296 - INFO - {'user_id': 1002, 'item_id': 502, 'behavior': 'view', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 15:55:48,296 - INFO - {'user_id': 1003, 'item_id': 501, 'behavior': 'buy', 'dt': '2025-12-12', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 15:55:48,296 - INFO - {'user_id': 1001, 'item_id': 503, 'behavior': 'view', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 0)} 2025-12-12 15:55:48,297 - INFO - {'user_id': 1004, 'item_id': 501, 'behavior': 'click', 'dt': '2025-12-13', 'ts': datetime.datetime(2025, 12, 13, 11, 5)} 2025-12-12 15:55:48,297 - INFO - 执行完成: [check_paimon_source] | 耗时: 0.8523 秒 2025-12-12 15:55:48,297 - INFO - 正在执行: [create_doris_target_table] ... 2025-12-12 15:55:48,345 - INFO - Doris 4.0 内表 doris4_target_event_sink 已准备就绪 2025-12-12 15:55:48,345 - INFO - 执行完成: [create_doris_target_table] | 耗时: 0.0479 秒 2025-12-12 15:55:48,345 - INFO - 正在执行: [execute_etl_paimon_to_doris] ... 2025-12-12 15:55:48,345 - INFO - >>> 开始执行从 Paimon 到 Doris 4.0 的数据导入 (ETL) <<< 2025-12-12 15:55:48,515 - INFO - ETL SQL 提交完毕 2025-12-12 15:55:48,516 - INFO - 执行完成: [execute_etl_paimon_to_doris] | 耗时: 0.1707 秒 2025-12-12 15:55:50,517 - INFO - 正在执行: [verify_data_consistency] ... 2025-12-12 15:55:50,517 - INFO - >>> 开始数据一致性校验 <<< 2025-12-12 15:55:51,382 - INFO - Paimon 源表行数: 5 2025-12-12 15:55:51,384 - INFO - Doris 目标表行数: 5 2025-12-12 15:55:51,385 - INFO - ✅ 测试通过:数据条数一致! 2025-12-12 15:55:51,438 - INFO - Doris 内表数据抽样: 2025-12-12 15:55:51,438 - INFO - {'user_id': 1001, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'click', 'ts': datetime.datetime(2025, 12, 12, 10, 0, 0, 123000)} 2025-12-12 15:55:51,438 - INFO - {'user_id': 1002, 'item_id': 502, 'dt': '2025-12-12', 'behavior': 'view', 'ts': datetime.datetime(2025, 12, 12, 10, 5, 0, 456000)} 2025-12-12 15:55:51,439 - INFO - {'user_id': 1003, 'item_id': 501, 'dt': '2025-12-12', 'behavior': 'buy', 'ts': datetime.datetime(2025, 12, 12, 10, 10, 0, 789000)} 2025-12-12 15:55:51,439 - INFO - 执行完成: [verify_data_consistency] | 耗时: 0.9222 秒 2025-12-12 15:55:51,439 - INFO - >>> Doris 4.0 与 Paimon 集成测试全部完成 <<< 2025-12-12 15:55:51,440 - INFO - 数据库连接已关闭 2025-12-12 15:55:51,444 - INFO - SSH 隧道已关闭

9.3 验证数据

去Doris终端验证数据结果如下:

mysql -h 10.x.xx.157 -P 9030 -uroot

执行下述sql

SWITCH internal; SHOW DATABASES; USE doris4_paimon_test_db; SHOW TABLES;

SELECT * FROM doris4_target_event_sink ORDER BY user_id;

也可以在同一个查询窗口中直接对比两边的数量(不需要反复 SWITCH):

-- 这里的 internal 和 paimon_catalog 是 Catalog 名称 SELECT (SELECT count(*) FROM internal.doris4_paimon_test_db.doris4_target_event_sink) as doris_count, (SELECT count(*) FROM paimon_catalog.ods.paimon_source_event) as paimon_count;

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2025/12/13 15:38:13

NCL画图完全指南:快速掌握数据可视化核心技能

NCL画图完全指南&#xff1a;快速掌握数据可视化核心技能 【免费下载链接】NCL画图入门教程 探索NCL画图的奇妙世界&#xff01;本教程为您提供详尽的《NCL画图个例讲解.pdf》&#xff0c;助您快速入门并掌握NCL画图的核心技能。无论您是初学者还是希望提升技能&#xff0c;本教…

作者头像 李华
网站建设 2025/12/13 15:37:53

Java开发者的终极武器:JDK 1.8 API中文文档完全指南 [特殊字符]

Java开发者的终极武器&#xff1a;JDK 1.8 API中文文档完全指南 &#x1f680; 【免费下载链接】JAVAJDK1.8API中文文档高清完整版CHM分享7cdd1 本仓库提供了一份完整的 JAVA JDK 1.8 API 中文文档&#xff0c;采用 CHM 格式&#xff0c;方便 Java 开发者查阅和使用。该文档包含…

作者头像 李华
网站建设 2025/12/19 6:20:56

31、XUL 组件与控件全解析

XUL 组件与控件全解析 在 XUL(XML User Interface Language)的世界里,有着丰富多样的组件和控件,它们各自承担着不同的功能,为构建用户界面提供了强大的支持。下面将详细介绍一些常见的 XUL 组件及其特性。 1. 标签页相关组件 tabs :作为标签元素的容器,通常呈现为一…

作者头像 李华
网站建设 2025/12/13 15:37:32

OpenUSD终极入门指南:从零开始构建3D场景的完整教程

OpenUSD终极入门指南&#xff1a;从零开始构建3D场景的完整教程 【免费下载链接】OpenUSD Universal Scene Description 项目地址: https://gitcode.com/GitHub_Trending/ope/OpenUSD Universal Scene Description&#xff08;USD&#xff09;是皮克斯动画工作室开发的高…

作者头像 李华