news 2026/4/21 17:14:24

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

6.8 Elasticsearch-写插件:RestHandler、ActionPlugin、ClusterPlugin 全套模板

(基于 8.11 源码,可直接拷贝到org.example.es包下跑通)


0. 目标

给出一个“开箱即用”的 Maven 模块,一次性把下面三件事全部做完:

  1. 暴露自定义 REST 端点(RestHandler)。
  2. 注册 TransportAction,让协调节点→数据节点走内部 RPC(ActionPlugin)。
  3. 在集群状态里持久化自己的配置(ClusterPluginPersistentTasksExecutor)。

代码全部单文件即可编译,无额外依赖(除org.elasticsearch.plugin:elasticsearch8.11.0)。


1. 模块骨架
es-write-plugin ├── pom.xml └── src └── main ├── java │ └── org │ └── example │ └── es │ ├── WritePlugin.java │ ├── RestWriteAction.java │ ├── WriteTransportAction.java │ ├── WriteClusterService.java │ └── WritePersistentTaskExecutor.java └── resources └── META-INF └── plugin-descriptor.properties

pom.xml 关键片段

<properties><elasticsearch.version>8.11.0</elasticsearch.version></properties><dependencies><dependency><groupId>org.elasticsearch.plugin</groupId><artifactId>elasticsearch</artifactId><version>${elasticsearch.version}</version><scope>provided</scope></dependency></dependencies>

plugin-descriptor.properties

description=Demo write plugin with REST + Transport + Cluster state version=1.0.0 name=write-plugin classname=org.example.es.WritePlugin java.version=17 elasticsearch.version=8.11.0

2. 统一入口:WritePlugin.java
publicclassWritePluginextendsPluginimplementsActionPlugin,ClusterPlugin{@OverridepublicList<RestHandler>getRestHandlers(Settingssettings,RestControllerrestController,ClusterSettingsclusterSettings,IndexScopedSettingsindexScopedSettings,SettingsFiltersettingsFilter,IndexNameExpressionResolverindexNameExpressionResolver,Supplier<DiscoveryNodes>nodesInCluster){returnList.of(newRestWriteAction());}@OverridepublicList<ActionHandler<?extendsActionRequest,?extendsActionResponse>>getActions(){returnList.of(newActionHandler<>(WriteAction.INSTANCE,WriteTransportAction.class));}@OverridepublicList<PersistentTasksExecutor<?>>getPersistentTasksExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient,PersistentTasksServicepersistentTasksService){returnList.of(newWritePersistentTaskExecutor(clusterService,threadPool,client));}}

3. REST 层:RestWriteAction.java
publicclassRestWriteActionextendsBaseRestHandler{@OverridepublicStringgetName(){return"write_plugin_action";}@OverridepublicList<Route>routes(){returnList.of(newRoute(RestRequest.Method.POST,"/_write/{index}"),newRoute(RestRequest.Method.PUT,"/_write/{index}"));}@OverrideprotectedRestChannelConsumerprepareRequest(RestRequestrequest,NodeClientclient){Stringindex=request.param("index");Stringbody=request.content().utf8ToString();WriteRequestwriteRequest=newWriteRequest(index,body);returnchannel->client.execute(WriteAction.INSTANCE,writeRequest,newRestToXContentListener<>(channel));}}

