feat: power loss

This commit is contained in:
td-zhangshun 2025-04-23 10:33:03 +08:00 committed by Netlops
parent 8ce3e5139f
commit 56cc4309d3
8 changed files with 748 additions and 207 deletions

View File

@ -57,4 +57,37 @@ go run cmd/client/main.go -server localhost:8080 -test all
5. 断电恢复测试
6. 长期稳定性测试
详细测试指标和结果分析请参阅测试报告。
详细测试指标和结果分析请参阅测试报告。
## 新增功能: 实时进度和数据完整性检测
### 实时进度追踪
客户端现在支持从服务器实时获取测试进度使用流式传输技术Server-Sent Events实现。这使得客户端可以实时查看测试的进度和状态而不需要频繁轮询。
### 断电数据完整性检测
系统现在支持在服务器意外断电后进行数据完整性检测,能够详细报告:
1. 有多少数据块丢失
2. 有多少数据块已损坏
3. 总共丢失了多少MB的数据
4. 提供数据恢复结果
### 使用方法
#### 运行常规测试(实时进度)
```bash
./bin/client -test power_loss -server localhost:8080
```
#### 运行断电后的恢复测试
在服务器断电并重启后,执行以下命令检查数据完整性:
```bash
./bin/client -recovery -server localhost:8080
```
这将执行数据完整性检测,并提供详细的数据丢失报告。

View File

