数据同步方案详解
本章导读
数据同步是分布式系统中保证数据一致性的核心机制,无论是数据库主从复制、多数据中心同步,还是异构数据源集成,都离不开可靠的数据同步方案。本章将深入探讨同步复制、异步复制、增量同步、双向同步等关键技术。
学习目标:
- 目标1:掌握同步复制和异步复制的原理与实现
- 目标2:理解增量同步和变更数据捕获(CDC)技术
- 目标3:能够设计双向同步方案并处理数据冲突
前置知识:熟悉强一致性和最终一致性概念,了解数据库复制原理
阅读时长:约 40 分钟
一、知识概述
在分布式系统中,数据同步是保证数据一致性的核心机制。无论是主从复制、多数据中心同步,还是异构数据源集成,都需要可靠的数据同步方案。本章将深入探讨各种数据同步策略,包括同步/异步复制、增量同步、双向同步等技术。
数据同步的挑战
- 网络延迟:跨地域同步的网络延迟影响系统性能
- 数据冲突:并发更新导致的数据冲突问题
- 一致性保证:确保源端与目标端的数据最终达成一致
- 容错处理:处理网络故障、系统故障等异常情况
二、核心同步方案
2.1 同步复制
原理说明
同步复制要求主节点在提交事务前,将数据同步到从节点:
- 主节点接收写请求
- 主节点将数据发送到从节点
- 等待从节点确认(下方实现采用仲裁写:包括主节点自身在内,至少 W 个节点确认后写入才算成功)
- 主节点提交事务
Java 实现
importjava.util.*;importjava.util.concurrent.*;importjava.util.concurrent.atomic.*;/** * 同步复制管理器 */publicclassSyncReplicationManager{privatefinalStringnodeId;privatefinalList<ReplicaNode>replicas;privatefinalExecutorServiceexecutor;privatefinalintwriteQuorum;privatefinalinttimeout;// 本地数据存储privatefinalMap<String,Object>dataStore;privatefinalMap<String,Long>versionStore;privatefinalAtomicLongcurrentVersion;publicSyncReplicationManager(StringnodeId,List<ReplicaNode>replicas,intwriteQuorum,inttimeoutMs){this.nodeId=nodeId;this.replicas=replicas;this.executor=Executors.newFixedThreadPool(replicas.size());this.writeQuorum=writeQuorum;this.timeout=timeoutMs;this.dataStore=newConcurrentHashMap<>();this.versionStore=newConcurrentHashMap<>();this.currentVersion=newAtomicLong(0);}/** * 写入数据(同步复制) */publicWriteResultwrite(Stringkey,Objectvalue){longversion=currentVersion.incrementAndGet();WriteRequestrequest=newWriteRequest(UUID.randomUUID().toString(),key,value,version,System.currentTimeMillis());// 1. 本地写入dataStore.put(key,value);versionStore.put(key,version);// 2. 同步到从节点AtomicIntegersuccessCount=newAtomicInteger(1);// 本地算一个CountDownLatchlatch=newCountDownLatch(replicas.size());for(ReplicaNodereplica:replicas){executor.submit(()->{try{booleansuccess=replica.replicate(request);if(success){successCount.incrementAndGet();}}catch(Exceptione){System.err.println("[SyncReplication] Replication failed to "+replica.getNodeId()+": "+e.getMessage());}finally{latch.countDown();}});}// 3. 
等待仲裁确认try{booleancompleted=latch.await(timeout,TimeUnit.MILLISECONDS);if(!completed){// 超时,回滚dataStore.remove(key);versionStore.remove(key);returnWriteResult.timeout("Replication timeout");}if(successCount.get()<writeQuorum){// 未达到仲裁,回滚dataStore.remove(key);versionStore.remove(key);returnWriteResult.failure("Insufficient replicas, got "+successCount.get()+", need "+writeQuorum);}returnWriteResult.success(version);}catch(InterruptedExceptione){Thread.currentThread().interrupt();returnWriteResult.failure("Interrupted");}}/** * 读取数据 */publicReadResultread(Stringkey){Objectvalue=dataStore.get(key);Longversion=versionStore.get(key);if(value==null){returnReadResult.notFound();}returnReadResult.success(value,version);}/** * 接收复制请求(从节点视角) */publicbooleanreceiveReplication(WriteRequestrequest){try{// 检查版本LongcurrentVersion=versionStore.get(request.key);if(currentVersion!=null&¤tVersion>=request.version){// 版本过旧,忽略returntrue;}// 写入数据dataStore.put(request.key,request.value);versionStore.put(request.key,request.version);System.out.println("[Node-"+nodeId+"] Received replication: "+request.key+" v"+request.version);returntrue;}catch(Exceptione){returnfalse;}}/** * 获取当前状态 */publicReplicationStatusgetStatus(){returnnewReplicationStatus(nodeId,dataStore.size(),currentVersion.get(),replicas.stream().map(ReplicaNode::getNodeId).toList());}publicvoidshutdown(){executor.shutdown();}}/** * 副本节点 */interfaceReplicaNode{StringgetNodeId();booleanreplicate(WriteRequestrequest);}/** * 远程副本节点实现 */classRemoteReplicaNodeimplementsReplicaNode{privatefinalStringnodeId;privatefinalStringaddress;publicRemoteReplicaNode(StringnodeId,Stringaddress){this.nodeId=nodeId;this.address=address;}@OverridepublicStringgetNodeId(){returnnodeId;}@Overridepublicbooleanreplicate(WriteRequestrequest){// 模拟远程调用// 实际中通过 RPC/HTTP 调用System.out.println("[Remote-"+nodeId+"] Replicating "+request.key+" v"+request.version);returntrue;}}/** * 本地副本节点(用于测试) 
*/classLocalReplicaNodeimplementsReplicaNode{privatefinalStringnodeId;privatefinalSyncReplicationManagerreplicaManager;publicLocalReplicaNode(StringnodeId,SyncReplicationManagerreplicaManager){this.nodeId=nodeId;this.replicaManager=replicaManager;}@OverridepublicStringgetNodeId(){returnnodeId;}@Overridepublicbooleanreplicate(WriteRequestrequest){returnreplicaManager.receiveReplication(request);}}// 辅助类recordWriteRequest(StringrequestId,Stringkey,Objectvalue,longversion,longtimestamp){}recordWriteResult(booleansuccess,Stringmessage,longversion){staticWriteResultsuccess(longversion){returnnewWriteResult(true,"Success",version);}staticWriteResultfailure(Stringmessage){returnnewWriteResult(false,message,-1);}staticWriteResulttimeout(Stringmessage){returnnewWriteResult(false,message,-1);}}recordReadResult(booleanfound,Objectvalue,Longversion){staticReadResultsuccess(Objectvalue,Longversion){returnnewReadResult(true,value,version);}staticReadResultnotFound(){returnnewReadResult(false,null,null);}}recordReplicationStatus(StringnodeId,intdataSize,longversion,List<String>replicas){}// 同步复制演示publicclassSyncReplicationDemo{publicstaticvoidmain(String[]args){// 创建副本节点SyncReplicationManagerreplica1=newSyncReplicationManager("replica-1",Collections.emptyList(),1,5000);SyncReplicationManagerreplica2=newSyncReplicationManager("replica-2",Collections.emptyList(),1,5000);SyncReplicationManagerreplica3=newSyncReplicationManager("replica-3",Collections.emptyList(),1,5000);// 创建主节点List<ReplicaNode>replicas=Arrays.asList(newLocalReplicaNode("replica-1",replica1),newLocalReplicaNode("replica-2",replica2),newLocalReplicaNode("replica-3",replica3));SyncReplicationManagerprimary=newSyncReplicationManager("primary",replicas,2,5000);// W=2,至少2个节点确认// 写入数据System.out.println("=== Writing data ===");WriteResultresult1=primary.write("user:001","张三");System.out.println("Write result: "+result1);WriteResultresult2=primary.write("user:002","李四");System.out.println("Write result: "+result2);// 
读取数据System.out.println("\n=== Reading data ===");System.out.println("Primary: "+primary.read("user:001"));System.out.println("Replica1: "+replica1.read("user:001"));System.out.println("Replica2: "+replica2.read("user:001"));// 关闭primary.shutdown();replica1.shutdown();replica2.shutdown();replica3.shutdown();}}2.2 异步复制
原理说明
异步复制在主节点提交事务后,异步地将数据同步到从节点:
- 主节点接收写请求并提交
- 立即返回成功响应
- 后台异步同步到从节点
Java 实现
importjava.util.*;importjava.util.concurrent.*;/** * 异步复制管理器 */publicclassAsyncReplicationManager{privatefinalStringnodeId;privatefinalList<ReplicaNode>replicas;privatefinalExecutorServiceexecutor;privatefinalBlockingQueue<WriteRequest>replicationQueue;privatefinalScheduledExecutorServicescheduler;// 本地数据存储privatefinalMap<String,Object>dataStore;privatefinalMap<String,Long>versionStore;privatefinalAtomicLongcurrentVersion;// 复制状态privatefinalMap<String,Long>replicaPositions;privatevolatilebooleanrunning;publicAsyncReplicationManager(StringnodeId,List<ReplicaNode>replicas){this.nodeId=nodeId;this.replicas=replicas;this.executor=Executors.newFixedThreadPool(replicas.size());this.replicationQueue=newLinkedBlockingQueue<>(10000);this.scheduler=Executors.newScheduledThreadPool(2);this.dataStore=