05-aeron-cluster-basic - Aeron Cluster 基础示例

项目说明:这是一个完整的 Aeron Cluster 3节点集群应用示例,演示了 ClusteredService 的基本实现和集群的核心功能。

核心特性

  • 完整的 Cluster 实现: MediaDriver + Archive + ConsensusModule + ClusteredServiceContainer
  • 3节点集群: 支持 Leader 选举和故障切换
  • 计数器服务: 简单但完整的业务逻辑示例
  • 客户端示例: 包含重连机制和错误处理
  • 快照和恢复: 支持状态持久化和快速恢复
  • 详细日志: 清晰的运行状态输出

项目架构

graph TB subgraph Client["客户端层"] CC[ClusterClient
集群客户端] end subgraph Cluster["Aeron Cluster 3节点集群"] subgraph Node0["Node 0 - Leader"] N0[节点 0
9000-9300, 8010] end subgraph Node1["Node 1 - Follower"] N1[节点 1
9001-9301, 8011] end subgraph Node2["Node 2 - Follower"] N2[节点 2
9002-9302, 8012] end end subgraph Core["核心组件"] CM[Consensus Module
Raft 共识算法
- Leader 选举
- 日志复制
- 消息排序] AR[Archive
日志持久化
- Recording
- Snapshot] CS[ClusteredService
计数器服务
- INCREMENT
- DECREMENT
- GET
- RESET] end CC -->|UDP Ingress| N0 CC -->|UDP Ingress| N1 CC -->|UDP Ingress| N2 N0 -.->|集群通信| N1 N1 -.->|集群通信| N2 N2 -.->|集群通信| N0 N0 --> CM N1 --> CM N2 --> CM CM --> AR AR --> CS style CC fill:#e3f2fd style N0 fill:#c8e6c9 style N1 fill:#fff9c4 style N2 fill:#fff9c4 style CM fill:#ffccbc style AR fill:#d1c4e9 style CS fill:#f8bbd0

节点间通信端口

节点 0
  • Client Facing: 9000 (客户端连接)
  • Member Facing: 9100 (集群成员通信)
  • Log Replication: 9200 (日志复制)
  • Transfer: 9300 (快照传输)
  • Archive Control: 8010 (Archive 控制)
节点 1
  • Client Facing: 9001
  • Member Facing: 9101
  • Log Replication: 9201
  • Transfer: 9301
  • Archive Control: 8011
节点 2
  • Client Facing: 9002
  • Member Facing: 9102
  • Log Replication: 9202
  • Transfer: 9302
  • Archive Control: 8012

核心文件

Java 源文件

graph LR subgraph Java["src/main/java/com/aeron/cluster/"] CN[ClusterNode.java
集群节点启动器] BCS[BasicClusteredService.java
ClusteredService 实现] CCL[ClusterClient.java
集群客户端] end subgraph Components["核心组件"] MD[MediaDriver
Aeron 底层通信] ARC[Archive
日志持久化] CM[ConsensusModule
Raft 共识] CSC[ClusteredServiceContainer
业务服务容器] end CN --> MD CN --> ARC CN --> CM CN --> CSC BCS -.实现.-> CSC CCL -.连接.-> CN style CN fill:#bbdefb style BCS fill:#c5e1a5 style CCL fill:#ffccbc

  • MediaDriver - Aeron 底层通信
  • Archive - 日志持久化
  • ConsensusModule - Raft 共识
  • ClusteredServiceContainer - 业务服务容器

  • onStart() - 启动和快照恢复
  • onSessionMessage() - 处理客户端请求
  • onTakeSnapshot() - 生成快照
  • onRoleChange() - 角色变更处理

  • 连接集群
  • 发送命令
  • 接收响应
  • Leader 切换处理

运行脚本

run.sh - 统一运行脚本
  • compile - 编译项目
  • node0/node1/node2 - 启动节点
  • client - 运行客户端
  • clean - 清理数据
  • help - 显示帮助

快速开始

1. 环境要求

必备环境:
  • Java: JDK 11 或更高版本
  • Maven: 3.6 或更高版本
  • 操作系统: Linux / macOS / Windows

验证环境:

java -version   # 应该显示 11 或更高版本
mvn -version    # 应该显示 3.6 或更高版本

2. 编译项目

# 方式 1: 使用 run.sh 脚本(推荐)
./run.sh compile

# 方式 2: 直接使用 Maven
mvn clean compile

编译成功后会看到:

[SUCCESS] 编译成功!
编译结果: /path/to/project/target/classes

3. 启动集群节点

重要: 需要在 3 个不同的终端窗口分别启动 3 个节点
终端 1 - 启动节点 0
./run.sh node0
终端 2 - 启动节点 1
./run.sh node1
终端 3 - 启动节点 2
./run.sh node2