@ -2,15 +2,20 @@ package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"net/http"
"os"
"time"
"plp-test/internal/config"
"plp-test/internal/model"
"plp-test/internal/utils"
"bufio"
"strings"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
@ -25,6 +30,7 @@ var (
dataSizeMB int
blockSize int
concurrent bool
recovery bool
)
func init() {
@ -36,6 +42,7 @@ func init() {
flag.IntVar(&dataSizeMB, "data-size", 0, "测试数据大小MB")
flag.IntVar(&blockSize, "block-size", 0, "数据块大小KB")
flag.BoolVar(&concurrent, "concurrent", false, "是否并发执行所有测试")
flag.BoolVar(&recovery, "recovery", false, "是否请求恢复测试")
}
// Client 客户端
@ -122,6 +129,102 @@ func (c *Client) RunTest(testType string, dataSizeMB, blockSize int) error {
func (c *Client) MonitorTestStatus(testID string) error {
c.logger.Infof("监控测试状态: %s", testID)
// 使用流式API监控进度
errCh := make(chan error, 1)
doneCh := make(chan struct{})
// 启动流式监控
go func() {
defer close(doneCh)
url := fmt.Sprintf("http://%s/stream?test_id=%s&client_id=%s", c.serverAddr, testID, c.clientID)
// 设置超时时间 30分钟
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
errCh <- fmt.Errorf("创建请求失败: %v", err)
return
}
resp, err := c.httpClient.Do(req)
if err != nil {
errCh <- fmt.Errorf("发送流请求失败: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
errCh <- fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
return
}
// 创建事件扫描器
scanner := bufio.NewScanner(resp.Body)
// 处理Server-Sent Events
for scanner.Scan() {
line := scanner.Text()
// 跳过空行
if line == "" {
continue
}
// 处理数据行
if strings.HasPrefix(line, "data: ") {
data := line[6:] // 去掉 "data: " 前缀
// 解析更新数据
var update model.StreamUpdate
if err := json.Unmarshal([]byte(data), &update); err != nil {
c.logger.Warnf("解析更新数据失败: %v", err)
continue
}
// 处理不同类型的更新
switch update.Type {
case "status":
c.logger.Infof("测试状态: 进度: %.2f%%, 阶段: %s",
update.Progress, update.CurrentPhase)
case "error":
c.logger.Errorf("测试错误: %s", update.Message)
case "completion":
c.logger.Infof("测试完成: %s", update.Message)
return
case "integrity":
if info, ok := update.Data.(map[string]interface{}); ok {
c.logger.Infof("数据完整性: 可用块: %.0f, 损坏块: %.0f, 丢失块: %.0f, 数据丢失: %.2f MB",
info["available_blocks"], info["corrupted_blocks"], info["missing_blocks"], info["data_loss_mb"])
}
}
}
}
if err := scanner.Err(); err != nil {
// 如果是客户端主动断开连接,不报错
if !strings.Contains(err.Error(), "use of closed network connection") {
errCh <- fmt.Errorf("读取流数据失败: %v", err)
}
}
}()
// 等待流结束或出错
select {
case err := <-errCh:
// 如果流模式失败,回退到轮询模式
c.logger.Warnf("流式监控失败: %v, 回退到轮询模式", err)
return c.pollTestStatus(testID)
case <-doneCh:
c.logger.Info("流式监控完成")
return nil
}
}
// pollTestStatus 使用轮询方式监控测试状态
func (c *Client) pollTestStatus(testID string) error {
c.logger.Infof("使用轮询监控测试状态: %s", testID)
for {
// 获取测试状态
url := fmt.Sprintf("http://%s/status?test_id=%s", c.serverAddr, testID)
@ -231,6 +334,82 @@ func (c *Client) RunAllTests(concurrent bool) error {
return nil
}
// CheckDataIntegrity 检查断电后的数据完整性
func (c *Client) CheckDataIntegrity(testID string) error {
c.logger.Info("检查数据完整性")
url := fmt.Sprintf("http://%s/integrity?test_id=%s", c.serverAddr, testID)
resp, err := c.httpClient.Get(url)
if err != nil {
return fmt.Errorf("获取数据完整性信息失败: %v", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
}
var integrity model.IntegrityInfo
if err := json.NewDecoder(resp.Body).Decode(&integrity); err != nil {
return fmt.Errorf("解析响应失败: %v", err)
}
c.logger.Infof("数据完整性检查结果:")
c.logger.Infof(" 测试ID: %s", integrity.TestID)
c.logger.Infof(" 检查时间: %s", integrity.CheckTime.Format("2006-01-02 15:04:05"))
c.logger.Infof(" 总块数: %d", integrity.TotalBlocks)
c.logger.Infof(" 可用块数: %d (%.2f%%)", integrity.AvailableBlocks, float64(integrity.AvailableBlocks)/float64(integrity.TotalBlocks)*100)
c.logger.Infof(" 损坏块数: %d (%.2f%%)", integrity.CorruptedBlocks, float64(integrity.CorruptedBlocks)/float64(integrity.TotalBlocks)*100)
c.logger.Infof(" 丢失块数: %d (%.2f%%)", integrity.MissingBlocks, float64(integrity.MissingBlocks)/float64(integrity.TotalBlocks)*100)
c.logger.Infof(" 数据丢失: %.2f MB", integrity.DataLossMB)
c.logger.Infof(" 恢复成功: %v", integrity.RecoverySuccess)
return nil
}
// RequestRecoveryTest 请求执行恢复测试
func (c *Client) RequestRecoveryTest(testType string) error {
c.logger.Infof("请求恢复测试: %s", testType)
// 准备请求数据
req := struct {
TestType string `json:"test_type"`
TestDir string `json:"test_dir,omitempty"`
}{
TestType: testType,
}
// 序列化请求数据
reqData, err := json.Marshal(req)
if err != nil {
return fmt.Errorf("序列化请求数据失败: %v", err)
}
// 发送请求
url := fmt.Sprintf("http://%s/recovery", c.serverAddr)
resp, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(reqData))
if err != nil {
return fmt.Errorf("发送请求失败: %v", err)
}
defer resp.Body.Close()
// 检查响应状态
if resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
}
// 解析响应
var testResp model.TestResponse
if err := json.NewDecoder(resp.Body).Decode(&testResp); err != nil {
return fmt.Errorf("解析响应失败: %v", err)
}
c.logger.Infof("恢复测试请求已接受RequestID: %s", testResp.RequestID)
// 监控测试状态
return c.MonitorTestStatus(testResp.RequestID)
}
func main() {
flag.Parse()
@ -249,7 +428,7 @@ func main() {
level = logrus.InfoLevel
}
// 初始化日志
// 配置日志
logger := logrus.New()
logger.SetLevel(level)
logger.SetFormatter(&logrus.TextFormatter{
@ -257,20 +436,22 @@ func main() {
TimestampFormat: "2006-01-02 15:04:05",
})
// 添加日志文件输出
logFile, err := os.OpenFile("client.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err == nil {
multiWriter := io.MultiWriter(os.Stdout, logFile)
logger.SetOutput(multiWriter)
} else {
logger.Warnf("无法打开日志文件: %v", err)
}
// 加载配置
logger.Infof("加载配置文件: %s", configFile)
cfg, err := config.Load(configFile)
if err != nil {
logger.Fatalf("加载配置失败: %v", err)
}
// 初始化日志文件
if cfg.Client.LogFile != "" {
utils.InitLogger(cfg.Client.LogFile, level)
logger = utils.Logger
}
// 设置超时时间
// 如果指定了timeout覆盖配置
if timeout > 0 {
cfg.Client.TimeoutSec = timeout
}
@ -283,16 +464,33 @@ func main() {
logger.Fatalf("服务器健康检查失败: %v", err)
}
// 运行测试
if testType == "all" {
if err := client.RunAllTests(concurrent); err != nil {
logger.Fatalf("运行所有测试失败: %v", err)
// 处理恢复测试
if recovery {
if testType == "" {
testType = "power_loss"
}
} else {
if err := client.RunTest(testType, dataSizeMB, blockSize); err != nil {
logger.Fatalf("运行测试 %s 失败: %v", testType, err)
logger.Infof("开始执行恢复测试: %s", testType)
if err := client.RequestRecoveryTest(testType); err != nil {
logger.Fatalf("恢复测试失败: %v", err)
}
logger.Info("恢复测试完成")
return
}
logger.Info("所有测试已完成")
// 执行指定的测试
if testType != "" {
if testType == "all" {
if err := client.RunAllTests(concurrent); err != nil {
logger.Fatalf("执行所有测试失败: %v", err)
}
} else {
if err := client.RunTest(testType, dataSizeMB, blockSize); err != nil {
logger.Fatalf("执行测试 %s 失败: %v", testType, err)
}
}
} else {
logger.Fatal("未指定测试类型,请使用 -test 参数")
}
logger.Info("所有测试完成")
}

View File

@ -32,23 +32,29 @@ func init() {
// TestRunner 测试运行器
type TestRunner struct {
config *config.Config
logger *logrus.Logger
factory *testcase.TestCaseFactory
tests map[string]testcase.TestCase
testsMu sync.RWMutex
testResult map[string]*model.TestResult
resultMu sync.RWMutex
config *config.Config
logger *logrus.Logger
factory *testcase.TestCaseFactory
tests map[string]testcase.TestCase
testsMu sync.RWMutex
testResult map[string]*model.TestResult
resultMu sync.RWMutex
streams map[string]map[string]http.ResponseWriter
streamsMu sync.RWMutex
integrityInfo map[string]*model.IntegrityInfo
integrityMu sync.RWMutex
}
// NewTestRunner 创建测试运行器
func NewTestRunner(cfg *config.Config, logger *logrus.Logger) *TestRunner {
return &TestRunner{
config: cfg,
logger: logger,
factory: testcase.NewTestCaseFactory(cfg, logger),
tests: make(map[string]testcase.TestCase),
testResult: make(map[string]*model.TestResult),
config: cfg,
logger: logger,
factory: testcase.NewTestCaseFactory(cfg, logger),
tests: make(map[string]testcase.TestCase),
testResult: make(map[string]*model.TestResult),
streams: make(map[string]map[string]http.ResponseWriter),
integrityInfo: make(map[string]*model.IntegrityInfo),
}
}
@ -75,18 +81,42 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 发送测试开始状态更新
r.sendStatusUpdate(test)
// 设置测试环境
r.logger.Info("设置测试环境")
if err := test.Setup(ctx); err != nil {
r.logger.Errorf("设置测试环境失败: %v", err)
r.sendErrorUpdate(testID, fmt.Sprintf("设置测试环境失败: %v", err))
return nil, err
}
// 启动状态监控协程
statusDone := make(chan struct{})
go func() {
defer close(statusDone)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// 发送状态更新
r.sendStatusUpdate(test)
}
}
}()
// 运行测试
r.logger.Info("运行测试")
result, err := test.Run(ctx)
if err != nil {
r.logger.Errorf("测试运行失败: %v", err)
r.sendErrorUpdate(testID, fmt.Sprintf("测试运行失败: %v", err))
// 尝试清理
cleanupErr := test.Cleanup(ctx)
if cleanupErr != nil {
@ -99,9 +129,14 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) {
r.logger.Info("清理测试环境")
if err := test.Cleanup(ctx); err != nil {
r.logger.Errorf("测试清理失败: %v", err)
r.sendErrorUpdate(testID, fmt.Sprintf("测试清理失败: %v", err))
return nil, err
}
// 停止状态监控
cancel()
<-statusDone
// 存储测试结果
r.resultMu.Lock()
r.testResult[testID] = result
@ -112,10 +147,52 @@ func (r *TestRunner) RunTest(testType string) (*model.TestResult, error) {
delete(r.tests, testID)
r.testsMu.Unlock()
// 发送完成通知
r.sendCompletionUpdate(testID, result)
r.logger.Infof("测试 %s 完成", testType)
return result, nil
}
// sendStatusUpdate 发送状态更新
func (r *TestRunner) sendStatusUpdate(test testcase.TestCase) {
status := test.Status()
update := model.StreamUpdate{
Type: "status",
TestID: status.TestID,
Timestamp: time.Now(),
Progress: status.Progress,
CurrentPhase: status.CurrentPhase,
Message: status.Message,
Data: status,
}
r.SendStreamUpdate(status.TestID, update)
}
// sendErrorUpdate 发送错误更新
func (r *TestRunner) sendErrorUpdate(testID, message string) {
update := model.StreamUpdate{
Type: "error",
TestID: testID,
Timestamp: time.Now(),
Message: message,
}
r.SendStreamUpdate(testID, update)
}
// sendCompletionUpdate 发送完成更新
func (r *TestRunner) sendCompletionUpdate(testID string, result *model.TestResult) {
update := model.StreamUpdate{
Type: "completion",
TestID: testID,
Timestamp: time.Now(),
Progress: 100,
Message: "测试完成",
Data: result,
}
r.SendStreamUpdate(testID, update)
}
// GetTestStatus 获取测试状态
func (r *TestRunner) GetTestStatus(testID string) *model.TestStatus {
r.testsMu.RLock()
@ -139,6 +216,72 @@ func (r *TestRunner) GetAllTestStatus() []*model.TestStatus {
return statuses
}
// RegisterStream 注册流式连接
func (r *TestRunner) RegisterStream(testID, clientID string, w http.ResponseWriter) {
r.streamsMu.Lock()
defer r.streamsMu.Unlock()
if _, ok := r.streams[testID]; !ok {
r.streams[testID] = make(map[string]http.ResponseWriter)
}
r.streams[testID][clientID] = w
r.logger.Infof("客户端 %s 已连接到测试 %s 的流", clientID, testID)
}
// UnregisterStream 注销流式连接
func (r *TestRunner) UnregisterStream(testID, clientID string) {
r.streamsMu.Lock()
defer r.streamsMu.Unlock()
if clients, ok := r.streams[testID]; ok {
delete(clients, clientID)
r.logger.Infof("客户端 %s 已断开与测试 %s 的流连接", clientID, testID)
}
}
// SendStreamUpdate 发送流式更新
func (r *TestRunner) SendStreamUpdate(testID string, update interface{}) {
r.streamsMu.RLock()
defer r.streamsMu.RUnlock()
clients, ok := r.streams[testID]
if !ok || len(clients) == 0 {
return
}
data, err := json.Marshal(update)
if err != nil {
r.logger.Errorf("无法序列化流更新: %v", err)
return
}
for clientID, w := range clients {
// 使用Server-Sent Events格式
_, err := fmt.Fprintf(w, "data: %s\n\n", data)
if err != nil {
r.logger.Warnf("向客户端 %s 发送更新失败: %v", clientID, err)
} else {
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
}
}
}
// SaveIntegrityInfo 保存完整性信息
func (r *TestRunner) SaveIntegrityInfo(testID string, info *model.IntegrityInfo) {
r.integrityMu.Lock()
defer r.integrityMu.Unlock()
r.integrityInfo[testID] = info
}
// GetIntegrityInfo 获取完整性信息
func (r *TestRunner) GetIntegrityInfo(testID string) *model.IntegrityInfo {
r.integrityMu.RLock()
defer r.integrityMu.RUnlock()
return r.integrityInfo[testID]
}
// StartServer 启动HTTP服务器
func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger) *http.Server {
mux := http.NewServeMux()
@ -217,6 +360,152 @@ func StartServer(cfg *config.Config, runner *TestRunner, logger *logrus.Logger)
json.NewEncoder(w).Encode(status)
})
// 新增: 实时数据进度流式API
mux.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) {
testID := r.URL.Query().Get("test_id")
if testID == "" {
http.Error(w, "Missing test_id", http.StatusBadRequest)
return
}
// 设置响应头支持SSE (Server-Sent Events)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// 创建完成通道
doneCh := make(chan struct{})
defer close(doneCh)
// 注册客户端连接
clientID := r.URL.Query().Get("client_id")
runner.RegisterStream(testID, clientID, w)
defer runner.UnregisterStream(testID, clientID)
// 保持连接直到客户端断开
select {
case <-r.Context().Done():
runner.logger.Infof("connection closed by client %s", clientID)
return
case <-doneCh:
runner.logger.Infof("connection closed by server for client %s", clientID)
return
}
})
// 新增: 数据完整性检测API
mux.HandleFunc("/integrity", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
testID := r.URL.Query().Get("test_id")
if testID == "" {
http.Error(w, "Missing test_id", http.StatusBadRequest)
return
}
// 获取测试的数据完整性信息
integrityInfo := runner.GetIntegrityInfo(testID)
if integrityInfo == nil {
http.Error(w, "Integrity info not found", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(integrityInfo)
})
// 新增: 恢复测试API用于断电测试后的恢复与校验
mux.HandleFunc("/recovery", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
var req struct {
TestType string `json:"test_type"`
TestDir string `json:"test_dir"`
}
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "Invalid request body", http.StatusBadRequest)
return
}
logger.Infof("收到恢复测试请求: %+v", req)
// 创建恢复测试实例
test, err := runner.factory.CreateTestCase(req.TestType)
if err != nil || test == nil {
http.Error(w, fmt.Sprintf("无法创建测试实例: %v", err), http.StatusBadRequest)
return
}
// 获取测试ID
testID := test.Status().TestID
// 执行恢复和数据完整性检查
go func() {
ctx := context.Background()
// 设置测试环境
logger.Info("设置恢复测试环境")
if err := test.Setup(ctx); err != nil {
logger.Errorf("设置恢复测试环境失败: %v", err)
runner.sendErrorUpdate(testID, fmt.Sprintf("设置恢复测试环境失败: %v", err))
return
}
// 数据完整性检查
logger.Info("执行数据完整性检查")
runner.sendStatusUpdate(test)
// 检查并获取数据完整性信息
if powerTest, ok := test.(*testcase.PowerLossTest); ok {
integrityInfo := powerTest.CheckIntegrity()
// 保存完整性信息
runner.SaveIntegrityInfo(testID, integrityInfo)
// 发送完整性信息
update := model.StreamUpdate{
Type: "integrity",
TestID: testID,
Timestamp: time.Now(),
Message: "数据完整性检查完成",
Data: integrityInfo,
}
runner.SendStreamUpdate(testID, update)
logger.Infof("恢复测试完成: 丢失数据: %.2f MB", integrityInfo.DataLossMB)
} else {
logger.Error("不是断电测试实例,无法执行数据完整性检查")
runner.sendErrorUpdate(testID, "不是断电测试实例,无法执行数据完整性检查")
}
// 清理测试环境
logger.Info("清理恢复测试环境")
if err := test.Cleanup(ctx); err != nil {
logger.Errorf("清理恢复测试环境失败: %v", err)
}
}()
// 返回接受响应
resp := model.TestResponse{
RequestID: testID,
Status: "accepted",
Message: "恢复测试已接受并开始执行",
ServerTime: time.Now(),
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(resp)
})
// 启动服务器
addr := fmt.Sprintf("%s:%d", cfg.Server.ListenAddr, cfg.Server.Port)
server := &http.Server{

View File

@ -114,3 +114,38 @@ type HealthStatus struct {
MemoryUsage float64 `json:"memory_usage"`
Message string `json:"message,omitempty"`
}
// IntegrityInfo 表示数据完整性信息
type IntegrityInfo struct {
TestID string `json:"test_id"`
TestType string `json:"test_type"`
CheckTime time.Time `json:"check_time"`
TotalBlocks int `json:"total_blocks"`
ExpectedBlocks int `json:"expected_blocks"`
AvailableBlocks int `json:"available_blocks"`
CorruptedBlocks int `json:"corrupted_blocks"`
MissingBlocks int `json:"missing_blocks"`
DataLossMB float64 `json:"data_loss_mb"`
RecoverySuccess bool `json:"recovery_success"`
RecoveryDuration float64 `json:"recovery_duration_ms"`
BlocksMap map[int]BlockStatus `json:"blocks_map,omitempty"`
}
// BlockStatus 表示数据块状态
type BlockStatus struct {
Available bool `json:"available"`
Corrupted bool `json:"corrupted"`
Checksum string `json:"checksum,omitempty"`
FilePath string `json:"file_path,omitempty"`
}
// StreamUpdate 表示流式更新的数据
type StreamUpdate struct {
Type string `json:"type"` // "status", "progress", "integrity", "error"
TestID string `json:"test_id"`
Timestamp time.Time `json:"timestamp"`
Progress float64 `json:"progress,omitempty"`
CurrentPhase string `json:"current_phase,omitempty"`
Message string `json:"message,omitempty"`
Data interface{} `json:"data,omitempty"`
}

View File

@ -2,9 +2,13 @@ package testcase
import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"sync"
"syscall"
"time"
"plp-test/internal/config"
@ -26,6 +30,9 @@ type PowerLossTest struct {
blocks []*model.TestBlock
recoveryTimeMs float64
powerCutInfo *model.PowerCutInfo
integrityInfo *model.IntegrityInfo
blocksMu sync.RWMutex // 保护数据块访问
blocksMap map[int]model.BlockStatus // 数据块状态映射
}
// NewPowerLossTest 创建断电测试
@ -39,8 +46,9 @@ func NewPowerLossTest(cfg *config.Config, logger *logrus.Logger) *PowerLossTest
return &PowerLossTest{
BaseTestCase: baseTest,
blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)),
blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)),
blocksMap: make(map[int]model.BlockStatus),
}
}
@ -95,6 +103,21 @@ func (t *PowerLossTest) Setup(ctx context.Context) error {
t.blocks = make([]*model.TestBlock, 0, t.totalBlocks)
t.powerCutInfo = &model.PowerCutInfo{}
// 初始化完整性信息
t.integrityInfo = &model.IntegrityInfo{
TestID: t.testID,
TestType: t.name,
CheckTime: time.Time{},
TotalBlocks: t.totalBlocks,
ExpectedBlocks: t.totalBlocks,
AvailableBlocks: 0,
CorruptedBlocks: 0,
MissingBlocks: 0,
DataLossMB: 0,
RecoverySuccess: false,
BlocksMap: make(map[int]model.BlockStatus),
}
t.setProgress(10)
return nil
}
@ -105,18 +128,17 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) {
startTime := time.Now()
var totalBytesWritten int
// 写入阶段 - 只写入一部分数据,断电前
t.setMessage("写入数据 (断电前)")
blocksBeforePowerCut := t.totalBlocks / 2 // 写入一半的数据块
// 第一阶段 - 持续写入数据,直到手动断电
t.setMessage("写入数据 (请在适当时手动断电)")
for i := 0; i < blocksBeforePowerCut; i++ {
for i := 0; i < t.totalBlocks; i++ {
select {
case <-ctx.Done():
t.setStatus(StatusAborted)
return nil, ctx.Err()
default:
// 生成随机数据
data, err := utils.GenerateRandomData(t.blockSize)
data, err := utils.GenerateRandomData(utils.KBToBytes(float64(t.blockSize)))
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("生成随机数据失败: %v", err)
@ -124,11 +146,32 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) {
// 创建测试数据块
block := model.NewTestBlock(data, i)
// 添加到数据块列表
t.blocksMu.Lock()
t.blocks = append(t.blocks, block)
// 记录数据块状态
blockStatus := model.BlockStatus{
Available: true,
Corrupted: false,
Checksum: block.Checksum,
FilePath: fmt.Sprintf("block_%d.dat", i),
}
t.blocksMap[i] = blockStatus
t.blocksMu.Unlock()
// 写入文件
filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i))
err = os.WriteFile(filePath, data, 0644)
// direct IO
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|syscall.O_DIRECT, 0644)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err)
}
defer file.Close()
_, err = file.Write(data)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err)
@ -137,182 +180,40 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) {
t.writtenBlocks++
totalBytesWritten += len(data)
// 每写入一定数量的块后执行同步
if i > 0 && i%10 == 0 {
t.setMessage(fmt.Sprintf("同步数据到磁盘 (已写入 %d/%d 块)", i, t.totalBlocks))
_, err := utils.ExecuteCommand("sync")
if err != nil {
t.logger.Warnf("执行sync命令失败: %v", err)
}
}
// 更新进度
progress := float64(i+1) / float64(t.totalBlocks) * 30 // 第一阶段占30%
progress := float64(i+1) / float64(t.totalBlocks) * 100
t.setProgress(progress)
// 每写入一定数量的块后暂停一下,给用户断电的机会
if i > 0 && i%100 == 0 {
t.setMessage(fmt.Sprintf("已写入 %d/%d 块数据, 共 %.2f MB", i, t.totalBlocks, float64(i*t.blockSize)/(1024*1024)))
time.Sleep(1 * time.Second)
}
}
}
// 记录写入数据的时间点
// 记录写入数据的信息
t.powerCutInfo.BlocksWritten = t.writtenBlocks
// 执行同步以确保部分数据已经写入到磁盘,但部分仍在缓存中
t.setMessage("执行sync同步部分数据到磁盘")
// 完成所有数据写入后,同步到磁盘
t.setMessage("同步所有数据到磁盘")
_, err := utils.ExecuteCommand("sync")
if err != nil {
t.logger.Warnf("执行sync命令失败: %v", err)
}
// 再写入一些数据但不同步,保证有缓存中的数据
t.setMessage("写入额外数据到缓存中 (这些数据可能会在断电后丢失)")
additionalBlocks := t.totalBlocks / 4 // 额外写入1/4的数据块
for i := blocksBeforePowerCut; i < blocksBeforePowerCut+additionalBlocks; i++ {
select {
case <-ctx.Done():
t.setStatus(StatusAborted)
return nil, ctx.Err()
default:
// 生成随机数据
data, err := utils.GenerateRandomData(t.blockSize)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("生成随机数据失败: %v", err)
}
// 创建测试数据块
block := model.NewTestBlock(data, i)
t.blocks = append(t.blocks, block)
// 写入文件
filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i))
err = os.WriteFile(filePath, data, 0644)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("写入文件 %s 失败: %v", filePath, err)
}
t.writtenBlocks++
totalBytesWritten += len(data)
// 更新进度
progress := 30 + float64(i-blocksBeforePowerCut+1)/float64(additionalBlocks)*10 // 额外写入占10%
t.setProgress(progress)
}
}
// 模拟断电
t.setMessage("模拟断电...")
t.powerCutInfo.Timestamp = time.Now()
err = t.casManager.SimulatePowerCut(t.config.Server.CacheInstanceID)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("模拟断电失败: %v", err)
}
// 模拟等待一段时间,就像真正断电后的重启
t.setMessage("模拟系统重启中...")
time.Sleep(2 * time.Second)
// 恢复阶段
t.setMessage("恢复阶段")
recoveryStart := time.Now()
// 重新创建缓存实例
id := t.config.Server.CacheInstanceID
nvme := t.config.Server.DevicesNVMe
hdd := t.config.Server.DevicesHDD
// 尝试修复/加载现有缓存
t.setMessage("尝试加载和修复缓存")
err = t.casManager.RepairCache(id, nvme)
if err != nil {
// 如果修复失败,尝试重新创建
t.logger.Warnf("修复缓存失败,尝试重新创建: %v", err)
err = t.casManager.CreateCacheInstance(id, nvme, hdd, "wb")
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("断电后重新创建缓存实例失败: %v", err)
}
}
// 获取缓存设备路径
cacheDevice := fmt.Sprintf("/dev/cas%s-1", id)
// 重新挂载缓存设备
mountPoint := t.config.Server.MountPoint
t.setMessage(fmt.Sprintf("重新挂载缓存设备到 %s", mountPoint))
err = t.casManager.MountDevice(cacheDevice, mountPoint)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("断电后重新挂载缓存设备失败: %v", err)
}
// 记录恢复时间
t.recoveryTimeMs = float64(time.Since(recoveryStart).Milliseconds())
t.setProgress(50)
// 验证阶段 - 检查断电前写入的数据是否完整
t.setMessage("验证阶段 - 检查数据完整性")
t.corruptedBlocks = 0
for i, block := range t.blocks {
// 检查文件是否存在
filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i))
if !utils.FileExists(filePath) {
t.logger.Warnf("文件 %s 在断电后丢失", filePath)
t.corruptedBlocks++
continue
}
// 读取文件数据
data, err := os.ReadFile(filePath)
if err != nil {
t.logger.Warnf("读取文件 %s 失败: %v", filePath, err)
t.corruptedBlocks++
continue
}
// 验证数据完整性
if string(data) != string(block.Data) {
t.logger.Warnf("文件 %s 数据损坏", filePath)
t.corruptedBlocks++
continue
}
t.verifiedBlocks++
// 更新进度
progress := 50 + float64(i+1)/float64(len(t.blocks))*40 // 验证占40%
t.setProgress(progress)
}
// 写入断电后的额外数据
t.setMessage("断电后写入额外数据")
for i := t.writtenBlocks; i < t.totalBlocks; i++ {
select {
case <-ctx.Done():
t.setStatus(StatusAborted)
return nil, ctx.Err()
default:
// 生成随机数据
data, err := utils.GenerateRandomData(t.blockSize)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("生成随机数据失败: %v", err)
}
// 写入文件
filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i))
err = os.WriteFile(filePath, data, 0644)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("断电后写入文件 %s 失败: %v", filePath, err)
}
// 更新进度
progress := 90 + float64(i-t.writtenBlocks+1)/float64(t.totalBlocks-t.writtenBlocks)*10 // 最后10%
t.setProgress(progress)
}
}
// 记录断电信息
t.powerCutInfo.RecoverySuccess = t.corruptedBlocks == 0
t.powerCutInfo.DataLossMB = utils.BytesToMB(t.corruptedBlocks * t.blockSize)
t.setProgress(100)
t.setStatus(StatusCompleted)
t.setMessage("断电测试完成")
t.setMessage("数据写入完成")
// 构造测试结果
result := t.getTestResult()
@ -328,6 +229,86 @@ func (t *PowerLossTest) Run(ctx context.Context) (*model.TestResult, error) {
return result, nil
}
// CheckIntegrity 检查数据完整性
func (t *PowerLossTest) CheckIntegrity() *model.IntegrityInfo {
t.setMessage("开始检查数据完整性")
t.integrityInfo.CheckTime = time.Now()
// 重置计数器
t.integrityInfo.AvailableBlocks = 0
t.integrityInfo.CorruptedBlocks = 0
t.integrityInfo.MissingBlocks = 0
// 为所有块创建状态记录
for i := 0; i < t.totalBlocks; i++ {
filePath := filepath.Join(t.testDir, fmt.Sprintf("block_%d.dat", i))
if !utils.FileExists(filePath) {
// 文件不存在
t.integrityInfo.MissingBlocks++
t.integrityInfo.BlocksMap[i] = model.BlockStatus{
Available: false,
Corrupted: false,
FilePath: fmt.Sprintf("block_%d.dat", i),
}
continue
}
// 读取文件数据
data, err := os.ReadFile(filePath)
if err != nil {
// 无法读取文件
t.integrityInfo.CorruptedBlocks++
t.integrityInfo.BlocksMap[i] = model.BlockStatus{
Available: true,
Corrupted: true,
FilePath: fmt.Sprintf("block_%d.dat", i),
}
continue
}
// 计算和验证校验和
hash := sha256.Sum256(data)
checksum := hex.EncodeToString(hash[:])
var blockChecksum string
t.blocksMu.RLock()
if i < len(t.blocks) && t.blocks[i] != nil {
blockChecksum = t.blocks[i].Checksum
}
t.blocksMu.RUnlock()
if blockChecksum != "" && checksum != blockChecksum {
// 数据损坏
t.integrityInfo.CorruptedBlocks++
t.integrityInfo.BlocksMap[i] = model.BlockStatus{
Available: true,
Corrupted: true,
Checksum: checksum,
FilePath: fmt.Sprintf("block_%d.dat", i),
}
} else {
// 数据完好
t.integrityInfo.AvailableBlocks++
t.integrityInfo.BlocksMap[i] = model.BlockStatus{
Available: true,
Corrupted: false,
Checksum: checksum,
FilePath: fmt.Sprintf("block_%d.dat", i),
}
}
}
// 计算数据丢失量
t.integrityInfo.DataLossMB = utils.BytesToMB((t.integrityInfo.MissingBlocks + t.integrityInfo.CorruptedBlocks) * t.blockSize)
t.integrityInfo.RecoverySuccess = t.integrityInfo.CorruptedBlocks == 0 && t.integrityInfo.MissingBlocks == 0
t.setMessage(fmt.Sprintf("数据完整性检查完成: %d 个块正常, %d 个块丢失, %d 个块损坏",
t.integrityInfo.AvailableBlocks, t.integrityInfo.MissingBlocks, t.integrityInfo.CorruptedBlocks))
return t.integrityInfo
}
// Cleanup 清理测试环境
func (t *PowerLossTest) Cleanup(ctx context.Context) error {
if err := t.BaseTestCase.Cleanup(ctx); err != nil {

View File

@ -40,8 +40,8 @@ func NewRandomWriteTest(cfg *config.Config, logger *logrus.Logger) *RandomWriteT
return &RandomWriteTest{
BaseTestCase: baseTest,
blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)),
blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)),
blocks: make(map[int]*model.TestBlock),
writeSequence: make([]int, 0),
writeLatencies: make([]float64, 0),
@ -128,7 +128,7 @@ func (t *RandomWriteTest) Run(ctx context.Context) (*model.TestResult, error) {
return nil, ctx.Err()
default:
// 生成随机数据
data, err := utils.GenerateRandomData(t.blockSize)
data, err := utils.GenerateRandomData(t.blockSize * 1024)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("生成随机数据失败: %v", err)

View File

@ -38,8 +38,8 @@ func NewSequentialWriteTest(cfg *config.Config, logger *logrus.Logger) *Sequenti
return &SequentialWriteTest{
BaseTestCase: baseTest,
blockSize: utils.MBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.MBToBytes(float64(cfg.Test.BlockSize)),
blockSize: utils.KBToBytes(float64(cfg.Test.BlockSize)),
totalBlocks: utils.MBToBytes(float64(cfg.Test.DataSizeMB)) / utils.KBToBytes(float64(cfg.Test.BlockSize)),
}
}
@ -114,7 +114,7 @@ func (t *SequentialWriteTest) Run(ctx context.Context) (*model.TestResult, error
return nil, ctx.Err()
default:
// 生成随机数据
data, err := utils.GenerateRandomData(t.blockSize)
data, err := utils.GenerateRandomData(t.blockSize * 1024)
if err != nil {
t.setStatus(StatusFailed)
return nil, fmt.Errorf("生成随机数据失败: %v", err)

View File

@ -57,6 +57,11 @@ func MBToBytes(mb float64) int {
return int(mb * 1024 * 1024)
}
// KBToBytes 将KB转换为字节
func KBToBytes(kb float64) int {
return int(kb * 1024)
}
// FormatDuration 格式化持续时间
func FormatDuration(d time.Duration) string {
if d < time.Minute {