架构之非结构化数据存储
引言
在数字化时代,数据呈现出爆炸式增长,其中非结构化数据占据了绝大部分比例。从社交媒体的用户生成内容、物联网设备的传感器数据,到企业的文档管理系统,非结构化数据无处不在。非结构化数据存储架构法则强调:对于非强事务的数据操作,或者非结构化的文档数据,使用NoSQL数据库能够提供更好的性能、扩展性和灵活性。
传统的关系型数据库在处理结构化数据方面表现出色,但在面对非结构化数据时往往显得力不从心。NoSQL数据库通过其灵活的数据模型、水平扩展能力和高性能特点,为非结构化数据存储提供了理想的解决方案。
非结构化数据存储的核心理念
为什么需要NoSQL?
传统关系型数据库在面对海量、多变的非结构化数据时,常常遇到模式僵化、水平扩展困难、性能瓶颈和成本高企等挑战。NoSQL数据库能够有效解决这些挑战:
- 灵活的数据模型:无需预定义模式,支持动态schema(见下方示例代码)
- 水平扩展能力:通过增加节点实现线性扩展
- 高性能表现:针对特定场景优化的存储引擎
- 成本效益:使用普通硬件,降低总体拥有成本
- 高可用性:内置复制和故障转移机制
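下面用一小段示意代码说明“动态schema”的含义(连接地址、集合与字段均为示例假设):同一个集合可以同时保存字段完全不同的文档,无需像关系型数据库那样先修改表结构:

```java
import java.util.List;
import org.bson.Document;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;

public class DynamicSchemaDemo {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> devices = client.getDatabase("demo").getCollection("devices");

            // 两条文档的字段并不相同,依然可以写入同一个集合
            devices.insertOne(new Document("deviceId", "sensor-001")
                    .append("type", "temperature")
                    .append("celsius", 25.3));

            devices.insertOne(new Document("deviceId", "cam-002")
                    .append("type", "camera")
                    .append("resolution", "1080p")
                    .append("tags", List.of("indoor", "ptz")));
        }
    }
}
```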
NoSQL数据库分类
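按照数据模型,主流NoSQL数据库可以分为四类,这也是本文后续逐一展开的四种架构:
- 文档数据库:以类JSON文档为基本存储单元,如MongoDB、CouchDB
- 键值数据库:以键值对方式存取数据,如Redis、Memcached
- 列族数据库:按列族组织宽表数据,如Cassandra、HBase
- 图数据库:以节点和关系建模,如Neo4j、ArangoDB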
文档数据库架构
MongoDB核心原理
MongoDB作为最流行的文档数据库,采用BSON(Binary JSON)格式存储数据,支持复杂的嵌套文档结构。
```java
// MongoDB文档模型设计
@Data
@Builder
@Document(collection = "users")
public class UserDocument {

    @Id
    private String id;

    @Indexed(unique = true)
    private String username;

    @Indexed
    private String email;

    private Profile profile;
    private List<Address> addresses;
    private List<OrderSummary> orderHistory;
    private Preferences preferences;

    @CreatedDate
    private Date createdAt;

    @LastModifiedDate
    private Date updatedAt;

    // 嵌套文档示例
    @Data
    @Builder
    public static class Profile {
        private String firstName;
        private String lastName;
        private Date dateOfBirth;
        private String avatar;
        private Map<String, Object> metadata;
    }

    @Data
    @Builder
    public static class Address {
        private String type; // home, work, shipping
        private String street;
        private String city;
        private String state;
        private String zipCode;
        private String country;
        private boolean isDefault;
    }
}

// MongoDB Repository实现
@Repository
public interface UserRepository extends MongoRepository<UserDocument, String> {

    // 基于索引的高效查询
    Optional<UserDocument> findByUsername(String username);
    Optional<UserDocument> findByEmail(String email);

    // 复杂查询
    @Query("{ 'profile.firstName' : ?0, 'profile.lastName' : ?1 }")
    List<UserDocument> findByFullName(String firstName, String lastName);

    // 嵌套字段查询:按城市查找用户
    @Query("{ 'addresses.city' : ?0 }")
    List<UserDocument> findByCity(String city);

    // 聚合查询
    @Aggregation({
        "{ $match: { 'orderHistory.status': 'COMPLETED' } }",
        "{ $unwind: '$orderHistory' }",
        "{ $group: { _id: '$username', totalSpent: { $sum: '$orderHistory.totalAmount' } } }",
        "{ $sort: { totalSpent: -1 } }",
        "{ $limit: 10 }"
    })
    List<UserSpending> findTopSpendingUsers();
}

// MongoDB服务层实现
@Service
@Slf4j
public class UserDocumentService {

    @Autowired
    private UserRepository userRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 创建用户文档
     */
    public UserDocument createUser(CreateUserRequest request) {
        UserDocument user = UserDocument.builder()
                .username(request.getUsername())
                .email(request.getEmail())
                .profile(UserDocument.Profile.builder()
                        .firstName(request.getFirstName())
                        .lastName(request.getLastName())
                        .dateOfBirth(request.getDateOfBirth())
                        .metadata(new HashMap<>())
                        .build())
                .addresses(new ArrayList<>())
                .orderHistory(new ArrayList<>())
                .preferences(new Preferences())
                .build();

        UserDocument savedUser = userRepository.save(user);
        log.info("用户文档创建成功: {}", savedUser.getId());
        return savedUser;
    }

    /**
     * 添加用户地址
     */
    public UserDocument addAddress(String userId, AddressRequest request) {
        UserDocument user = userRepository.findById(userId)
                .orElseThrow(() -> new UserNotFoundException(userId));

        UserDocument.Address address = UserDocument.Address.builder()
                .type(request.getType())
                .street(request.getStreet())
                .city(request.getCity())
                .state(request.getState())
                .zipCode(request.getZipCode())
                .country(request.getCountry())
                .isDefault(request.isDefault())
                .build();

        // 如果设置为默认地址,清除其他地址的默认标记
        if (request.isDefault()) {
            user.getAddresses().forEach(addr -> addr.setDefault(false));
        }

        user.getAddresses().add(address);
        UserDocument updatedUser = userRepository.save(user);
        log.info("用户地址添加成功: {}, 地址类型: {}", userId, request.getType());
        return updatedUser;
    }

    /**
     * 更新用户偏好设置
     */
    public UserDocument updatePreferences(String userId, Map<String, Object> preferences) {
        Query query = new Query(Criteria.where("_id").is(userId));
        Update update = new Update().set("preferences", preferences);
        FindAndModifyOptions options = FindAndModifyOptions.options()
                .returnNew(true)
                .upsert(false);

        UserDocument updatedUser = mongoTemplate.findAndModify(query, update, options, UserDocument.class);
        log.info("用户偏好设置更新成功: {}", userId);
        return updatedUser;
    }

    /**
     * 复杂聚合查询:用户购买行为分析
     */
    public List<UserBehaviorAnalysis> analyzeUserBehavior(Date startDate, Date endDate) {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("orderHistory.orderDate").gte(startDate).lte(endDate)),
                Aggregation.unwind("orderHistory"),
                Aggregation.match(Criteria.where("orderHistory.status").is("COMPLETED")),
                Aggregation.group("username")
                        .sum("orderHistory.totalAmount").as("totalSpent")
                        .avg("orderHistory.totalAmount").as("avgOrderValue")
                        .count().as("orderCount"),
                Aggregation.sort(Sort.Direction.DESC, "totalSpent"),
                Aggregation.limit(100));

        return mongoTemplate.aggregate(aggregation, "users", UserBehaviorAnalysis.class)
                .getMappedResults();
    }

    /**
     * 文本搜索:用户搜索功能
     */
    public List<UserDocument> searchUsers(String searchText) {
        TextCriteria criteria = TextCriteria.forDefaultLanguage().matchingAny(searchText);
        Query query = TextQuery.queryText(criteria).sortByScore().limit(20);
        return mongoTemplate.find(query, UserDocument.class);
    }

    /**
     * 地理空间查询:查找附近用户
     */
    public List<UserDocument> findNearbyUsers(double longitude, double latitude, double maxDistance) {
        Point point = new Point(longitude, latitude);
        Distance distance = new Distance(maxDistance, Metrics.KILOMETERS);

        // 假设用户文档中有地理位置字段
        Query query = new Query(Criteria.where("location").near(point).maxDistance(distance));
        return mongoTemplate.find(query, UserDocument.class);
    }
}
```

MongoDB适用场景
场景1:内容管理系统
典型特征:
- 文档结构复杂:文章内容包含富文本、多媒体、元数据
- 读写比例偏读:以读取访问为主,同时伴随较频繁的内容更新
- 查询模式多样:支持全文搜索、标签过滤、时间范围查询
- 数据增长快速:内容数量快速增长,需要水平扩展
技术优势:
- 灵活的文档结构,无需预定义模式
- 强大的查询语言,支持复杂条件查询
- 内置全文搜索功能,支持文本索引
- 自动分片支持,易于水平扩展
```java
// 内容管理系统实现
@Data
@Builder
@Document(collection = "articles")
public class Article {

    @Id
    private String id;

    @Indexed
    private String title;

    @TextIndexed
    private String content;

    @Indexed
    private List<String> tags;

    @Indexed
    private String authorId;

    private Map<String, Object> metadata;
    private List<Media> attachments;
    private List<Comment> comments;

    @CreatedDate
    private Date createdAt;

    @LastModifiedDate
    private Date updatedAt;
}

@Service
public class ContentManagementService {

    @Autowired
    private ArticleRepository articleRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 创建文章
     */
    public Article createArticle(CreateArticleRequest request) {
        Article article = Article.builder()
                .title(request.getTitle())
                .content(request.getContent())
                .tags(request.getTags())
                .authorId(request.getAuthorId())
                .metadata(request.getMetadata())
                .attachments(request.getAttachments())
                .comments(new ArrayList<>())
                .build();
        return articleRepository.save(article);
    }

    /**
     * 按关键词、标签和时间范围搜索文章
     */
    public List<Article> searchArticles(String keyword, List<String> tags, Date startDate, Date endDate) {
        Criteria criteria = new Criteria();

        // 关键词匹配(正则,忽略大小写)
        if (StringUtils.hasText(keyword)) {
            criteria.and("content").regex(keyword, "i");
        }

        // 标签过滤
        if (tags != null && !tags.isEmpty()) {
            criteria.and("tags").in(tags);
        }

        // 时间范围
        if (startDate != null && endDate != null) {
            criteria.and("createdAt").gte(startDate).lte(endDate);
        }

        Query query = Query.query(criteria)
                .with(Sort.by(Sort.Direction.DESC, "createdAt"))
                .limit(50);
        return mongoTemplate.find(query, Article.class);
    }
}
```

场景2:实时分析系统
典型特征:
- 数据写入密集:大量实时数据需要快速写入
- 数据结构多变:不同来源的数据结构差异大
- 聚合查询频繁:需要实时统计和分析
- 时间序列数据:数据按时间顺序产生
技术优势:
- 高写入性能,支持批量插入
- 灵活的聚合框架,支持复杂分析
- 时间序列数据优化,支持TTL索引(见下方TTL索引示意)
- 副本集支持,保证数据安全
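下面是一个TTL索引的最小示意(集合名 raw_events 与7天过期时间均为假设),利用Spring Data MongoDB的@Indexed(expireAfterSeconds=...)让MongoDB自动清理过期事件:

```java
import java.util.Date;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.index.Indexed;
import org.springframework.data.mongodb.core.mapping.Document;

// 示意:带TTL索引的原始事件文档,createdAt超过7天后由MongoDB后台任务自动删除
@Document(collection = "raw_events")
public class RawEvent {

    @Id
    private String id;

    private String eventType;

    // expireAfterSeconds = 604800,即7天
    @Indexed(expireAfterSeconds = 604800)
    private Date createdAt;
}
```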
```java
// 实时分析系统实现
@Data
@Builder
@Document(collection = "events")
public class AnalyticsEvent {

    @Id
    private String id;

    @Indexed
    private String eventType;

    @Indexed
    private String userId;

    @Indexed
    private Date timestamp;

    private Map<String, Object> properties;
    private Map<String, Object> context;

    @CreatedDate
    private Date createdAt;
}

@Service
@Slf4j
public class AnalyticsService {

    @Autowired
    private AnalyticsEventRepository eventRepository;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 批量记录事件
     */
    public void recordEvents(List<EventRequest> eventRequests) {
        List<AnalyticsEvent> events = eventRequests.stream()
                .map(request -> AnalyticsEvent.builder()
                        .eventType(request.getEventType())
                        .userId(request.getUserId())
                        .timestamp(request.getTimestamp())
                        .properties(request.getProperties())
                        .context(request.getContext())
                        .build())
                .collect(Collectors.toList());

        eventRepository.saveAll(events);
        log.info("批量记录 {} 个事件", events.size());
    }

    /**
     * 实时统计分析
     */
    public AnalyticsSummary getRealtimeAnalytics(Date startTime, Date endTime) {
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("timestamp").gte(startTime).lte(endTime)),
                Aggregation.group("eventType")
                        .count().as("count")
                        .sum("properties.revenue").as("totalRevenue"),
                Aggregation.project("count", "totalRevenue").and("_id").as("eventType"));

        List<EventStats> eventStats = mongoTemplate.aggregate(aggregation, "events", EventStats.class)
                .getMappedResults();

        return AnalyticsSummary.builder()
                .timeRange(new TimeRange(startTime, endTime))
                .eventStats(eventStats)
                .totalEvents(eventStats.stream().mapToLong(EventStats::getCount).sum())
                .totalRevenue(eventStats.stream().mapToDouble(EventStats::getTotalRevenue).sum())
                .build();
    }
}
```

键值数据库架构
Redis核心原理
Redis作为高性能的键值数据库,支持丰富的数据结构和原子操作。
```java
// Redis缓存架构实现
@Configuration
@EnableCaching
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(connectionFactory);

        // 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
        Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
        serializer.setObjectMapper(mapper);

        template.setValueSerializer(serializer);
        template.setKeySerializer(new StringRedisSerializer());
        template.afterPropertiesSet();
        return template;
    }

    @Bean
    public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(Duration.ofMinutes(30))
                .disableCachingNullValues()
                .serializeKeysWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new StringRedisSerializer()))
                .serializeValuesWith(RedisSerializationContext.SerializationPair
                        .fromSerializer(new GenericJackson2JsonRedisSerializer()));

        return RedisCacheManager.builder(connectionFactory)
                .cacheDefaults(config)
                .build();
    }
}

// Redis缓存服务实现
@Service
@Slf4j
public class RedisCacheService {

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 缓存用户会话信息
     */
    public void cacheUserSession(String sessionId, UserSession session, Duration ttl) {
        try {
            String key = "session:" + sessionId;
            redisTemplate.opsForValue().set(key, session, ttl);
            log.info("用户会话缓存成功: {}, TTL: {}秒", sessionId, ttl.getSeconds());
        } catch (Exception e) {
            log.error("缓存用户会话失败: {}", sessionId, e);
        }
    }

    /**
     * 获取用户会话
     */
    public UserSession getUserSession(String sessionId) {
        try {
            String key = "session:" + sessionId;
            UserSession session = (UserSession) redisTemplate.opsForValue().get(key);
            if (session != null) {
                log.debug("用户会话获取成功: {}", sessionId);
            }
            return session;
        } catch (Exception e) {
            log.error("获取用户会话失败: {}", sessionId, e);
            return null;
        }
    }

    /**
     * 实现分布式锁
     */
    public boolean acquireLock(String lockKey, String requestId, Duration ttl) {
        String luaScript =
                "if redis.call('exists', KEYS[1]) == 0 then " +
                "  redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]) " +
                "  return 1 " +
                "elseif redis.call('get', KEYS[1]) == ARGV[1] then " +
                "  redis.call('pexpire', KEYS[1], ARGV[2]) " +
                "  return 1 " +
                "else " +
                "  return 0 " +
                "end";
        try {
            Boolean result = stringRedisTemplate.execute(
                    new DefaultRedisScript<>(luaScript, Boolean.class),
                    Collections.singletonList(lockKey),
                    requestId, String.valueOf(ttl.toMillis()));

            boolean acquired = Boolean.TRUE.equals(result);
            if (acquired) {
                log.debug("分布式锁获取成功: {}, requestId: {}", lockKey, requestId);
            }
            return acquired;
        } catch (Exception e) {
            log.error("获取分布式锁失败: {}", lockKey, e);
            return false;
        }
    }

    /**
     * 释放分布式锁
     */
    public boolean releaseLock(String lockKey, String requestId) {
        String luaScript =
                "if redis.call('get', KEYS[1]) == ARGV[1] then " +
                "  return redis.call('del', KEYS[1]) " +
                "else " +
                "  return 0 " +
                "end";
        try {
            Long result = stringRedisTemplate.execute(
                    new DefaultRedisScript<>(luaScript, Long.class),
                    Collections.singletonList(lockKey),
                    requestId);

            boolean released = Long.valueOf(1L).equals(result);
            if (released) {
                log.debug("分布式锁释放成功: {}, requestId: {}", lockKey, requestId);
            }
            return released;
        } catch (Exception e) {
            log.error("释放分布式锁失败: {}", lockKey, e);
            return false;
        }
    }

    /**
     * 计数器操作
     */
    public long incrementCounter(String counterKey, long delta) {
        try {
            Long result = redisTemplate.opsForValue().increment(counterKey, delta);
            log.debug("计数器更新成功: {}, delta: {}, result: {}", counterKey, delta, result);
            return result != null ? result : 0;
        } catch (Exception e) {
            log.error("计数器更新失败: {}", counterKey, e);
            return 0;
        }
    }

    /**
     * 排行榜实现
     */
    public void updateLeaderboard(String leaderboardKey, String member, double score) {
        try {
            redisTemplate.opsForZSet().add(leaderboardKey, member, score);
            log.debug("排行榜更新成功: {}, member: {}, score: {}", leaderboardKey, member, score);
        } catch (Exception e) {
            log.error("排行榜更新失败: {}", leaderboardKey, e);
        }
    }

    /**
     * 获取排行榜前N名
     */
    public List<LeaderboardEntry> getTopLeaderboard(String leaderboardKey, int topN) {
        try {
            Set<ZSetOperations.TypedTuple<Object>> topMembers =
                    redisTemplate.opsForZSet().reverseRangeWithScores(leaderboardKey, 0, topN - 1);

            List<LeaderboardEntry> entries = new ArrayList<>();
            int rank = 1;
            for (ZSetOperations.TypedTuple<Object> tuple : topMembers) {
                entries.add(LeaderboardEntry.builder()
                        .rank(rank++)
                        .member((String) tuple.getValue())
                        .score(tuple.getScore())
                        .build());
            }
            return entries;
        } catch (Exception e) {
            log.error("获取排行榜失败: {}", leaderboardKey, e);
            return Collections.emptyList();
        }
    }

    /**
     * 布隆过滤器实现
     */
    public void addToBloomFilter(String bloomFilterKey, String value) {
        try {
            // 使用多个hash函数
            int[] hashValues = getHashValues(value);
            for (int hash : hashValues) {
                redisTemplate.opsForValue().setBit(bloomFilterKey, Math.abs(hash), true);
            }
            log.debug("布隆过滤器添加成功: {}, value: {}", bloomFilterKey, value);
        } catch (Exception e) {
            log.error("布隆过滤器添加失败: {}", bloomFilterKey, e);
        }
    }

    /**
     * 检查布隆过滤器
     */
    public boolean mightContain(String bloomFilterKey, String value) {
        try {
            int[] hashValues = getHashValues(value);
            for (int hash : hashValues) {
                Boolean bit = redisTemplate.opsForValue().getBit(bloomFilterKey, Math.abs(hash));
                if (bit == null || !bit) {
                    return false;
                }
            }
            return true;
        } catch (Exception e) {
            log.error("布隆过滤器检查失败: {}", bloomFilterKey, e);
            return false;
        }
    }

    private int[] getHashValues(String value) {
        // 使用多个不同的hash函数
        return new int[]{
                value.hashCode(),
                (value + "salt1").hashCode(),
                (value + "salt2").hashCode()
        };
    }
}
```

列族数据库架构
Cassandra核心原理
Cassandra作为分布式列族数据库,提供高可用性和线性扩展能力。
```java
// Cassandra数据模型设计
@Table("user_activities")
public class UserActivity {

    @PrimaryKey
    private UserActivityKey key;

    @Column("activity_type")
    private String activityType;

    @Column("activity_data")
    private Map<String, Object> activityData;

    @Column("created_at")
    private Instant createdAt;

    @PrimaryKeyClass
    public static class UserActivityKey implements Serializable {

        @PrimaryKeyColumn(name = "user_id", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
        private String userId;

        @PrimaryKeyColumn(name = "activity_time", ordinal = 1, type = PrimaryKeyType.CLUSTERED,
                ordering = Ordering.DESCENDING)
        private Instant activityTime;

        @PrimaryKeyColumn(name = "activity_id", ordinal = 2, type = PrimaryKeyType.CLUSTERED,
                ordering = Ordering.DESCENDING)
        private UUID activityId;
    }
}

@Table("time_series_data")
public class TimeSeriesData {

    @PrimaryKey
    private TimeSeriesKey key;

    @Column("value")
    private double value;

    @Column("tags")
    private Map<String, String> tags;

    @Column("quality")
    private int quality;

    @PrimaryKeyClass
    public static class TimeSeriesKey implements Serializable {

        @PrimaryKeyColumn(name = "metric_name", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
        private String metricName;

        @PrimaryKeyColumn(name = "timestamp_bucket", ordinal = 1, type = PrimaryKeyType.PARTITIONED)
        private long timestampBucket; // 小时级时间桶

        @PrimaryKeyColumn(name = "timestamp", ordinal = 2, type = PrimaryKeyType.CLUSTERED,
                ordering = Ordering.ASCENDING)
        private Instant timestamp;

        @PrimaryKeyColumn(name = "sensor_id", ordinal = 3, type = PrimaryKeyType.CLUSTERED,
                ordering = Ordering.ASCENDING)
        private String sensorId;
    }
}

// Cassandra服务实现
@Service
@Slf4j
public class CassandraService {

    @Autowired
    private UserActivityRepository userActivityRepository;

    @Autowired
    private TimeSeriesDataRepository timeSeriesDataRepository;

    @Autowired
    private CassandraTemplate cassandraTemplate;

    /**
     * 记录用户活动
     */
    public void recordUserActivity(String userId, String activityType, Map<String, Object> activityData) {
        try {
            UserActivity activity = new UserActivity();
            UserActivity.UserActivityKey key = new UserActivity.UserActivityKey();
            key.setUserId(userId);
            key.setActivityTime(Instant.now());
            key.setActivityId(UUID.randomUUID());

            activity.setKey(key);
            activity.setActivityType(activityType);
            activity.setActivityData(activityData);
            activity.setCreatedAt(Instant.now());

            userActivityRepository.save(activity);
            log.debug("用户活动记录成功: {}, 类型: {}", userId, activityType);
        } catch (Exception e) {
            log.error("记录用户活动失败: {}", userId, e);
        }
    }

    /**
     * 获取用户最近活动
     */
    public List<UserActivity> getRecentUserActivities(String userId, int limit) {
        try {
            // 使用Cassandra的查询优化:只查最近一周,利用分区键和聚簇排序
            Instant oneWeekAgo = Instant.now().minus(7, ChronoUnit.DAYS);
            Select select = QueryBuilder.select().from("user_activities")
                    .where(QueryBuilder.eq("user_id", userId))
                    .and(QueryBuilder.gte("activity_time", oneWeekAgo))
                    .limit(limit)
                    .orderBy(QueryBuilder.desc("activity_time"))
                    .orderBy(QueryBuilder.desc("activity_id"));

            return cassandraTemplate.select(select, UserActivity.class);
        } catch (Exception e) {
            log.error("获取用户活动失败: {}", userId, e);
            return Collections.emptyList();
        }
    }

    /**
     * 批量写入时序数据
     */
    public void batchInsertTimeSeriesData(List<TimeSeriesData> dataPoints) {
        try {
            // 批量插入优化
            List<Insert> inserts = dataPoints.stream()
                    .map(data -> QueryBuilder.insertInto("time_series_data")
                            .value("metric_name", data.getKey().getMetricName())
                            .value("timestamp_bucket", data.getKey().getTimestampBucket())
                            .value("timestamp", data.getKey().getTimestamp())
                            .value("sensor_id", data.getKey().getSensorId())
                            .value("value", data.getValue())
                            .value("tags", data.getTags())
                            .value("quality", data.getQuality()))
                    .collect(Collectors.toList());

            // 使用批量执行
            BatchStatement batch = BatchStatement.newInstance(DefaultBatchType.UNLOGGED);
            inserts.forEach(batch::add);
            cassandraTemplate.getCqlOperations().execute(batch);
            log.debug("批量插入 {} 个时序数据点", dataPoints.size());
        } catch (Exception e) {
            log.error("批量插入时序数据失败", e);
        }
    }

    /**
     * 时序数据聚合查询
     */
    public List<TimeSeriesAggregate> aggregateTimeSeriesData(String metricName, Instant startTime,
                                                             Instant endTime, Duration aggregationWindow) {
        try {
            // 计算时间桶
            long startBucket = startTime.toEpochMilli() / aggregationWindow.toMillis();
            long endBucket = endTime.toEpochMilli() / aggregationWindow.toMillis();

            // 构建聚合查询
            Select select = QueryBuilder.select()
                    .column("metric_name").column("sensor_id").column("timestamp").column("value")
                    .from("time_series_data")
                    .where(QueryBuilder.eq("metric_name", metricName))
                    .and(QueryBuilder.in("timestamp_bucket", generateTimeBuckets(startBucket, endBucket)))
                    .and(QueryBuilder.gte("timestamp", startTime))
                    .and(QueryBuilder.lte("timestamp", endTime));

            List<TimeSeriesData> rawData = cassandraTemplate.select(select, TimeSeriesData.class);

            // 客户端聚合
            return aggregateData(rawData, aggregationWindow);
        } catch (Exception e) {
            log.error("时序数据聚合查询失败", e);
            return Collections.emptyList();
        }
    }

    private List<Long> generateTimeBuckets(long startBucket, long endBucket) {
        List<Long> buckets = new ArrayList<>();
        for (long bucket = startBucket; bucket <= endBucket; bucket++) {
            buckets.add(bucket);
        }
        return buckets;
    }

    private List<TimeSeriesAggregate> aggregateData(List<TimeSeriesData> rawData, Duration aggregationWindow) {
        // 按传感器和时间窗口分组
        Map<String, Map<Long, List<TimeSeriesData>>> groupedData = rawData.stream()
                .collect(Collectors.groupingBy(
                        data -> data.getKey().getSensorId(),
                        Collectors.groupingBy(
                                data -> data.getKey().getTimestamp().toEpochMilli() / aggregationWindow.toMillis())));

        List<TimeSeriesAggregate> aggregates = new ArrayList<>();
        groupedData.forEach((sensorId, timeGroups) -> timeGroups.forEach((timeWindow, dataPoints) -> {
            if (!dataPoints.isEmpty()) {
                double avg = dataPoints.stream().mapToDouble(TimeSeriesData::getValue).average().orElse(0.0);
                double max = dataPoints.stream().mapToDouble(TimeSeriesData::getValue).max().orElse(0.0);
                double min = dataPoints.stream().mapToDouble(TimeSeriesData::getValue).min().orElse(0.0);

                aggregates.add(TimeSeriesAggregate.builder()
                        .sensorId(sensorId)
                        .timeWindow(Instant.ofEpochMilli(timeWindow * aggregationWindow.toMillis()))
                        .average(avg)
                        .maximum(max)
                        .minimum(min)
                        .count(dataPoints.size())
                        .build());
            }
        }));
        return aggregates;
    }
}
```

图数据库架构
Neo4j核心原理
Neo4j作为原生图数据库,使用节点和关系来存储数据,适合处理复杂的关系网络。
```java
// Neo4j图数据模型
@Node("User")
public class UserNode {

    @Id
    @GeneratedValue
    private Long id;

    @Property("userId")
    private String userId;

    @Property("username")
    private String username;

    @Property("email")
    private String email;

    @Property("createdAt")
    private LocalDateTime createdAt;

    @Relationship(type = "FOLLOWS", direction = Relationship.Direction.OUTGOING)
    private Set<UserRelationship> following = new HashSet<>();

    @Relationship(type = "FOLLOWS", direction = Relationship.Direction.INCOMING)
    private Set<UserRelationship> followers = new HashSet<>();

    @Relationship(type = "LIKES", direction = Relationship.Direction.OUTGOING)
    private Set<ContentRelationship> likedContents = new HashSet<>();
}

@RelationshipProperties
public class UserRelationship {

    @Id
    @GeneratedValue
    private Long id;

    @Property("createdAt")
    private LocalDateTime createdAt;

    @TargetNode
    private UserNode targetUser;
}

@Node("Content")
public class ContentNode {

    @Id
    @GeneratedValue
    private Long id;

    @Property("contentId")
    private String contentId;

    @Property("title")
    private String title;

    @Property("content")
    private String content;

    @Property("createdAt")
    private LocalDateTime createdAt;

    @Relationship(type = "AUTHORED", direction = Relationship.Direction.INCOMING)
    private UserNode author;

    @Relationship(type = "LIKES", direction = Relationship.Direction.INCOMING)
    private Set<UserRelationship> likedBy = new HashSet<>();

    @Relationship(type = "TAGGED_WITH", direction = Relationship.Direction.OUTGOING)
    private Set<TagRelationship> tags = new HashSet<>();
}

@Node("Tag")
public class TagNode {

    @Id
    @GeneratedValue
    private Long id;

    @Property("name")
    private String name;

    @Property("popularity")
    private int popularity;
}

// Neo4j图数据库服务实现
@Service
@Slf4j
public class GraphDatabaseService {

    @Autowired
    private UserNodeRepository userNodeRepository;

    @Autowired
    private ContentNodeRepository contentNodeRepository;

    @Autowired
    private Neo4jTemplate neo4jTemplate;

    /**
     * 创建用户关系
     */
    public void createUserRelationship(String followerId, String followingId) {
        try {
            String query =
                    "MATCH (follower:User {userId: $followerId}) " +
                    "MATCH (following:User {userId: $followingId}) " +
                    "MERGE (follower)-[r:FOLLOWS]->(following) " +
                    "SET r.createdAt = $createdAt " +
                    "RETURN r";

            Map<String, Object> parameters = Map.of(
                    "followerId", followerId,
                    "followingId", followingId,
                    "createdAt", LocalDateTime.now());

            neo4jTemplate.findAll(query, parameters, UserRelationship.class);
            log.debug("用户关系创建成功: {} -> {}", followerId, followingId);
        } catch (Exception e) {
            log.error("创建用户关系失败: {} -> {}", followerId, followingId, e);
        }
    }

    /**
     * 查找共同关注
     */
    public List<UserNode> findCommonFollowings(String userId1, String userId2) {
        try {
            String query =
                    "MATCH (user1:User {userId: $userId1})-[:FOLLOWS]->(common:User)" +
                    "<-[:FOLLOWS]-(user2:User {userId: $userId2}) " +
                    "RETURN common";

            Map<String, Object> parameters = Map.of("userId1", userId1, "userId2", userId2);
            return neo4jTemplate.findAll(query, parameters, UserNode.class);
        } catch (Exception e) {
            log.error("查找共同关注失败: {} - {}", userId1, userId2, e);
            return Collections.emptyList();
        }
    }

    /**
     * 推荐可能认识的人
     */
    public List<UserRecommendation> recommendPeopleYouMayKnow(String userId, int limit) {
        try {
            String query =
                    "MATCH (user:User {userId: $userId})-[:FOLLOWS]->(following)-[:FOLLOWS]->(recommendation) " +
                    "WHERE NOT (user)-[:FOLLOWS]->(recommendation) AND user <> recommendation " +
                    "WITH recommendation, count(*) as mutualFriends " +
                    "ORDER BY mutualFriends DESC " +
                    "LIMIT $limit " +
                    "RETURN recommendation.userId as userId, recommendation.username as username, mutualFriends";

            Map<String, Object> parameters = Map.of("userId", userId, "limit", limit);
            return neo4jTemplate.findAll(query, parameters, UserRecommendation.class);
        } catch (Exception e) {
            log.error("推荐可能认识的人失败: {}", userId, e);
            return Collections.emptyList();
        }
    }

    /**
     * 查找最短路径
     */
    public List<String> findShortestPath(String startUserId, String endUserId, int maxDepth) {
        try {
            String query =
                    "MATCH path = shortestPath((start:User {userId: $startUserId})-[*..$maxDepth]-(end:User {userId: $endUserId})) " +
                    "WHERE ALL(rel in relationships(path) WHERE type(rel) = 'FOLLOWS') " +
                    "RETURN [node in nodes(path) | node.userId] as userIds";

            Map<String, Object> parameters = Map.of(
                    "startUserId", startUserId,
                    "endUserId", endUserId,
                    "maxDepth", maxDepth);

            Result result = neo4jTemplate.getClient().query(query).bindAll(parameters).run();
            if (result.hasNext()) {
                return result.next().get("userIds").asList(Value::asString);
            }
            return Collections.emptyList();
        } catch (Exception e) {
            log.error("查找最短路径失败: {} -> {}", startUserId, endUserId, e);
            return Collections.emptyList();
        }
    }

    /**
     * 内容推荐算法
     */
    public List<ContentRecommendation> recommendContentBasedOnGraph(String userId, int limit) {
        try {
            String query =
                    "MATCH (user:User {userId: $userId})-[:FOLLOWS]->(following)-[:LIKES]->(content) " +
                    "WHERE NOT (user)-[:LIKES]->(content) " +
                    "WITH content, following, count(*) as likesFromFollowing " +
                    "MATCH (content)-[:TAGGED_WITH]->(tag)<-[:TAGGED_WITH]-(userLiked:User)-[:LIKES]->(userContent:Content) " +
                    "WHERE (user)-[:LIKES]->(userContent) " +
                    "WITH content, likesFromFollowing, count(DISTINCT tag) as commonTags " +
                    "RETURN content.contentId as contentId, content.title as title, " +
                    "       (likesFromFollowing * 0.7 + commonTags * 0.3) as score " +
                    "ORDER BY score DESC " +
                    "LIMIT $limit";

            Map<String, Object> parameters = Map.of("userId", userId, "limit", limit);
            return neo4jTemplate.findAll(query, parameters, ContentRecommendation.class);
        } catch (Exception e) {
            log.error("内容推荐失败: {}", userId, e);
            return Collections.emptyList();
        }
    }

    /**
     * 社群发现
     */
    public List<Community> detectCommunities(int minCommunitySize) {
        try {
            String query =
                    "CALL gds.louvain.stream('userGraph') " +
                    "YIELD nodeId, communityId " +
                    "WITH communityId, collect(gds.util.asNode(nodeId).userId) as members " +
                    "WHERE size(members) >= $minCommunitySize " +
                    "RETURN communityId, members, size(members) as memberCount " +
                    "ORDER BY memberCount DESC";

            Map<String, Object> parameters = Map.of("minCommunitySize", minCommunitySize);
            return neo4jTemplate.findAll(query, parameters, Community.class);
        } catch (Exception e) {
            log.error("社群发现失败", e);
            return Collections.emptyList();
        }
    }
}
```

NoSQL选择策略
数据库选择矩阵
| 应用场景 | 推荐NoSQL类型 | 典型数据库 | 核心优势 | 注意事项 |
|---|---|---|---|---|
| 文档存储 | 文档数据库 | MongoDB、CouchDB | 灵活schema、查询强大 | 内存消耗较大 |
| 缓存会话 | 键值数据库 | Redis、Memcached | 极高性能、丰富数据结构 | 数据持久化限制 |
| 时序数据 | 列族/时序数据库 | Cassandra、InfluxDB | 高写入性能、压缩率高 | 查询灵活性有限 |
| 社交网络 | 图数据库 | Neo4j、ArangoDB | 关系查询高效 | 扩展性相对有限 |
| 日志数据 | 列族数据库 | Cassandra、HBase | 顺序写入、高吞吐量 | 随机读取性能一般 |
选择决策流程
选择合适的NoSQL数据库需要系统性的分析方法,以下是推荐的决策流程:
第一步:数据特征分析
数据结构复杂度评估:
- 高复杂度(嵌套对象、数组、多态结构)→ 推荐文档数据库
- 中等复杂度(扁平化但有关系)→ 推荐列族数据库
- 低复杂度(简单键值对)→ 推荐键值数据库
- 关系复杂(图结构、网络关系)→ 推荐图数据库
数据量评估:
- TB级以下:MongoDB、Redis、Neo4j
- TB-PB级:Cassandra、HBase、分布式MongoDB
- PB级以上:Cassandra、HBase、定制解决方案
第二步:访问模式分析
读写比例:
- 读多写少(>80%读):CouchDB、MongoDB(带索引优化)
- 读写均衡(50%-80%读):MongoDB、Neo4j
- 写多读少(<50%读):Cassandra、InfluxDB
查询复杂度:
- 简单查询(主键查询):Redis、DynamoDB
- 中等复杂度(范围查询、聚合):MongoDB、Cassandra
- 复杂查询(多条件、全文搜索):MongoDB、Elasticsearch
- 关系查询(图遍历):Neo4j、ArangoDB
第三步:性能要求评估
延迟要求:
- 毫秒级延迟:Redis(内存)、ScyllaDB(C++实现)
- 亚秒级延迟:MongoDB、Cassandra
- 秒级延迟可接受:HBase、传统方案
吞吐量要求:
- 10万QPS以下:MongoDB、Redis、Neo4j
- 10万-100万QPS:Cassandra、分布式Redis
- 100万QPS以上:Cassandra集群、定制方案
第四步:综合决策矩阵
| 场景特征 | 首选方案 | 备选方案 | 关键考虑因素 |
|---|---|---|---|
| 社交媒体内容 | MongoDB | CouchDB | 灵活schema、全文搜索 |
| 用户会话缓存 | Redis | Memcached | 内存存储、TTL支持 |
| 实时推荐 | Neo4j | ArangoDB | 图算法、关系查询 |
| IoT时序数据 | InfluxDB | Cassandra | 高写入、数据压缩 |
| 电商购物车 | Redis | MongoDB | 高性能、持久化 |
| 日志分析 | Cassandra | HBase | 顺序写入、水平扩展 |
| 地理位置 | MongoDB | PostGIS | 地理索引、2D/2DSphere |
第五步:技术约束检查
一致性要求:
- 强一致性:MongoDB(单文档)、关系型数据库
- 最终一致性:Cassandra、DynamoDB
- 可调一致性:Cassandra、Riak(示例见下方代码)
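以Cassandra为例,“可调一致性”意味着同一张表的读写可以按请求指定不同的一致性级别。下面是一段基于DataStax Java驱动的示意代码(keyspace、表结构和一致性级别取值均为示例假设):

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.DefaultConsistencyLevel;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

public class TunableConsistencyDemo {
    public static void main(String[] args) {
        try (CqlSession session = CqlSession.builder().withKeyspace("demo").build()) {
            // 写操作要求多数副本确认,换取更强的一致性
            SimpleStatement write = SimpleStatement.newInstance(
                            "INSERT INTO user_activities (user_id, activity_time, activity_id) " +
                            "VALUES (?, toTimestamp(now()), uuid())", "user-001")
                    .setConsistencyLevel(DefaultConsistencyLevel.QUORUM);

            // 读操作只要求单副本响应,换取更低的延迟
            SimpleStatement read = SimpleStatement.newInstance(
                            "SELECT * FROM user_activities WHERE user_id = ?", "user-001")
                    .setConsistencyLevel(DefaultConsistencyLevel.ONE);

            session.execute(write);
            session.execute(read);
        }
    }
}
```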
可用性要求:
- 99.9%可用性:MongoDB副本集、Redis哨兵
- 99.99%可用性:Cassandra多数据中心、分布式架构
- 99.999%可用性:多区域部署、混合方案
运维复杂度:
- 低运维:MongoDB Atlas、Redis Cloud
- 中等运维:自建MongoDB、Redis集群
- 高运维:Cassandra、HBase、定制方案
第六步:成本效益分析
开发成本:
- 学习曲线平缓:MongoDB、Redis
- 需要专门技能:Cassandra、Neo4j、HBase
运维成本:
- 云托管:AWS DocumentDB、Azure Cosmos DB
- 自建集群:需要专业DBA团队
- 混合模式:核心自建+边缘云托管
硬件成本:
- 内存密集型:Redis、高性能MongoDB
- 存储密集型:Cassandra、HBase
- 计算密集型:图数据库、分析型场景
通过这套系统化的决策流程,可以科学地选择最适合业务场景的NoSQL数据库,避免技术选型的盲目性。
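作为参考,下面用一段示意性的Java代码把上述决策流程中的几个关键判断编码成一个简单的推荐函数(类名、字段与阈值均为本文示例假设,仅供评审时对照,不能替代实际压测与运维评估):

```java
/**
 * 示意:把选型决策流程编码为一个简单的推荐函数。
 * 枚举、字段与阈值均为示例假设。
 */
public class NoSqlSelector {

    public enum StoreType { DOCUMENT, KEY_VALUE, COLUMN_FAMILY, GRAPH }

    public static class Workload {
        boolean relationHeavy;     // 是否以图/网络关系查询为主
        boolean simpleKeyLookup;   // 是否基本只有主键/简单键查询
        boolean nestedDocuments;   // 是否包含复杂嵌套结构
        double readRatio;          // 读请求占比,0.0 ~ 1.0
    }

    /** 按照“关系优先 -> 简单键值 -> 写密集 -> 默认文档”的顺序粗略判定 */
    public static StoreType recommend(Workload w) {
        if (w.relationHeavy) {
            return StoreType.GRAPH;         // 社交关系、推荐等图遍历场景
        }
        if (w.simpleKeyLookup) {
            return StoreType.KEY_VALUE;     // 缓存、会话等简单键值访问
        }
        if (w.readRatio < 0.5) {
            return StoreType.COLUMN_FAMILY; // 写多读少的时序/日志类负载
        }
        return StoreType.DOCUMENT;          // 其余默认文档数据库:灵活schema、查询能力强
    }

    public static void main(String[] args) {
        Workload iotWorkload = new Workload();
        iotWorkload.readRatio = 0.2;        // 写入密集的IoT场景
        System.out.println(recommend(iotWorkload)); // 输出 COLUMN_FAMILY
    }
}
```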
NoSQL最佳实践
数据建模原则
```java
// NoSQL数据建模最佳实践
@Component
public class NoSQLDataModelingBestPractices {

    private static final Logger log = LoggerFactory.getLogger(NoSQLDataModelingBestPractices.class);

    /**
     * 原则1:根据查询需求设计数据模型
     */
    public void demonstrateQueryDrivenModeling() {
        log.info("=== 查询驱动的数据建模 ===");

        // 不好的做法:按照关系模型设计
        // User表、Order表、Product表分开存储,需要多次查询

        // 好的做法:根据查询需求反规范化
        // 将经常一起查询的数据放在同一个文档中
        @Document
        class OrderDocument {
            @Id
            private String orderId;
            // 嵌入用户基本信息,避免额外查询
            private UserSummary userInfo;
            // 嵌入订单项详情
            private List<OrderItem> items;
            // 嵌入配送信息
            private ShippingInfo shippingInfo;
            // 状态历史记录
            private List<StatusHistory> statusHistory;
        }

        log.info("查询驱动的数据模型设计完成");
    }

    /**
     * 原则2:合理使用嵌入和引用
     */
    public void demonstrateEmbeddingVsReferencing() {
        log.info("=== 嵌入vs引用的选择 ===");

        // 适合嵌入的场景:
        // 1. 数据经常一起查询
        // 2. 嵌入数据不会单独更新
        // 3. 嵌入数据量较小
        @Document
        class ProductWithEmbeddedReviews {
            @Id
            private String productId;
            // 嵌入最近的几条评论
            @DBRef(lazy = true)
            private List<Review> recentReviews;
            // 评论统计信息嵌入
            private ReviewStatistics reviewStats;
        }

        // 适合引用的场景:
        // 1. 数据需要独立查询
        // 2. 数据会被多个文档引用
        // 3. 数据经常更新
        @Document
        class UserProfile {
            @Id
            private String userId;
            // 引用用户设置,因为设置可能被多个地方使用
            @DBRef
            private UserSettings settings;
            // 引用用户权限,因为权限管理是独立的
            @DBRef
            private List<Permission> permissions;
        }

        log.info("嵌入和引用的选择策略应用完成");
    }

    /**
     * 原则3:处理数据一致性
     */
    public void demonstrateDataConsistency() {
        log.info("=== 数据一致性处理 ===");

        // 最终一致性模式
        @Service
        class EventuallyConsistentService {

            @Autowired
            private OrderRepository orderRepository;

            @Autowired
            private InventoryService inventoryService;

            @Autowired
            private EventPublisher eventPublisher;

            /**
             * 使用事件驱动保证最终一致性
             */
            @Transactional
            public void createOrder(CreateOrderRequest request) {
                // 1. 创建订单
                Order order = createOrderDocument(request);
                orderRepository.save(order);

                // 2. 发布订单创建事件
                OrderCreatedEvent event = new OrderCreatedEvent(order.getOrderId(), order.getItems());
                eventPublisher.publishEvent(event);

                // 3. 库存服务异步处理事件
                // inventoryService.handleOrderCreated(event);
            }

            /**
             * 使用补偿事务处理失败情况
             */
            @EventListener
            public void handleInventoryUpdateFailed(InventoryUpdateFailedEvent event) {
                log.error("库存更新失败,执行补偿操作: {}", event.getOrderId());

                // 取消订单
                cancelOrder(event.getOrderId());
                // 恢复用户积分
                restoreUserPoints(event.getUserId(), event.getPointsUsed());
                // 发送通知
                notifyUserOrderCancelled(event.getUserId(), event.getOrderId());
            }
        }

        log.info("数据一致性处理策略应用完成");
    }

    /**
     * 原则4:索引优化策略
     */
    public void demonstrateIndexingStrategy() {
        log.info("=== 索引优化策略 ===");

        @Document
        @CompoundIndex(name = "user_time_idx", def = "{'userId': 1, 'createdAt': -1}")
        @CompoundIndex(name = "status_priority_idx", def = "{'status': 1, 'priority': -1}")
        class OptimizedDocument {
            @Id
            private String id;
            @Indexed
            private String userId;
            @Indexed
            private String status;
            @Indexed
            private int priority;
            @Indexed
            private LocalDateTime createdAt;
            // 文本搜索索引
            @TextIndexed
            private String content;
            // 地理空间索引
            @GeoSpatialIndexed(type = GeoSpatialIndexType.GEO_2DSPHERE)
            private double[] location;
        }

        log.info("索引优化策略应用完成");
    }

    /**
     * 原则5:分片和分区策略
     */
    public void demonstrateShardingStrategy() {
        log.info("=== 分片和分区策略 ===");

        // 时间范围分区
        @Document
        class TimePartitionedDocument {
            @Id
            private String id;
            @Indexed
            private String partitionKey; // 格式: yyyy-MM
            @Indexed
            private LocalDateTime timestamp;
            private Map<String, Object> data;

            /**
             * 根据时间戳生成分区键
             */
            public static String generatePartitionKey(LocalDateTime timestamp) {
                return timestamp.format(DateTimeFormatter.ofPattern("yyyy-MM"));
            }
        }

        // 哈希分片
        @Document
        class HashShardedDocument {
            @Id
            private String id;
            @Indexed
            private int shardKey; // 0-999的哈希值
            private String data;

            /**
             * 根据ID生成哈希分片键
             */
            public static int generateShardKey(String id) {
                return Math.abs(id.hashCode() % 1000);
            }
        }

        log.info("分片和分区策略应用完成");
    }
}
```

性能优化策略
```java
// NoSQL性能优化
@Component
public class NoSQLPerformanceOptimization {

    private static final Logger log = LoggerFactory.getLogger(NoSQLPerformanceOptimization.class);

    /**
     * 连接池优化
     */
    @Configuration
    public class ConnectionPoolOptimization {

        @Bean
        @ConfigurationProperties("mongodb.connection-pool")
        public MongoClientSettings mongoClientSettings() {
            return MongoClientSettings.builder()
                    .applyToConnectionPoolSettings(builder -> builder
                            .minSize(10)
                            .maxSize(100)
                            .maxWaitTime(10, TimeUnit.SECONDS)
                            .maxConnectionIdleTime(60, TimeUnit.SECONDS)
                            .maxConnectionLifeTime(300, TimeUnit.SECONDS))
                    .build();
        }

        @Bean
        public JedisPoolConfig jedisPoolConfig() {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(100);
            config.setMaxIdle(50);
            config.setMinIdle(10);
            config.setTestOnBorrow(true);
            config.setTestOnReturn(true);
            config.setTestWhileIdle(true);
            config.setNumTestsPerEvictionRun(10);
            config.setTimeBetweenEvictionRunsMillis(60000);
            return config;
        }
    }

    /**
     * 批量操作优化
     */
    public void demonstrateBatchOperations() {
        log.info("=== 批量操作优化 ===");

        @Service
        class BatchOperationService {

            @Autowired
            private MongoTemplate mongoTemplate;

            /**
             * 批量插入优化
             */
            public void batchInsert(List<Document> documents) {
                // 分批处理,避免单次操作过大
                int batchSize = 1000;
                List<List<Document>> batches = Lists.partition(documents, batchSize);
                for (List<Document> batch : batches) {
                    mongoTemplate.insert(batch, Document.class);
                }
            }

            /**
             * 批量更新优化
             */
            public void batchUpdate(List<DocumentUpdate> updates) {
                BulkOperations bulkOps = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED, Document.class);

                for (DocumentUpdate update : updates) {
                    Query query = new Query(Criteria.where("_id").is(update.getId()));
                    Update mongoUpdate = new Update();
                    update.getFields().forEach(mongoUpdate::set);
                    bulkOps.updateOne(query, mongoUpdate);
                }

                BulkWriteResult result = bulkOps.execute();
                log.info("批量更新完成: 成功 {}, 失败 {}",
                        result.getModifiedCount(),
                        result.getMatchedCount() - result.getModifiedCount());
            }
        }

        log.info("批量操作优化策略应用完成");
    }

    /**
     * 查询优化策略
     */
    public void demonstrateQueryOptimization() {
        log.info("=== 查询优化策略 ===");

        @Service
        class QueryOptimizationService {

            @Autowired
            private MongoTemplate mongoTemplate;

            /**
             * 使用投影减少数据传输
             */
            public List<DocumentSummary> findDocumentSummaries(String criteria) {
                Query query = new Query();
                query.addCriteria(Criteria.where("status").is("active"));

                // 只返回需要的字段
                query.fields()
                        .include("id", "title", "status", "createdAt")
                        .exclude("content", "metadata");

                return mongoTemplate.find(query, DocumentSummary.class);
            }

            /**
             * 使用游标处理大数据集
             */
            public void processLargeDataset(String criteria) {
                Query query = new Query();
                query.addCriteria(Criteria.where("criteria").is(criteria));

                try (CloseableIterator<Document> cursor = mongoTemplate.stream(query, Document.class)) {
                    while (cursor.hasNext()) {
                        Document doc = cursor.next();
                        // 处理每个文档
                        processDocument(doc);
                    }
                } catch (Exception e) {
                    log.error("处理大数据集失败", e);
                }
            }

            /**
             * 使用覆盖索引查询
             */
            public List<String> findIdsByStatus(String status) {
                Query query = new Query();
                query.addCriteria(Criteria.where("status").is(status));

                // 确保查询只使用索引字段
                query.fields().include("id");

                return mongoTemplate.find(query, Document.class).stream()
                        .map(Document::getId)
                        .collect(Collectors.toList());
            }

            private void processDocument(Document doc) {
                // 文档处理逻辑
                log.debug("处理文档: {}", doc.getId());
            }
        }

        log.info("查询优化策略应用完成");
    }

    /**
     * 缓存策略优化
     */
    public void demonstrateCachingStrategy() {
        log.info("=== 缓存策略优化 ===");

        @Service
        class CachingOptimizationService {

            @Autowired
            private RedisTemplate<String, Object> redisTemplate;

            @Autowired
            private CaffeineCache localCache;

            /**
             * 多级缓存策略
             */
            @Cacheable(value = "documents", key = "#id", unless = "#result == null")
            public Document getDocumentWithCache(String id) {
                // L1: 方法级缓存(Caffeine)
                // L2: Redis缓存检查
                String cacheKey = "document:" + id;
                Document cachedDoc = (Document) redisTemplate.opsForValue().get(cacheKey);
                if (cachedDoc != null) {
                    return cachedDoc;
                }

                // L3: 数据库查询
                Document doc = findDocumentInDatabase(id);

                // 写入Redis缓存
                if (doc != null) {
                    redisTemplate.opsForValue().set(cacheKey, doc, Duration.ofMinutes(30));
                }
                return doc;
            }

            /**
             * 缓存预热策略
             */
            @Scheduled(cron = "0 0 6 * * *") // 每天6点执行
            public void preheatCache() {
                log.info("开始缓存预热");

                // 预热热门数据
                List<String> hotDocumentIds = findHotDocumentIds();
                for (String docId : hotDocumentIds) {
                    Document doc = findDocumentInDatabase(docId);
                    if (doc != null) {
                        String cacheKey = "document:" + docId;
                        redisTemplate.opsForValue().set(cacheKey, doc, Duration.ofHours(1));
                    }
                }

                log.info("缓存预热完成,预热了 {} 个文档", hotDocumentIds.size());
            }

            /**
             * 缓存穿透保护
             */
            public Document getDocumentWithProtection(String id) {
                String cacheKey = "document:" + id;
                String nullKey = "document:null:" + id;

                // 检查空值缓存
                Boolean isNull = redisTemplate.hasKey(nullKey);
                if (Boolean.TRUE.equals(isNull)) {
                    return null;
                }

                // 查询缓存
                Document doc = (Document) redisTemplate.opsForValue().get(cacheKey);
                if (doc != null) {
                    return doc;
                }

                // 查询数据库
                doc = findDocumentInDatabase(id);
                if (doc != null) {
                    // 写入正常缓存
                    redisTemplate.opsForValue().set(cacheKey, doc, Duration.ofMinutes(30));
                } else {
                    // 写入空值缓存,防止缓存穿透
                    redisTemplate.opsForValue().set(nullKey, "1", Duration.ofMinutes(5));
                }
                return doc;
            }

            private Document findDocumentInDatabase(String id) {
                // 数据库查询逻辑
                return new Document(); // 模拟实现
            }

            private List<String> findHotDocumentIds() {
                // 查找热门文档ID逻辑
                return Collections.emptyList(); // 模拟实现
            }
        }

        log.info("缓存策略优化应用完成");
    }
}
```

非结构化数据存储实践案例
案例1:社交媒体平台
```java
// 社交媒体平台NoSQL架构实现
@Service
@Slf4j
public class SocialMediaPlatformService {

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private CassandraTemplate cassandraTemplate;

    /**
     * 发布动态
     */
    public Post createPost(CreatePostRequest request) {
        // 1. 存储到MongoDB(主存储)
        Post post = Post.builder()
                .postId(UUID.randomUUID().toString())
                .userId(request.getUserId())
                .content(request.getContent())
                .mediaAttachments(request.getMediaAttachments())
                .tags(request.getTags())
                .location(request.getLocation())
                .privacySettings(request.getPrivacySettings())
                .createdAt(Instant.now())
                .updatedAt(Instant.now())
                .build();
        mongoTemplate.save(post);

        // 2. 写入用户时间线(Cassandra)
        writeToUserTimeline(post);

        // 3. 推送到粉丝Feed(Redis)
        pushToFollowersFeed(post);

        // 4. 更新相关统计数据
        updatePostStatistics(post);

        log.info("动态发布成功: {}, 用户: {}", post.getPostId(), request.getUserId());
        return post;
    }

    /**
     * 获取用户Feed
     */
    public List<Post> getUserFeed(String userId, int page, int size) {
        // 1. 从Redis获取Feed缓存
        String feedCacheKey = "user:feed:" + userId + ":" + page;
        List<Post> cachedFeed = (List<Post>) redisTemplate.opsForValue().get(feedCacheKey);
        if (cachedFeed != null) {
            return cachedFeed;
        }

        // 2. 从Cassandra获取时间线
        List<String> postIds = getTimelinePostIds(userId, page, size);

        // 3. 从MongoDB获取详细内容
        List<Post> posts = getPostsByIds(postIds);

        // 4. 缓存结果
        redisTemplate.opsForValue().set(feedCacheKey, posts, Duration.ofMinutes(5));
        return posts;
    }

    /**
     * 搜索相关内容
     */
    public SearchResults searchContent(String keyword, SearchFilters filters) {
        // 1. 使用MongoDB全文搜索
        TextCriteria criteria = TextCriteria.forDefaultLanguage().matchingAny(keyword);
        Query query = TextQuery.queryText(criteria)
                .addCriteria(buildFilterCriteria(filters))
                .sortByScore()
                .limit(filters.getLimit());

        List<Post> posts = mongoTemplate.find(query, Post.class);

        // 2. 获取搜索结果统计
        long totalCount = mongoTemplate.count(query, Post.class);

        return SearchResults.builder()
                .posts(posts)
                .totalCount(totalCount)
                .keyword(keyword)
                .build();
    }

    /**
     * 获取热门话题
     */
    public List<TrendingTopic> getTrendingTopics(int limit) {
        // 使用Redis实时计算
        String trendingKey = "trending:topics";

        // 获取最近24小时的热门标签
        ZSetOperations<String, Object> zSetOps = redisTemplate.opsForZSet();
        Set<ZSetOperations.TypedTuple<Object>> trendingTags =
                zSetOps.reverseRangeWithScores(trendingKey, 0, limit - 1);

        return trendingTags.stream()
                .map(tuple -> TrendingTopic.builder()
                        .tag(tuple.getValue().toString())
                        .score(tuple.getScore())
                        .build())
                .collect(Collectors.toList());
    }

    /**
     * 用户行为分析
     */
    public UserAnalytics getUserAnalytics(String userId, TimeRange timeRange) {
        // 1. 从Cassandra获取行为数据
        List<UserActivity> activities = getUserActivities(userId, timeRange);

        // 2. 使用MongoDB聚合分析
        Aggregation aggregation = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("userId").is(userId)
                        .and("timestamp").gte(timeRange.getStart()).lte(timeRange.getEnd())),
                Aggregation.group("activityType")
                        .count().as("count")
                        .sum("engagementScore").as("totalEngagement"),
                Aggregation.sort(Sort.Direction.DESC, "count"));

        List<ActivitySummary> activitySummaries = mongoTemplate
                .aggregate(aggregation, "user_activities", ActivitySummary.class)
                .getMappedResults();

        return UserAnalytics.builder()
                .userId(userId)
                .timeRange(timeRange)
                .activitySummaries(activitySummaries)
                .totalActivities(activities.size())
                .build();
    }

    private void writeToUserTimeline(Post post) {
        try {
            UserTimeline timeline = UserTimeline.builder()
                    .userId(post.getUserId())
                    .postId(post.getPostId())
                    .timestamp(post.getCreatedAt())
                    .build();
            cassandraTemplate.insert(timeline);
        } catch (Exception e) {
            log.error("写入用户时间线失败: {}", post.getPostId(), e);
        }
    }

    private void pushToFollowersFeed(Post post) {
        try {
            // 获取用户粉丝列表
            List<String> followerIds = getFollowerIds(post.getUserId());

            // 推送到每个粉丝的Feed
            for (String followerId : followerIds) {
                String feedKey = "user:feed:" + followerId;
                redisTemplate.opsForList().leftPush(feedKey, post.getPostId());
                // 限制Feed长度
                redisTemplate.opsForList().trim(feedKey, 0, 999);
                // 设置过期时间
                redisTemplate.expire(feedKey, Duration.ofDays(7));
            }
        } catch (Exception e) {
            log.error("推送到粉丝Feed失败: {}", post.getPostId(), e);
        }
    }

    private List<String> getFollowerIds(String userId) {
        // 从图数据库获取粉丝列表
        // 这里简化实现,实际应该使用Neo4j等图数据库
        return Collections.emptyList();
    }

    private List<String> getTimelinePostIds(String userId, int page, int size) {
        // 从Cassandra获取时间线
        String query = "SELECT post_id FROM user_timeline WHERE user_id = ? " +
                "ORDER BY timestamp DESC LIMIT ?";
        // 实现分页逻辑
        return Collections.emptyList(); // 简化实现
    }

    private List<Post> getPostsByIds(List<String> postIds) {
        if (postIds.isEmpty()) {
            return Collections.emptyList();
        }
        Query query = new Query(Criteria.where("postId").in(postIds));
        return mongoTemplate.find(query, Post.class);
    }

    private Criteria buildFilterCriteria(SearchFilters filters) {
        Criteria criteria = new Criteria();
        if (filters.getUserId() != null) {
            criteria.and("userId").is(filters.getUserId());
        }
        if (filters.getStartDate() != null && filters.getEndDate() != null) {
            criteria.and("createdAt").gte(filters.getStartDate()).lte(filters.getEndDate());
        }
        if (filters.getTags() != null && !filters.getTags().isEmpty()) {
            criteria.and("tags").in(filters.getTags());
        }
        return criteria;
    }

    private List<UserActivity> getUserActivities(String userId, TimeRange timeRange) {
        // 从Cassandra获取用户活动数据
        Select select = QueryBuilder.select().from("user_activities")
                .where(QueryBuilder.eq("user_id", userId))
                .and(QueryBuilder.gte("timestamp", timeRange.getStart()))
                .and(QueryBuilder.lte("timestamp", timeRange.getEnd()));
        return cassandraTemplate.select(select, UserActivity.class);
    }

    private void updatePostStatistics(Post post) {
        // 更新用户发帖统计
        String statsKey = "user:stats:" + post.getUserId();
        redisTemplate.opsForHash().increment(statsKey, "totalPosts", 1);

        // 更新标签热度
        if (post.getTags() != null) {
            for (String tag : post.getTags()) {
                String tagKey = "tag:popularity:" + tag;
                redisTemplate.opsForZSet().incrementScore("trending:topics", tag, 1);
            }
        }
    }
}
```

案例2:物联网数据处理平台
```java
// 物联网数据处理平台
@Service
@Slf4j
public class IoTDataProcessingService {

    @Autowired
    private CassandraTemplate cassandraTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;

    @Autowired
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private InfluxDB influxDB;

    /**
     * 处理传感器数据
     */
    public void processSensorData(SensorData data) {
        // 1. 写入时序数据库(InfluxDB)- 实时监控
        writeToInfluxDB(data);

        // 2. 写入Cassandra - 历史数据存储
        writeToCassandra(data);

        // 3. 异常检测
        detectAnomalies(data);

        // 4. 实时告警
        checkAlerts(data);

        log.debug("传感器数据处理完成: {}, 值: {}", data.getSensorId(), data.getValue());
    }

    /**
     * 批量处理历史数据
     */
    public void batchProcessHistoricalData(String sensorId, Instant startTime, Instant endTime) {
        // 1. 从Cassandra读取历史数据
        List<SensorData> historicalData = getHistoricalData(sensorId, startTime, endTime);

        // 2. 数据清洗和预处理
        List<SensorData> cleanedData = cleanData(historicalData);

        // 3. 存储到MongoDB用于分析
        storeForAnalysis(cleanedData);

        // 4. 生成数据质量报告
        generateDataQualityReport(cleanedData);

        log.info("批量处理历史数据完成: {}, 数据量: {}", sensorId, cleanedData.size());
    }

    /**
     * 实时数据分析
     */
    public RealtimeAnalysis performRealtimeAnalysis(String sensorGroup, Duration window) {
        // 1. 从InfluxDB获取实时数据
        String query = String.format(
                "SELECT MEAN(value) as avg_value, MAX(value) as max_value, MIN(value) as min_value " +
                "FROM sensor_data " +
                "WHERE sensor_group = '%s' AND time >= now() - %s " +
                "GROUP BY sensor_id",
                sensorGroup, window);

        QueryResult queryResult = influxDB.query(new Query(query, "iot_data"));

        // 2. 处理查询结果
        List<AnalysisResult> results = processInfluxDBResult(queryResult);

        // 3. 计算统计指标
        Statistics stats = calculateStatistics(results);

        return RealtimeAnalysis.builder()
                .sensorGroup(sensorGroup)
                .timeWindow(window)
                .results(results)
                .statistics(stats)
                .timestamp(Instant.now())
                .build();
    }

    /**
     * 预测性维护
     */
    public MaintenancePrediction predictMaintenance(String sensorId) {
        // 1. 获取设备历史数据
        List<SensorData> historicalData = getHistoricalData(sensorId,
                Instant.now().minus(30, ChronoUnit.DAYS), Instant.now());

        // 2. 特征提取
        Map<String, Double> features = extractFeatures(historicalData);

        // 3. 使用机器学习模型预测
        double failureProbability = predictFailureProbability(features);

        // 4. 生成维护建议
        MaintenanceRecommendation recommendation = generateMaintenanceRecommendation(failureProbability);

        return MaintenancePrediction.builder()
                .sensorId(sensorId)
                .failureProbability(failureProbability)
                .recommendation(recommendation)
                .predictionDate(Instant.now())
                .build();
    }

    /**
     * 数据聚合和降采样
     */
    public void aggregateAndDownsample() {
        // 1. 小时级聚合
        aggregateHourlyData();

        // 2. 日级聚合
        aggregateDailyData();

        // 3. 清理过期原始数据
        cleanupOldData();

        log.info("数据聚合和降采样完成");
    }

    private void writeToInfluxDB(SensorData data) {
        try {
            Point point = Point.measurement("sensor_data")
                    .time(data.getTimestamp().toEpochMilli(), TimeUnit.MILLISECONDS)
                    .tag("sensor_id", data.getSensorId())
                    .tag("sensor_type", data.getSensorType())
                    .tag("location", data.getLocation())
                    .addField("value", data.getValue())
                    .addField("quality", data.getQuality())
                    .build();
            influxDB.write("iot_data", "autogen", point);
        } catch (Exception e) {
            log.error("写入InfluxDB失败: {}", data.getSensorId(), e);
        }
    }

    private void writeToCassandra(SensorData data) {
        try {
            TimeSeriesData timeSeriesData = TimeSeriesData.builder()
                    .key(TimeSeriesData.TimeSeriesKey.builder()
                            .metricName("sensor_reading")
                            .timestampBucket(data.getTimestamp().toEpochMilli() / 3600000) // 小时桶
                            .timestamp(data.getTimestamp())
                            .sensorId(data.getSensorId())
                            .build())
                    .value(data.getValue())
                    .tags(Map.of(
                            "sensor_type", data.getSensorType(),
                            "location", data.getLocation()))
                    .quality(data.getQuality())
                    .build();
            cassandraTemplate.insert(timeSeriesData);
        } catch (Exception e) {
            log.error("写入Cassandra失败: {}", data.getSensorId(), e);
        }
    }

    private void detectAnomalies(SensorData data) {
        // 基于规则的异常检测
        if (data.getValue() < 0 || data.getValue() > 100) {
            log.warn("检测到异常值: sensorId={}, value={}", data.getSensorId(), data.getValue());

            // 记录异常事件
            AnomalyEvent event = AnomalyEvent.builder()
                    .sensorId(data.getSensorId())
                    .anomalyType("VALUE_OUT_OF_RANGE")
                    .detectedValue(data.getValue())
                    .expectedRange("0-100")
                    .timestamp(Instant.now())
                    .build();
            mongoTemplate.save(event);
        }
    }

    private void checkAlerts(SensorData data) {
        // 检查是否触发告警条件
        String alertKey = "alert:threshold:" + data.getSensorId();
        Double threshold = (Double) redisTemplate.opsForValue().get(alertKey);

        if (threshold != null && data.getValue() > threshold) {
            // 触发告警
            Alert alert = Alert.builder()
                    .sensorId(data.getSensorId())
                    .alertType("THRESHOLD_EXCEEDED")
                    .message("传感器值超过阈值: " + data.getValue() + " > " + threshold)
                    .severity("HIGH")
                    .timestamp(Instant.now())
                    .build();

            // 发送告警通知
            sendAlertNotification(alert);
        }
    }

    private List<SensorData> getHistoricalData(String sensorId, Instant startTime, Instant endTime) {
        Select select = QueryBuilder.select().from("time_series_data")
                .where(QueryBuilder.eq("metric_name", "sensor_reading"))
                .and(QueryBuilder.eq("sensor_id", sensorId))
                .and(QueryBuilder.gte("timestamp", startTime))
                .and(QueryBuilder.lte("timestamp", endTime));
        return cassandraTemplate.select(select, SensorData.class);
    }

    private List<SensorData> cleanData(List<SensorData> data) {
        return data.stream()
                .filter(d -> d.getQuality() > 0.8)                     // 质量分数过滤
                .filter(d -> d.getValue() >= 0 && d.getValue() <= 100) // 异常值过滤
                .collect(Collectors.toList());
    }

    private void storeForAnalysis(List<SensorData> data) {
        // 存储到MongoDB用于后续分析
        mongoTemplate.insert(data, "sensor_data_analysis");
    }

    private void generateDataQualityReport(List<SensorData> data) {
        if (data.isEmpty()) {
            return;
        }

        long totalCount = data.size();
        long validCount = data.stream().filter(d -> d.getQuality() > 0.8).count();
        double completeness = (double) validCount / totalCount;
        double avgValue = data.stream().mapToDouble(SensorData::getValue).average().orElse(0.0);
        double stdDev = calculateStandardDeviation(data);

        DataQualityReport report = DataQualityReport.builder()
                .sensorId(data.get(0).getSensorId())
                .totalRecords(totalCount)
                .validRecords(validCount)
                .completeness(completeness)
                .averageValue(avgValue)
                .standardDeviation(stdDev)
                .generatedAt(Instant.now())
                .build();
        mongoTemplate.save(report);
    }

    private List<AnalysisResult> processInfluxDBResult(QueryResult queryResult) {
        // 处理InfluxDB查询结果
        return Collections.emptyList(); // 简化实现
    }

    private Statistics calculateStatistics(List<AnalysisResult> results) {
        // 计算统计指标
        return Statistics.builder()
                .count(results.size())
                .average(results.stream().mapToDouble(AnalysisResult::getAverage).average().orElse(0.0))
                .maximum(results.stream().mapToDouble(AnalysisResult::getMaximum).max().orElse(0.0))
                .minimum(results.stream().mapToDouble(AnalysisResult::getMinimum).min().orElse(0.0))
                .build();
    }

    private Map<String, Double> extractFeatures(List<SensorData> data) {
        // 特征提取逻辑
        Map<String, Double> features = new HashMap<>();
        double avg = data.stream().mapToDouble(SensorData::getValue).average().orElse(0.0);
        double variance = calculateVariance(data);
        double trend = calculateTrend(data);
        features.put("average", avg);
        features.put("variance", variance);
        features.put("trend", trend);
        return features;
    }

    private double predictFailureProbability(Map<String, Double> features) {
        // 简化的预测模型
        double avg = features.getOrDefault("average", 0.0);
        double variance = features.getOrDefault("variance", 0.0);
        double trend = features.getOrDefault("trend", 0.0);

        // 简单的线性模型
        return Math.min(1.0, Math.max(0.0,
                0.3 * (avg / 100.0) + 0.4 * (variance / 100.0) + 0.3 * Math.abs(trend)));
    }

    private MaintenanceRecommendation generateMaintenanceRecommendation(double failureProbability) {
        if (failureProbability > 0.8) {
            return MaintenanceRecommendation.builder()
                    .priority("HIGH")
                    .action("立即安排维护")
                    .description("设备故障概率很高,建议立即停机检查")
                    .build();
        } else if (failureProbability > 0.5) {
            return MaintenanceRecommendation.builder()
                    .priority("MEDIUM")
                    .action("计划维护")
                    .description("设备存在故障风险,建议在一周内安排维护")
                    .build();
        } else {
            return MaintenanceRecommendation.builder()
                    .priority("LOW")
                    .action("继续监控")
                    .description("设备状态正常,继续常规监控")
                    .build();
        }
    }

    private void aggregateHourlyData() {
        // 小时级数据聚合逻辑
        log.info("执行小时级数据聚合");
    }

    private void aggregateDailyData() {
        // 日级数据聚合逻辑
        log.info("执行日级数据聚合");
    }

    private void cleanupOldData() {
        // 清理过期数据逻辑
        log.info("清理过期原始数据");
    }

    private void sendAlertNotification(Alert alert) {
        // 发送告警通知逻辑
        log.warn("发送告警通知: {}", alert.getMessage());
    }

    private double calculateStandardDeviation(List<SensorData> data) {
        double avg = data.stream().mapToDouble(SensorData::getValue).average().orElse(0.0);
        double variance = data.stream()
                .mapToDouble(d -> Math.pow(d.getValue() - avg, 2))
                .average().orElse(0.0);
        return Math.sqrt(variance);
    }

    private double calculateVariance(List<SensorData> data) {
        double avg = data.stream().mapToDouble(SensorData::getValue).average().orElse(0.0);
        return data.stream()
                .mapToDouble(d -> Math.pow(d.getValue() - avg, 2))
                .average().orElse(0.0);
    }

    private double calculateTrend(List<SensorData> data) {
        // 简单的线性趋势计算(最小二乘斜率)
        if (data.size() < 2) return 0.0;

        double[] values = data.stream().mapToDouble(SensorData::getValue).toArray();
        int n = values.length;
        double sumX = 0, sumY = 0, sumXY = 0, sumX2 = 0;
        for (int i = 0; i < n; i++) {
            sumX += i;
            sumY += values[i];
            sumXY += i * values[i];
            sumX2 += i * i;
        }
        return (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);
    }
}
```

总结
非结构化数据存储架构法则为现代应用提供了处理海量、多样化数据的有效方案。通过合理选择和使用NoSQL数据库,我们能够:
核心原则
- 数据结构匹配:根据数据的结构特征选择最适合的NoSQL类型
- 访问模式优化:基于读写比例和查询模式优化存储方案
- 性能需求平衡:在性能、扩展性和一致性之间找到最佳平衡
- 成本效益考虑:综合考虑硬件成本、运维成本和开发成本
关键技术
- 文档数据库(MongoDB):适合复杂嵌套数据结构,提供强大的查询能力
- 键值数据库(Redis):提供极高的性能和丰富的数据结构,适合缓存和会话存储
- 列族数据库(Cassandra):适合时序数据和写密集型应用,支持线性扩展
- 图数据库(Neo4j):擅长处理复杂关系网络,提供高效的关系查询
- 数据建模优化:根据查询需求设计数据模型,合理使用嵌入和引用
成功要素
- 深入理解业务:分析数据特征、访问模式和性能要求
- 科学选择技术:基于实际场景选择最适合的NoSQL解决方案
- 合理数据建模:遵循查询驱动的设计原则,优化数据结构
- 性能监控优化:建立完善的监控体系,持续优化性能
- 容量规划管理:提前规划系统容量,支持业务增长
非结构化数据存储不是对传统关系型数据库的替代,而是对其的有力补充。通过合理的技术选型和架构设计,我们能够构建出既满足当前需求,又具备未来扩展性的数据存储解决方案,为业务创新提供强有力的技术支撑。
非结构化数据存储架构的核心在于:理解数据的本质特征,选择最适合的存储技术,通过合理的数据建模和性能优化,实现数据价值最大化。