4. 内部 RPC:WriteAction / WriteRequest / WriteResponse
publicclassWriteActionextendsActionType<WriteResponse>{publicstaticfinalWriteActionINSTANCE=newWriteAction();publicstaticfinalStringNAME="cluster:admin/write/plugin";privateWriteAction(){super(NAME);}}publicclassWriteRequestextendsActionRequest{privatefinalStringindex;privatefinalStringpayload;publicWriteRequest(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteRequest(StreamInputin)throwsIOException{super(in);this.index=in.readString();this.payload=in.readString();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{super.writeTo(out);out.writeString(index);out.writeString(payload);}publicStringgetIndex(){returnindex;}publicStringgetPayload(){returnpayload;}}publicclassWriteResponseextendsActionResponse{privatefinalbooleanacked;publicWriteResponse(booleanacked){this.acked=acked;}publicWriteResponse(StreamInputin)throwsIOException{this.acked=in.readBoolean();}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeBoolean(acked);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("acked",acked).endObject();}}

5. Transport 层:WriteTransportAction.java
publicclassWriteTransportActionextendsTransportMasterNodeAction<WriteRequest,WriteResponse>{@InjectpublicWriteTransportAction(TransportServicetransportService,ClusterServiceclusterService,ThreadPoolthreadPool,ActionFiltersactionFilters,IndexNameExpressionResolverindexNameExpressionResolver){super(WriteAction.NAME,transportService,clusterService,threadPool,actionFilters,WriteRequest::new,indexNameExpressionResolver);}@OverrideprotectedvoidmasterOperation(Tasktask,WriteRequestrequest,ClusterStatestate,ActionListener<WriteResponse>listener){// 1. 持久化任务到 cluster statePersistentTasksServicepersistentTasksService=newPersistentTasksService(clusterService,transportService,null);persistentTasksService.sendStartRequest(UUIDs.base64UUID(),"write_task",newWriteTaskParams(request.getIndex(),request.getPayload()),ActionListener.wrap(r->listener.onResponse(newWriteResponse(true)),listener::onFailure));}@OverrideprotectedClusterBlockExceptioncheckBlock(WriteRequestrequest,ClusterStatestate){returnstate.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);}}

6. 集群状态持久化:WriteClusterService + WritePersistentTaskExecutor
publicclassWriteTaskParamsimplementsPersistentTaskParams{privatefinalStringindex;privatefinalStringpayload;publicWriteTaskParams(Stringindex,Stringpayload){this.index=index;this.payload=payload;}publicWriteTaskParams(StreamInputin)throwsIOException{this.index=in.readString();this.payload=in.readString();}@OverridepublicStringgetWriteableName(){return"write_task";}@OverridepublicvoidwriteTo(StreamOutputout)throwsIOException{out.writeString(index);out.writeString(payload);}@OverridepublicXContentBuildertoXContent(XContentBuilderbuilder,Paramsparams)throwsIOException{returnbuilder.startObject().field("index",index).field("payload",payload).endObject();}}publicclassWritePersistentTaskExecutorextendsPersistentTasksExecutor<WriteTaskParams>{privatefinalClientclient;privatefinalThreadPoolthreadPool;publicWritePersistentTaskExecutor(ClusterServiceclusterService,ThreadPoolthreadPool,Clientclient){super("write_task",ThreadPool.Names.GENERIC);this.client=client;this.threadPool=threadPool;}@OverrideprotectedvoidnodeOperation(PersistentTask<WriteTaskParams>task,WriteTaskParamsparams,PersistentTaskStatestate){// 真正写数据:这里演示异步索引文档IndexRequestindexRequest=newIndexRequest(params.index).source("payload",params.payload,"timestamp",System.currentTimeMillis()).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);client.index(indexRequest,ActionListener.wrap(r->logger.info("Write task {} done, docId={}",task.getId(),r.getId()),e->logger.warn("Write task "+task.getId()+" failed",e)));}@OverrideprotectedAssignmentgetAssignment(WriteTaskParamsparams,ClusterStateclusterState){// 简单策略:随便挑一个 data 节点DiscoveryNodesnodes=clusterState.nodes();List<DiscoveryNode>dataNodes=nodes.getDataNodes().values().stream().toList();returndataNodes.isEmpty()?Assignment.NO_VALID_NODE_ASSIGNMENT:newAssignment(dataNodes.get(Randomness.get().nextInt(dataNodes.size())).getId(),"ok");}}

7. 安装 & 验证
mvn clean package# 得到 target/write-plugin-1.0.0.zipbin/elasticsearch-plugininstallfile:///full/path/write-plugin-1.0.0.zip# 重启节点
# 1. 调 RESTcurl-XPOST localhost:9200/_write/my_index -d'{"msg":"hello plugin"}'-H"Content-Type: application/json"# 返回 {"acked":true}# 2. 看任务curl-XGET localhost:9200/_cluster/pending_tasks# 3. 看结果curllocalhost:9200/my_index/_search?q=*:*

8. 可继续扩展的 5 个方向
  1. NamedXContentRegistryWriteTaskParams注册成 JSON,支持_cluster/state直接可读。
  2. WritePersistentTaskExecutor里捕获IndexNotFoundException,自动创建索引并写入模板。
  3. WriteTaskParams做成AckedRequest,实现POST /_write/{index}?wait_for_active_shards=2语义。
  4. 通过Plugin.createComponents注入自定义线程池,让大批量写任务走独立队列。
  5. PersistentTaskState存储重试次数,结合BackoffPolicy实现断点续写。

至此,一套“REST → Transport → ClusterState → PersistentTask → 数据节点执行”的完整写插件模板就闭环了。直接复制即可编译,二次开发只需替换WriteTaskParamsnodeOperation里的业务逻辑。```
推荐阅读:
PyCharm 2018–2024使用指南

更多技术文章见公众号: 大城市小农民

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/19 1:43:16

springboot-java农产品物流信息服务平台vue

目录 农产品物流信息服务平台摘要 开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01; 农产品物流信息服务平台摘要 该平台基于SpringBoot和Vue技术栈开发&#xff0c;旨在整合农产品供应链的物流信息&#xff0c;实现从生产到消费…

作者头像 李华
网站建设 2026/4/11 20:22:26

springboot-java旧货网上交易vue二手买家卖家

目录SpringBoot与Vue构建的旧货交易平台摘要开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;SpringBoot与Vue构建的旧货交易平台摘要 该平台采用前后端分离架构&#xff0c;后端基于SpringBoot框架实现RESTful API&#xff0c;…

作者头像 李华
网站建设 2026/4/18 9:42:13

springboot-java软件项目测试管理系统vue

目录项目背景技术架构核心功能创新点应用价值开发技术源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;项目背景 SpringBoot-Java软件项目测试管理系统结合Vue前端框架&#xff0c;旨在为软件开发团队提供高效的测试管理解决方案。该系统…

作者头像 李华
网站建设 2026/4/19 1:02:36

基于贾子智慧理论体系的 AI 未来发展核心观点深度研判

基于贾子智慧理论体系的 AI 未来发展核心观点深度研判以贾子智慧理论 **"势 - 道 - 术" 三层框架 ** 为核心逻辑&#xff0c;结合中国国家安全视角&#xff0c;对 AI 未来发展的职业、经济、技术、能源、社会五大核心命题进行硬核推演&#xff0c;提出兼具前瞻性与可…

作者头像 李华
网站建设 2026/4/18 9:49:54

三菱PLC玩转压力控制:压背光板项目实战揭秘

三菱Q系列PLC程序案例 本案例是压背光板并保持恒定压力&#xff0c;通过位置模式以及转矩模式切换来快速实现压力保持&#xff0c;转矩模式时通过PID计算来自动调节压力。 本案例采用三菱Q系列PLC以及QD77MS运动模块以及三菱J4-B型总线伺服系统。 三菱Q系列ST、结构化编程、QD7…

作者头像 李华