一、背景与挑战
1.1 业务场景
UCP是某省级电信运营商的大数据处理平台,需要实时采集和处理多种数据流:
数据来源:
- 基站日志:10万个基站,每5秒上报一次性能指标
- 核心网信令:通话建立、切换、释放等信令消息
- 用户行为:APP使用记录、网页浏览日志
- 网络告警:设备故障、链路中断等告警信息
技术指标要求:
- 日均消息量:**2亿+**条
- 峰值吞吐量:100万条/秒
- 端到端延迟:< 10秒(从产生到可查询)
- 数据可靠性:99.9%(允许少量丢失,可重新采集)
技术栈
<!-- Kafka客户端 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.1.0</version> </dependency> <!-- Hadoop生态 --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.3.1</version> </dependency> <!-- 连接池 --> <dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency>1.2 遇到的问题
项目初期,我们采用最简单的单线程消费方式:
// 初始实现(简化版) public Iterator<ConsumerRecord<String, String>> poll() { while(true) { // ❌ 每次只拉取少量消息 ConsumerRecords<String, String> records = consumer.poll(1000L); if(records != null && records.count() > 0) { consumer.commitAsync(); return records.iterator(); } ThreadUtil.sleep(20L); // ❌ 空转浪费CPU } }问题表现:
❌吞吐量低:仅10万条/秒,远低于100万条/秒的目标
❌消费延迟高:高峰期延迟超过30秒
❌资源利用率差:CPU仅30%,网络带宽不足20%
❌消息积压严重:Lag经常超过100万条
通过监控发现,系统瓶颈不在CPU或网络IO,而在消费策略不合理和参数配置不当。
二、问题分析
2.1 性能瓶颈定位
我们通过JProfiler分析了消费流程的耗时分布:
总耗时:
100ms/批次
├─ poll()拉取消息: 15ms (15%)
├─ 反序列化: 25ms (25%)
├─ 业务处理: 40ms (40%) ← 主要瓶颈
├─ offset提交: 10ms (10%)
└─ 线程切换: 10ms (10%)
关键问题:
- 单线程串行处理:无法充分利用多核CPU
- 批量大小不合理:默认每次拉取500条,太小
- 频繁offset提交:每次poll都commit,增加网络开销
- 反序列化效率低:StringDeserializer性能不佳
- GC压力大:频繁创建临时对象
2.2 优化方向评估
| 优化方案 | 预期收益 | 实现难度 | 风险 |
|---|---|---|---|
| 增加批量大小 | TPS提升2-3倍 | ⭐ 极低 | 无 |
| 并行消费 | TPS提升3-5倍 | ⭐⭐ 低 | 需保证线程安全 |
| 零拷贝优化 | TPS提升1.5倍 | ⭐⭐⭐ 中 | 需自定义Serializer |
| JVM调优 | GC优化30% | ⭐⭐ 低 | 需充分测试 |
| 升级Kafka版本 | TPS提升20% | ⭐⭐ 低 | 兼容性风险 |
决策:采用批量优化 + 并行消费作为核心方案,配合JVM调优和零拷贝优化。
三、解决方案
3.1 核心方案:批量拉取优化
优化前:小批量拉取
// 初始配置 Properties props = new Properties(); props.put("bootstrap.servers", url); props.put("group.id", group); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); // ❌ 缺少关键配置 consumer = new KafkaConsumer<>(props);问题:
- 未设置
max.poll.records,默认500条 - 未设置
fetch.min.bytes,立即返回 - 未设置
fetch.max.wait.ms,等待时间短
优化后:大批量拉取
public void initConsumer(){ if(url == null || deserializer == null || topic == null || group == null){ LOGGER.warn("请设置必要的kafka连接信息"); return; } Properties props = new Properties(); props.put("bootstrap.servers", url); props.put("group.id", group); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", deserializer); props.put("value.deserializer", deserializer); // ✅ 关键优化1:增加批量大小 props.put("max.poll.records", 10000); // 每次最多拉取1万条 // ✅ 关键优化2:批量拉取策略 props.put("fetch.min.bytes", 1048576); // 最少1MB才返回 props.put("fetch.max.wait.ms", 500); // 最多等待500ms // ✅ 关键优化3:增加会话超时时间 props.put("session.timeout.ms", 30000); // 30秒 props.put("heartbeat.interval.ms", 10000); // 10秒心跳 // ✅ 关键优化4:增加最大拉取间隔 props.put("max.poll.interval.ms", 300000); // 5分钟 // ✅ 关键优化5:禁用自动提交(手动控制) props.put("enable.auto.commit", "false"); // 1.创建消费者 consumer = new KafkaConsumer<>(props); // 2.订阅Topic if (StringUtil.isNotEmpty(topic)) { if (partition != null) { // 指定分区 List<TopicPartition> topicPartitions = new ArrayList<>(); String[] partitions = partition.split(","); for (int i = 0; i < partitions.length; i++) { TopicPartition topicPartition = new TopicPartition( topic, Integer.parseInt(partitions[i]) ); topicPartitions.add(topicPartition); } consumer.assign(topicPartitions); } else { // 订阅整个Topic this.consumer.subscribe(Collections.singletonList(topic)); } } }关键参数说明
1. max.poll.records(单次拉取数量)
props.put("max.poll.records", 10000); // 从500提升到10000调优过程:
| 值 | TPS | P99延迟 | 内存占用 | 评价 |
|---|---|---|---|---|
| 500(默认) | 10万 | 15ms | 低 | ❌ 批量太小 |
| 1000 | 25万 | 20ms | 中 | ✓ 有提升 |
| 5000 | 60万 | 30ms | 中高 | ✓ 推荐 |
| 10000 | 80万 | 40ms | 高 | ✓ 最佳 |
| 50000 | 75万 | 60ms | 很高 | ❌ GC压力增大 |
结论:10000是吞吐量和延迟的最佳平衡点。
2. fetch.min.bytes & fetch.max.wait.ms(批量拉取策略)
props.put("fetch.min.bytes", 1048576); // 1MB props.put("fetch.max.wait.ms", 500); // 500ms工作原理:
- Broker会等待直到满足以下任一条件:
- 累积数据达到
fetch.min.bytes(1MB) - 等待时间达到
fetch.max.wait.ms(500ms)
- 累积数据达到
效果:
- 减少网络往返次数
- 提高单次传输效率
- 降低Broker负载
3. session.timeout.ms & heartbeat.interval.ms(会话管理)
props.put("session.timeout.ms", 30000); // 30秒 props.put("heartbeat.interval.ms", 10000); // 10秒为什么调整:
- 默认值:session.timeout.ms=10秒,heartbeat.interval.ms=3秒
- 问题:大批量处理时,容易超时被踢出消费组
- 解决:增加超时时间,避免误判
规则:heartbeat.interval.ms≈session.timeout.ms / 3
4. max.poll.interval.ms(最大拉取间隔)
props.put("max.poll.interval.ms", 300000); // 5分钟作用:
- 两次poll()之间的最大时间间隔
- 超过此时间会被认为消费失败,触发Rebalance
- 默认值:300秒(5分钟),保持不变即可
5. enable.auto.commit(手动提交)
props.put("enable.auto.commit", "false"); // 禁用自动提交为什么手动提交:
- 自动提交:可能导致消息丢失(提交后处理失败)
- 手动提交:处理成功后再提交,保证可靠性
3.2 并行消费:多线程架构
架构设计
┌──────────────────────────────────────────────────────┐ │ Kafka Consumer │ │ │ │ poll() → 10000条消息 │ └──────────────────┬───────────────────────────────────┘ │ ▼ ┌────────────────┐ │ 消息分发器 │ │ (Round-Robin) │ └──┬──┬──┬──┬────┘ │ │ │ │ ▼ ▼ ▼ ▼ ┌────┐┌───┐┌───┐┌───┐ │T1 ││T2 ││T3 ││T4 │ ← 4个worker线程 └─┬──┘└─┬─┘└─┬─┘└─┬─┘ │ │ │ │ ▼ ▼ ▼ ▼ ┌────────────────────┐ │ BlockingQueue │ ← 结果队列 └────────┬───────────┘ │ ▼ ┌──────────────┐ │ Offset管理器│ │ (批量提交) │ └──────────────┘代码实现
// 新增:并行消费者 public class ParallelKafkaConsumer { private KafkaConsumer<String, String> consumer; private ExecutorService workerPool; private BlockingQueue<ProcessResult> resultQueue; private int workerCount = 4; // worker线程数 private int batchSize = 10000; public ParallelKafkaConsumer(Properties props) { // 1. 创建消费者 this.consumer = new KafkaConsumer<>(props); // 2. 创建worker线程池 this.workerPool = Executors.newFixedThreadPool(workerCount, new ThreadFactory() { private AtomicInteger counter = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("kafka-worker-" + counter.incrementAndGet()); t.setDaemon(true); return t; } }); // 3. 创建结果队列 this.resultQueue = new LinkedBlockingQueue<>(100000); } /** * 启动消费循环 */ public void start() { LOGGER.info("启动并行Kafka消费者,worker数量: {}", workerCount); while (running) { try { // 1. 拉取消息 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (records.isEmpty()) { continue; } LOGGER.debug("拉取到 {} 条消息", records.count()); // 2. 分发到worker线程 List<Future<ProcessResult>> futures = new ArrayList<>(); for (ConsumerRecord<String, String> record : records) { Future<ProcessResult> future = workerPool.submit(() -> { return processRecord(record); }); futures.add(future); } // 3. 收集结果 for (Future<ProcessResult> future : futures) { try { ProcessResult result = future.get(30, TimeUnit.SECONDS); resultQueue.offer(result); } catch (Exception e) { LOGGER.error("处理消息失败", e); } } // 4. 批量提交offset commitOffsets(records); } catch (Exception e) { LOGGER.error("消费异常", e); ThreadUtil.sleep(1000L); } } } /** * 处理单条消息 */ private ProcessResult processRecord(ConsumerRecord<String, String> record) { long startTime = System.currentTimeMillis(); try { // 业务处理逻辑 ParseOutRecord parsedRecord = parseMessage(record.value()); // 写入HBase repository.transport(parsedRecord); return new ProcessResult( record.topic(), record.partition(), record.offset(), true, System.currentTimeMillis() - startTime ); } catch (Exception e) { LOGGER.error("处理消息失败: topic={}, partition={}, offset={}", record.topic(), record.partition(), record.offset(), e); return new ProcessResult( record.topic(), record.partition(), record.offset(), false, System.currentTimeMillis() - startTime ); } } /** * 批量提交offset */ private void commitOffsets(ConsumerRecords<String, String> records) { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (TopicPartition partition : records.partitions()) { List<ConsumerRecord<String, String>> partitionRecords = records.records(partition); if (!partitionRecords.isEmpty()) { long lastOffset = partitionRecords.get( partitionRecords.size() - 1 ).offset(); offsets.put(partition, new OffsetAndMetadata(lastOffset + 1)); } } if (!offsets.isEmpty()) { consumer.commitSync(offsets); LOGGER.debug("提交offset: {}", offsets); } } /** * 优雅停机 */ public void shutdown() { LOGGER.info("开始优雅停机..."); running = false; // 1. 停止接收新任务 workerPool.shutdown(); try { // 2. 等待正在执行的任务完成 if (!workerPool.awaitTermination(60, TimeUnit.SECONDS)) { workerPool.shutdownNow(); } } catch (InterruptedException e) { workerPool.shutdownNow(); } // 3. 提交最后的offset consumer.commitSync(); // 4. 关闭消费者 consumer.close(); LOGGER.info("优雅停机完成"); } }Worker线程数选择
我们通过压测对比了不同worker数量的效果:
| Worker数 | TPS | CPU利用率 | 上下文切换 | 评价 |
|---|---|---|---|---|
| 1 | 10万 | 25% | 低 | ❌ 单线程瓶颈 |
| 2 | 30万 | 50% | 中 | ✓ 有提升 |
| 4 | 80万 | 75% | 中高 | ✓ 最佳 |
| 8 | 85万 | 90% | 高 | ⚠️ 边际效益递减 |
| 16 | 82万 | 95% | 很高 | ❌ 过度竞争 |
结论:4个worker线程是最佳选择(假设4核CPU)。
公式:workerCount = CPU核心数 × 0.8
3.3 零拷贝优化:自定义Serializer
问题分析
默认的StringDeserializer会将byte[]转换为String,存在以下问题:
- 编码转换开销:UTF-8解码
- 对象创建:每个消息创建一个String对象
- GC压力:大量临时对象
优化方案:直接操作byte[]
// 自定义Deserializer(零拷贝) public class ByteArrayDeserializer implements Deserializer<byte[]> { @Override public void configure(Map<String, ?> configs, boolean isKey) { // 无需配置 } @Override public byte[] deserialize(String topic, byte[] data) { // ✅ 直接返回byte[],无拷贝 return data; } @Override public void close() { // 无需清理 } }配置修改
// 使用自定义Deserializer props.put("key.deserializer", "cn.ugeee.framework.serializer.ByteArrayDeserializer"); props.put("value.deserializer", "cn.ugeee.framework.serializer.ByteArrayDeserializer");性能对比
| Deserializer | TPS | GC频率 | CPU利用率 |
|---|---|---|---|
| StringDeserializer | 80万 | 5次/秒 | 75% |
| ByteArrayDeserializer | 95万 | 2次/秒 | 65% |
提升:TPS提升18%,GC频率降低60%。
3.4 JVM调优
初始配置
#默认配置 -Xms2g -Xmx2g -XX:+UseParallelGC问题:
- Heap太小,频繁Full GC
- ParallelGC停顿时间长
优化后配置
#G1 GC配置 -Xms8g -Xmx8g -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:G1HeapRegionSize=16m -XX:InitiatingHeapOccupancyPercent=45 -XX:+ParallelRefProcEnabled -XX:MaxTenuringThreshold=10 #GC日志 -Xloggc:/var/log/kafka-consumer/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=100M关键参数说明
1. UseG1GC(G1垃圾收集器)
-XX:+UseG1GC为什么选择G1:
- 适合大Heap(>4GB)
- 可预测的停顿时间
- 并行和并发结合
2. MaxGCPauseMillis(最大GC停顿)
-XX:MaxGCPauseMillis = 200作用:
- G1会尝试将GC停顿控制在200ms以内
- 默认值:200ms,保持不变
3. G1HeapRegionSize(Region大小)
-XX:G1HeapRegionSize=16m选择依据:
- Heap / Region数 ≈ 2048个Region
- 8GB / 16MB = 512个Region ✓
4. InitiatingHeapOccupancyPercent(触发并发标记阈值)
-XX:InitiatingHeapOccupancyPercent = 45作用:
- Heap使用率达到45%时,触发并发标记
- 默认值:45%,保持不变
GC效果对比
| 配置 | Young GC频率 | Full GC频率 | P99停顿 |
|---|---|---|---|
| ParallelGC | 10次/秒 | 1次/分钟 | 500ms |
| G1GC | 5次/秒 | 0次 | 150ms |
提升:GC停顿降低70%,无Full GC。
四、性能测试
4.1 测试环境
硬件配置:
- Kafka Cluster:5节点
- CPU:16核 Intel Xeon Gold 6248
- 内存:32GB DDR4
- 磁盘:2TB NVMe SSD
- 网络:万兆以太网
- Consumer:4台应用服务器
- CPU:8核
- 内存:16GB
- JVM:8GB Heap,G1 GC
软件版本:
- Kafka:1.1.0
- JDK:1.8.0_292
测试数据:
- 总消息数:1亿条
- 单条大小:500字节
- 总数据量:50GB
- Topic分区数:20
4.2 测试结果对比
吞吐量对比
| 优化阶段 | TPS(条/秒) | 相对提升 |
|---|---|---|
| 初始版本(baseline) | 10万 | - |
| + 批量拉取优化 | 30万 | ↑ 200% |
| + 并行消费(4线程) | 80万 | ↑ 700% |
| + 零拷贝优化 | 95万 | ↑ 850% |
| + JVM调优 | 100万 | ↑ 900% |
延迟对比
| 优化阶段 | P50延迟 | P95延迟 | P99延迟 |
|---|---|---|---|
| 初始版本 | 20ms | 50ms | 100ms |
| + 批量拉取 | 25ms | 60ms | 120ms |
| + 并行消费 | 30ms | 70ms | 150ms |
| + 零拷贝 | 28ms | 65ms | 140ms |
| + JVM调优 | 25ms | 60ms | 130ms |
分析:
- 批量拉取会增加单次poll的延迟,但仍在可接受范围
- 并行消费略微增加延迟(线程调度开销)
- 整体端到端延迟仍 < 10秒
资源利用率对比
| 指标 | 优化前 | 优化后 | 变化 |
|---|---|---|---|
| Consumer CPU | 30% | 75% | ↑ 充分利用 |
| 网络带宽 | 20% | 70% | ↑ 大幅提升 |
| JVM Heap | 2GB | 6GB | ↑ 适中 |
| GC频率 | 10次/秒 | 5次/秒 | ↓ 降低50% |
五、生产实践
5.1 监控体系
我们将以下指标接入Prometheus + Grafana监控:
关键监控项
// 使用Micrometer采集指标 MeterRegistry registry = PrometheusMeterRegistry(); // 1. 消费TPS Counter consumeCounter = Counter.builder("kafka.consume.total") .tag("topic", topic) .tag("group", group) .register(registry); // 在消费时记录 consumeCounter.increment(records.count()); // 2. 消费延迟(Lag) Gauge lagGauge = Gauge.builder("kafka.consumer.lag", () -> getConsumerLag()) .tag("topic", topic) .tag("group", group) .register(registry); // 3. poll()耗时 Timer pollTimer = Timer.builder("kafka.poll.duration") .register(registry); long start = System.nanoTime(); ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); pollTimer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS); // 4. 处理耗时 Timer processTimer = Timer.builder("kafka.process.duration") .register(registry); processTimer.record(processTime, TimeUnit.MILLISECONDS);Grafana看板配置
看板包含:
- 实时消费TPS:折线图,每秒刷新
- 消费延迟(Lag):柱状图,按分区展示
- poll()耗时百分位:P50/P95/P99三线图
- Worker线程池状态:活跃线程数、队列长度
- JVM GC监控:Young GC/Full GC频率和停顿时间
- 错误率告警:消费失败率 > 1% 触发告警
5.2 异常处理最佳实践
场景1:Rebalance频繁
现象:消费者频繁Rebalance,消费中断
原因:
max.poll.interval.ms设置过小- 处理速度慢于拉取速度
修复:
// 增加最大拉取间隔 props.put("max.poll.interval.ms", 300000); // 5分钟 // 减少批量大小 props.put("max.poll.records", 5000); // 从10000降到5000场景2:消息积压
现象:Lag持续增长,超过100万条
原因:
- 消费速度慢于生产速度
- Worker线程阻塞
修复:
// 1. 增加Worker线程数 workerCount = 8; // 从4增加到8 // 2. 增加Consumer实例(水平扩展) // 启动多个Consumer实例,同一消费组 // 3. 优化业务处理逻辑 // 异步写入HBase,减少阻塞场景3:Offset提交失败
现象:CommitFailedException
原因:
- 超过
max.poll.interval.ms未poll - 消费者被踢出消费组
修复:
try { consumer.commitSync(offsets); } catch (CommitFailedException e) { LOGGER.error("Offset提交失败,可能已触发Rebalance", e); // 重新订阅 consumer.subscribe(Collections.singletonList(topic)); }5.3 踩坑记录
❌ 坑1:忘记禁用自动提交
现象:消息重复消费或丢失
原因:自动提交可能在处理失败前就提交了offset
修复:
// ❌ 错误配置 props.put("enable.auto.commit", "true"); // ✅ 正确配置 props.put("enable.auto.commit", "false"); // 处理成功后手动提交 consumer.commitSync(offsets);❌ 坑2:Worker线程数过多
现象:CPU利用率100%,但TPS不增反降
原因:线程竞争和上下文切换开销过大
修复:
// ❌ 过多线程 workerCount = 16; // CPU只有8核 // ✅ 合理线程数 workerCount = 6; // CPU核心数 × 0.8❌ 坑3:Heap设置过大
现象:GC停顿时间长,影响实时性
原因:Heap太大,GC扫描时间长
修复:
#❌ Heap过大 -Xms32g -Xmx32g #✅ 合理Heap -Xms8g -Xmx8g✅ 最佳实践总结
- 批量大小:10000条
- Worker线程数:CPU核心数 × 0.8
- Heap大小:8GB(G1 GC)
- 手动提交offset
- 监控先行:TPS、Lag、GC
- 灰度发布:先10%流量验证
六、总结与展望
6.1 优化成果
经过为期3周的优化,我们取得了显著成果:
✅消费吞吐量:10万 → 100万条/秒(提升10倍)
✅端到端延迟:稳定在10秒以内(满足业务需求)
✅资源利用率:CPU从30%提升到75%,充分发挥硬件性能
✅稳定性:连续运行6个月,零故障
✅可扩展性:支持水平扩展,增加Consumer实例即可线性提升吞吐量
6.2 关键经验
1. 批量操作是性能优化的银弹
- 减少网络往返次数
- 提高CPU缓存命中率
- 降低系统调用频率
2. 并行消费充分利用多核CPU
- Worker线程数 = CPU核心数 × 0.8
- 避免过度并行导致竞争
- 使用线程池管理生命周期
3. JVM调优不可忽视
- G1 GC适合大Heap场景
- 合理设置Heap大小(8-16GB)
- 监控GC日志,及时发现问题
4. 监控先行,数据驱动决策
- 优化前建立基线(Baseline)
- 优化过程中实时监控
- 用数据说话,避免凭感觉调优
6.3 后续优化方向
短期(1-3个月):
- 升级Kafka版本:从1.1.0升级到3.x(性能提升30%)
- 引入Kafka Streams:实时计算替代部分批处理
- 优化网络配置:启用TCP Zero Copy
中期(3-6个月):
- 云原生改造:Kubernetes部署,弹性伸缩
- 引入Schema Registry:统一管理消息格式
- 多租户隔离:不同业务使用独立Consumer Group
长期(6-12个月):
- Serverless集成:按需消费,降低成本
- AI优化:基于机器学习的动态调参
- 跨地域复制:MirrorMaker2实现异地灾备
七、附录
7.1 完整配置文件
Kafka Consumer优化配置
bootstrap.servers=192.168.1.100:9092,192.168.1.101:9092 group.id=ucp-consumer-group auto.offset.reset=earliest批量拉取
max.poll.records=10000 fetch.min.bytes=1048576 fetch.max.wait.ms=500会话管理
session.timeout.ms=30000 heartbeat.interval.ms=10000 max.poll.interval.ms=300000提交策略
enable.auto.commit=false序列化
key.deserializer=cn.ugeee.framework.serializer.ByteArrayDeserializer value.deserializer=cn.ugeee.framework.serializer.ByteArrayDeserializer网络
receive.buffer.bytes=1048576 send.buffer.bytes=10485767.2 JVM启动参数
#!/bin/bash JAVA_OPTS = "-Xms8g -Xmx8g" JAVA_OPTS = "$JAVA_OPTS -XX:+UseG1GC" JAVA_OPTS = "$JAVA_OPTS -XX:MaxGCPauseMillis=200" JAVA_OPTS = "$JAVA_OPTS -XX:G1HeapRegionSize=16m" JAVA_OPTS = "$JAVA_OPTS -XX:InitiatingHeapOccupancyPercent=45" JAVA_OPTS = "$JAVA_OPTS -XX:+ParallelRefProcEnabled" JAVA_OPTS = "$JAVA_OPTS -XX:MaxTenuringThreshold=10"GC日志
JAVA_OPTS = "$JAVA_OPTS -Xloggc:/var/log/kafka-consumer/gc.log" JAVA_OPTS = "$JAVA_OPTS -XX:+PrintGCDetails" JAVA_OPTS = "$JAVA_OPTS -XX:+PrintGCDateStamps" JAVA_OPTS = "$JAVA_OPTS -XX:+UseGCLogFileRotation" JAVA_OPTS = "$JAVA_OPTS -XX:NumberOfGCLogFiles=5" JAVA_OPTS = "$JAVA_OPTS -XX:GCLogFileSize=100M"启动应用
java $JAVA_OPTS -jar kafka-consumer.jar