当至少 2 个节点启动后,集群会自动选举出 Leader。你会看到类似的输出:

╔════════════════════════════════════════════════════════╗
║      Aeron Cluster 节点启动 - Node 0                ║
╚════════════════════════════════════════════════════════╝

========================================
角色变更: LEADER
========================================

4. 运行客户端

在第 4 个终端窗口运行客户端:

./run.sh client

客户端会自动连接到集群 Leader,并发送一系列测试命令:

──────────────────────────────────────
  → 发送命令: INCREMENT
  ← 收到响应: OK: counter=1
──────────────────────────────────────
  → 发送命令: INCREMENT
  ← 收到响应: OK: counter=2
──────────────────────────────────────
  → 发送命令: GET
  ← 收到响应: VALUE: 2

5. 测试 Leader 故障切换

sequenceDiagram participant Client as 客户端 participant N0 as 节点 0 (Leader) participant N1 as 节点 1 (Follower) participant N2 as 节点 2 (Follower) Client->>N0: 发送请求 N0->>Client: 返回响应 Note over N0: Ctrl+C 停止节点 0 N0--xN1: 失去连接 N0--xN2: 失去连接 Note over N1,N2: 开始新的选举 N1->>N2: 请求投票 N2->>N1: 投票确认 Note over N1: 成为新 Leader Client->>N1: 重连并发送请求 N1->>Client: 返回响应
  1. 观察当前 Leader 是哪个节点(假设是节点 0)
  2. 在节点 0 的终端按 Ctrl+C 停止节点
  3. 观察其他节点的输出,会看到新的选举过程:
╔════════════════════════════════════════════════════════╗
║  新 Leader 选出                                        ║
╚════════════════════════════════════════════════════════╝
  Leader Member ID: 1
  Term ID: 2
  1. 客户端会自动重连到新的 Leader,业务不中断

6. 清理数据

测试完成后,清理集群数据:

# 清理集群数据(保留编译结果)
./run.sh clean

# 清理所有数据(包括编译结果)
./run.sh cleanall

学习重点

1. ClusteredService 接口

BasicClusteredService.java 实现了完整的 ClusteredService 接口:

public interface ClusteredService {
    void onStart(Cluster cluster, Image snapshotImage);
    void onSessionOpen(ClientSession session, long timestamp);
    void onSessionClose(ClientSession session, long timestamp, CloseReason closeReason);
    void onSessionMessage(ClientSession session, long timestamp,
                         DirectBuffer buffer, int offset, int length, Header header);
    void onTimerEvent(long correlationId, long timestamp);
    void onRoleChange(Cluster.Role newRole);
    void onTakeSnapshot(ExclusivePublication snapshotPublication);
    void onTerminate(Cluster cluster);
}

2. Consensus Module 配置

ClusterNode.java 展示了完整的 Consensus Module 配置:

final ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
    .clusterMemberId(nodeId)
    .clusterMembers(CLUSTER_MEMBERS)
    .clusterDir(new File(clusterDir, "consensus"))
    .aeronDirectoryName(aeronDir.getAbsolutePath())
    .archiveContext(archiveContext.clone())
    .electionTimeoutNs(TimeUnit.SECONDS.toNanos(2))
    .heartbeatIntervalNs(TimeUnit.MILLISECONDS.toNanos(200))
    .sessionTimeoutNs(TimeUnit.SECONDS.toNanos(10))
    .snapshotIntervalLength(1000);  // 每 1000 条日志生成快照

3. Archive 集成

每个节点都包含一个 Archive 实例,负责:

4. 3节点集群部署

集群成员配置格式:

memberId,clientFacingEndpoint,memberFacingEndpoint,logEndpoint,transferEndpoint,archiveEndpoint

示例:

0,localhost:9000,localhost:9100,localhost:9200,localhost:9300,localhost:8010|
1,localhost:9001,localhost:9101,localhost:9201,localhost:9301,localhost:8011|
2,localhost:9002,localhost:9102,localhost:9202,localhost:9302,localhost:8012

5. Leader 选举和故障切换

配置项 说明
最小集群 需要至少 2 个节点才能选举出 Leader 2 节点
推荐配置 3 个节点(容忍 1 个节点故障) 3 节点
选举超时 选举超时时间 2 秒
心跳间隔 Leader 发送心跳的间隔 200 毫秒
自动切换 Leader 故障后自动选举新 Leader 支持

运行脚本详解

run.sh 命令

命令 说明
./run.sh compile 编译项目
./run.sh node0 启动节点 0
./run.sh node1 启动节点 1
./run.sh node2 启动节点 2
./run.sh client 运行客户端
./run.sh clean 清理集群数据
./run.sh cleanall 清理所有数据
./run.sh help 显示帮助

脚本功能

自动检查

检查 Java 和 Maven 版本

依赖管理

自动构建完整的 classpath

错误处理

友好的错误提示和处理

彩色输出

清晰的日志和状态显示

测试验证方法

# 1. 编译
./run.sh compile

# 2. 启动 3 个节点(3 个终端)
./run.sh node0
./run.sh node1
./run.sh node2

# 3. 运行客户端(第 4 个终端)
./run.sh client

# 预期结果: 客户端成功连接,所有命令正常响应

# 1. 只启动 1 个节点
./run.sh node0
# 预期: 节点保持 Candidate 状态,无法选举出 Leader

# 2. 启动第 2 个节点
./run.sh node1
# 预期: 成功选举出 Leader(node0 或 node1)

# 3. 启动第 3 个节点
./run.sh node2
# 预期: 加入集群作为 Follower

# 1. 启动 3 个节点并运行客户端
# 2. 确认当前 Leader(假设是 node0)
# 3. 停止 Leader 节点(Ctrl+C)
# 4. 观察其他节点重新选举(约 2 秒)
# 5. 客户端自动重连到新 Leader
# 预期: 业务不中断,新 Leader 选举成功

# 1. 启动集群并运行客户端多次
./run.sh client  # 多次运行,counter 会累加

# 2. 停止所有节点(Ctrl+C)

# 3. 重新启动节点
./run.sh node0
./run.sh node1
./run.sh node2

# 4. 运行客户端发送 GET 命令
./run.sh client

# 预期: counter 从快照恢复,保持之前的值

# 1. 启动 3 个节点
# 2. 停止 node0(Ctrl+C)
# 3. 运行客户端(连接到 node1 或 node2)
# 预期: 客户端仍然正常工作

# 4. 重新启动 node0
./run.sh node0
# 预期: node0 自动同步日志,加入集群

常见问题 (FAQ)

原因: 端口被占用(可能是之前的进程未正常关闭)

解决方法:

# 查找占用端口的进程
lsof -i :9000
lsof -i :9100

# 杀死进程
kill -9 <PID>

# 或者清理所有 Java 进程
pkill -9 java

# 重新启动节点
./run.sh node0

原因: Raft 算法要求多数节点在线才能选举 Leader

解决方法: 至少启动 2 个节点

# 终端 1
./run.sh node0

# 终端 2
./run.sh node1

原因:

  1. 节点未启动
  2. 集群未选举出 Leader
  3. 端口配置错误

解决方法:

# 1. 确认至少 2 个节点在运行
# 2. 检查节点日志,确认 Leader 已选举
# 3. 检查客户端端点配置:
#    INGRESS_ENDPOINTS = "localhost:9000,localhost:9001,localhost:9002"

修改配置:

编辑 ClusterNode.java,将 localhost 替换为实际 IP:

private static String buildClusterMembers() {
    return
        "0,192.168.1.10:9000,192.168.1.10:9100,192.168.1.10:9200,192.168.1.10:9300,192.168.1.10:8010|" +
        "1,192.168.1.11:9001,192.168.1.11:9101,192.168.1.11:9201,192.168.1.11:9301,192.168.1.11:8011|" +
        "2,192.168.1.12:9002,192.168.1.12:9102,192.168.1.12:9202,192.168.1.12:9302,192.168.1.12:8012";
}

同时修改 ClusterClient.java:

private static final String INGRESS_ENDPOINTS =
    "192.168.1.10:9000,192.168.1.11:9001,192.168.1.12:9002";

目录结构

项目目录

05-aeron-cluster-basic/
├── src/
│   └── main/
│       └── java/
│           └── com/
│               └── aeron/
│                   └── cluster/
│                       ├── ClusterNode.java
│                       ├── BasicClusteredService.java
│                       └── ClusterClient.java
├── pom.xml
├── run.sh
└── README.md

运行时数据目录

/tmp/aeron-cluster/
├── node-0/
│   ├── aeron/              # Aeron 媒体驱动目录
│   ├── consensus/          # Consensus Module 数据
│   ├── service/            # ClusteredService 数据
│   └── archive/            # Archive 日志和快照
├── node-1/
│   └── ...
└── node-2/
    └── ...

扩展学习

下一步

修改业务逻辑
BasicClusteredService 中实现更复杂的业务
添加定时器
使用 cluster.scheduleTimer() 实现定时任务
优化快照
实现增量快照,提高性能
监控和调优
添加性能指标和监控
分布式部署
在多台机器上部署集群

参考资料

检查清单

完成本示例后,你应该能够: