499 lines
13 KiB
Go
499 lines
13 KiB
Go
package main
|
||
|
||
import (
|
||
"bytes"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io"
|
||
"net/http"
|
||
"os"
|
||
"time"
|
||
|
||
"plp-test/internal/config"
|
||
"plp-test/internal/model"
|
||
|
||
"bufio"
|
||
"strings"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/sirupsen/logrus"
|
||
)
|
||
|
||
var (
|
||
configFile string
|
||
logLevel string
|
||
serverAddr string
|
||
testType string
|
||
timeout int
|
||
dataSizeMB int
|
||
blockSize int
|
||
concurrent bool
|
||
recovery bool
|
||
)
|
||
|
||
func init() {
|
||
flag.StringVar(&configFile, "config", "config.yaml", "配置文件路径")
|
||
flag.StringVar(&logLevel, "log-level", "info", "日志级别 (debug, info, warn, error)")
|
||
flag.StringVar(&serverAddr, "server", "", "服务器地址,格式为 host:port")
|
||
flag.StringVar(&testType, "test", "sequential", "测试类型 (sequential, random, mixed, concurrent, power_loss, stability, all)")
|
||
flag.IntVar(&timeout, "timeout", 0, "测试超时时间(秒)")
|
||
flag.IntVar(&dataSizeMB, "data-size", 0, "测试数据大小(MB)")
|
||
flag.IntVar(&blockSize, "block-size", 0, "数据块大小(KB)")
|
||
flag.BoolVar(&concurrent, "concurrent", false, "是否并发执行所有测试")
|
||
flag.BoolVar(&recovery, "recovery", false, "是否请求恢复测试")
|
||
}
|
||
|
||
// Client 客户端
|
||
type Client struct {
|
||
config *config.Config
|
||
logger *logrus.Logger
|
||
httpClient *http.Client
|
||
serverAddr string
|
||
clientID string
|
||
}
|
||
|
||
// NewClient 创建客户端
|
||
func NewClient(cfg *config.Config, logger *logrus.Logger, serverAddr string) *Client {
|
||
if serverAddr == "" {
|
||
serverAddr = cfg.Client.ServerAddr
|
||
}
|
||
|
||
return &Client{
|
||
config: cfg,
|
||
logger: logger,
|
||
httpClient: &http.Client{
|
||
Timeout: time.Duration(cfg.Client.TimeoutSec) * time.Second,
|
||
},
|
||
serverAddr: serverAddr,
|
||
clientID: uuid.New().String(),
|
||
}
|
||
}
|
||
|
||
// RunTest 运行测试
|
||
func (c *Client) RunTest(testType string, dataSizeMB, blockSize int) error {
|
||
c.logger.Infof("运行测试 %s", testType)
|
||
|
||
// 准备请求数据
|
||
req := model.TestRequest{
|
||
TestType: testType,
|
||
DataSizeMB: dataSizeMB,
|
||
BlockSize: blockSize,
|
||
Concurrency: c.config.Client.Concurrency,
|
||
ClientID: c.clientID,
|
||
RequestTime: time.Now(),
|
||
Parameters: make(map[string]string),
|
||
}
|
||
|
||
// 设置默认值
|
||
if req.DataSizeMB == 0 {
|
||
req.DataSizeMB = c.config.Test.DataSizeMB
|
||
}
|
||
if req.BlockSize == 0 {
|
||
req.BlockSize = c.config.Test.BlockSize
|
||
}
|
||
|
||
// 序列化请求数据
|
||
reqData, err := json.Marshal(req)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化请求数据失败: %v", err)
|
||
}
|
||
|
||
// 发送请求
|
||
url := fmt.Sprintf("http://%s/run", c.serverAddr)
|
||
resp, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(reqData))
|
||
if err != nil {
|
||
return fmt.Errorf("发送请求失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 检查响应状态
|
||
if resp.StatusCode != http.StatusAccepted {
|
||
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 解析响应
|
||
var testResp model.TestResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&testResp); err != nil {
|
||
return fmt.Errorf("解析响应失败: %v", err)
|
||
}
|
||
|
||
c.logger.Infof("测试请求已接受,RequestID: %s", testResp.RequestID)
|
||
|
||
// 监控测试状态
|
||
return c.MonitorTestStatus(testResp.RequestID)
|
||
}
|
||
|
||
// MonitorTestStatus 监控测试状态
|
||
func (c *Client) MonitorTestStatus(testID string) error {
|
||
c.logger.Infof("监控测试状态: %s", testID)
|
||
|
||
// 使用流式API监控进度
|
||
errCh := make(chan error, 1)
|
||
doneCh := make(chan struct{})
|
||
|
||
// 启动流式监控
|
||
go func() {
|
||
defer close(doneCh)
|
||
|
||
url := fmt.Sprintf("http://%s/stream?test_id=%s&client_id=%s", c.serverAddr, testID, c.clientID)
|
||
|
||
// 创建一个没有超时的HTTP客户端专门用于流式连接
|
||
noTimeoutClient := &http.Client{
|
||
Timeout: 0, // 0表示无超时
|
||
}
|
||
|
||
req, err := http.NewRequest("GET", url, nil)
|
||
if err != nil {
|
||
errCh <- fmt.Errorf("创建请求失败: %v", err)
|
||
return
|
||
}
|
||
|
||
resp, err := noTimeoutClient.Do(req)
|
||
if err != nil {
|
||
errCh <- fmt.Errorf("发送流请求失败: %v", err)
|
||
return
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
errCh <- fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
return
|
||
}
|
||
|
||
// 创建事件扫描器
|
||
scanner := bufio.NewScanner(resp.Body)
|
||
|
||
// 处理Server-Sent Events
|
||
for scanner.Scan() {
|
||
line := scanner.Text()
|
||
|
||
// 跳过空行
|
||
if line == "" {
|
||
continue
|
||
}
|
||
|
||
// 处理数据行
|
||
if strings.HasPrefix(line, "data: ") {
|
||
data := line[6:] // 去掉 "data: " 前缀
|
||
|
||
// 解析更新数据
|
||
var update model.StreamUpdate
|
||
if err := json.Unmarshal([]byte(data), &update); err != nil {
|
||
c.logger.Warnf("解析更新数据失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
// 处理不同类型的更新
|
||
switch update.Type {
|
||
case "status":
|
||
c.logger.Infof("测试状态: 进度: %.2f%%, 阶段: %s",
|
||
update.Progress, update.CurrentPhase)
|
||
case "error":
|
||
c.logger.Errorf("测试错误: %s", update.Message)
|
||
case "completion":
|
||
c.logger.Infof("测试完成: %s", update.Message)
|
||
return
|
||
case "integrity":
|
||
if info, ok := update.Data.(map[string]interface{}); ok {
|
||
c.logger.Infof("数据完整性: 可用块: %.0f, 损坏块: %.0f, 丢失块: %.0f, 数据丢失: %.2f MB",
|
||
info["available_blocks"], info["corrupted_blocks"], info["missing_blocks"], info["data_loss_mb"])
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
if err := scanner.Err(); err != nil {
|
||
// 如果是客户端主动断开连接,不报错
|
||
if !strings.Contains(err.Error(), "use of closed network connection") {
|
||
errCh <- fmt.Errorf("读取流数据失败: %v", err)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 等待流结束或出错
|
||
select {
|
||
case err := <-errCh:
|
||
// 如果流模式失败,回退到轮询模式
|
||
c.logger.Warnf("流式监控失败: %v, 回退到轮询模式", err)
|
||
return c.pollTestStatus(testID)
|
||
case <-doneCh:
|
||
c.logger.Info("流式监控完成")
|
||
return nil
|
||
}
|
||
}
|
||
|
||
// pollTestStatus 使用轮询方式监控测试状态
|
||
func (c *Client) pollTestStatus(testID string) error {
|
||
c.logger.Infof("使用轮询监控测试状态: %s", testID)
|
||
|
||
for {
|
||
// 获取测试状态
|
||
url := fmt.Sprintf("http://%s/status?test_id=%s", c.serverAddr, testID)
|
||
resp, err := c.httpClient.Get(url)
|
||
if err != nil {
|
||
c.logger.Warnf("获取测试状态失败: %v", err)
|
||
time.Sleep(2 * time.Second)
|
||
continue
|
||
}
|
||
|
||
// 检查响应状态
|
||
if resp.StatusCode == http.StatusNotFound {
|
||
c.logger.Infof("测试 %s 已完成", testID)
|
||
resp.Body.Close()
|
||
break
|
||
}
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
resp.Body.Close()
|
||
c.logger.Warnf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
time.Sleep(2 * time.Second)
|
||
continue
|
||
}
|
||
|
||
// 解析响应
|
||
var status model.TestStatus
|
||
if err := json.NewDecoder(resp.Body).Decode(&status); err != nil {
|
||
resp.Body.Close()
|
||
c.logger.Warnf("解析响应失败: %v", err)
|
||
time.Sleep(2 * time.Second)
|
||
continue
|
||
}
|
||
resp.Body.Close()
|
||
|
||
// 显示测试状态
|
||
c.logger.Infof("测试状态: %s, 进度: %.2f%%, 阶段: %s",
|
||
status.Status, status.Progress, status.CurrentPhase)
|
||
|
||
// 检查测试是否结束
|
||
if status.Status == "completed" || status.Status == "failed" || status.Status == "aborted" {
|
||
c.logger.Infof("测试 %s %s", testID, status.Status)
|
||
break
|
||
}
|
||
|
||
// 等待一段时间再次检查
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// CheckServerHealth 检查服务器健康状态
|
||
func (c *Client) CheckServerHealth() error {
|
||
c.logger.Info("检查服务器健康状态")
|
||
|
||
url := fmt.Sprintf("http://%s/health", c.serverAddr)
|
||
resp, err := c.httpClient.Get(url)
|
||
if err != nil {
|
||
return fmt.Errorf("连接服务器失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
var health model.HealthStatus
|
||
if err := json.NewDecoder(resp.Body).Decode(&health); err != nil {
|
||
return fmt.Errorf("解析响应失败: %v", err)
|
||
}
|
||
|
||
c.logger.Infof("服务器状态: %s, 消息: %s", health.Status, health.Message)
|
||
return nil
|
||
}
|
||
|
||
// RunAllTests 运行所有测试
|
||
func (c *Client) RunAllTests(concurrent bool) error {
|
||
c.logger.Info("运行所有测试")
|
||
|
||
tests := c.config.Test.EnabledTests
|
||
|
||
if concurrent {
|
||
c.logger.Info("并发执行所有测试")
|
||
errCh := make(chan error, len(tests))
|
||
|
||
for _, test := range tests {
|
||
go func(t string) {
|
||
errCh <- c.RunTest(t, dataSizeMB, blockSize)
|
||
}(test)
|
||
}
|
||
|
||
// 等待所有测试完成
|
||
for range tests {
|
||
if err := <-errCh; err != nil {
|
||
c.logger.Errorf("测试失败: %v", err)
|
||
}
|
||
}
|
||
} else {
|
||
c.logger.Info("顺序执行所有测试")
|
||
for _, test := range tests {
|
||
if err := c.RunTest(test, dataSizeMB, blockSize); err != nil {
|
||
c.logger.Errorf("测试 %s 失败: %v", test, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// CheckDataIntegrity 检查断电后的数据完整性
|
||
func (c *Client) CheckDataIntegrity(testID string) error {
|
||
c.logger.Info("检查数据完整性")
|
||
|
||
url := fmt.Sprintf("http://%s/integrity?test_id=%s", c.serverAddr, testID)
|
||
resp, err := c.httpClient.Get(url)
|
||
if err != nil {
|
||
return fmt.Errorf("获取数据完整性信息失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
var integrity model.IntegrityInfo
|
||
if err := json.NewDecoder(resp.Body).Decode(&integrity); err != nil {
|
||
return fmt.Errorf("解析响应失败: %v", err)
|
||
}
|
||
|
||
c.logger.Infof("数据完整性检查结果:")
|
||
c.logger.Infof(" 测试ID: %s", integrity.TestID)
|
||
c.logger.Infof(" 检查时间: %s", integrity.CheckTime.Format("2006-01-02 15:04:05"))
|
||
c.logger.Infof(" 总块数: %d", integrity.TotalBlocks)
|
||
c.logger.Infof(" 可用块数: %d (%.2f%%)", integrity.AvailableBlocks, float64(integrity.AvailableBlocks)/float64(integrity.TotalBlocks)*100)
|
||
c.logger.Infof(" 损坏块数: %d (%.2f%%)", integrity.CorruptedBlocks, float64(integrity.CorruptedBlocks)/float64(integrity.TotalBlocks)*100)
|
||
c.logger.Infof(" 丢失块数: %d (%.2f%%)", integrity.MissingBlocks, float64(integrity.MissingBlocks)/float64(integrity.TotalBlocks)*100)
|
||
c.logger.Infof(" 数据丢失: %.2f MB", integrity.DataLossMB)
|
||
c.logger.Infof(" 恢复成功: %v", integrity.RecoverySuccess)
|
||
|
||
return nil
|
||
}
|
||
|
||
// RequestRecoveryTest 请求执行恢复测试
|
||
func (c *Client) RequestRecoveryTest(testType string) error {
|
||
c.logger.Infof("请求恢复测试: %s", testType)
|
||
|
||
// 准备请求数据
|
||
req := struct {
|
||
TestType string `json:"test_type"`
|
||
TestDir string `json:"test_dir,omitempty"`
|
||
}{
|
||
TestType: testType,
|
||
}
|
||
|
||
// 序列化请求数据
|
||
reqData, err := json.Marshal(req)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化请求数据失败: %v", err)
|
||
}
|
||
|
||
// 发送请求
|
||
url := fmt.Sprintf("http://%s/recovery", c.serverAddr)
|
||
resp, err := c.httpClient.Post(url, "application/json", bytes.NewBuffer(reqData))
|
||
if err != nil {
|
||
return fmt.Errorf("发送请求失败: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 检查响应状态
|
||
if resp.StatusCode != http.StatusAccepted {
|
||
return fmt.Errorf("服务器返回错误状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 解析响应
|
||
var testResp model.TestResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&testResp); err != nil {
|
||
return fmt.Errorf("解析响应失败: %v", err)
|
||
}
|
||
|
||
c.logger.Infof("恢复测试请求已接受,RequestID: %s", testResp.RequestID)
|
||
|
||
// 监控测试状态
|
||
return c.MonitorTestStatus(testResp.RequestID)
|
||
}
|
||
|
||
func main() {
|
||
flag.Parse()
|
||
|
||
// 初始化日志级别
|
||
var level logrus.Level
|
||
switch logLevel {
|
||
case "debug":
|
||
level = logrus.DebugLevel
|
||
case "info":
|
||
level = logrus.InfoLevel
|
||
case "warn":
|
||
level = logrus.WarnLevel
|
||
case "error":
|
||
level = logrus.ErrorLevel
|
||
default:
|
||
level = logrus.InfoLevel
|
||
}
|
||
|
||
// 配置日志
|
||
logger := logrus.New()
|
||
logger.SetLevel(level)
|
||
logger.SetFormatter(&logrus.TextFormatter{
|
||
FullTimestamp: true,
|
||
TimestampFormat: "2006-01-02 15:04:05",
|
||
})
|
||
|
||
// 添加日志文件输出
|
||
logFile, err := os.OpenFile("client.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||
if err == nil {
|
||
multiWriter := io.MultiWriter(os.Stdout, logFile)
|
||
logger.SetOutput(multiWriter)
|
||
} else {
|
||
logger.Warnf("无法打开日志文件: %v", err)
|
||
}
|
||
|
||
// 加载配置
|
||
cfg, err := config.Load(configFile)
|
||
if err != nil {
|
||
logger.Fatalf("加载配置失败: %v", err)
|
||
}
|
||
|
||
// 如果指定了timeout,覆盖配置
|
||
if timeout > 0 {
|
||
cfg.Client.TimeoutSec = timeout
|
||
}
|
||
|
||
// 创建客户端
|
||
client := NewClient(cfg, logger, serverAddr)
|
||
|
||
// 检查服务器健康状态
|
||
if err := client.CheckServerHealth(); err != nil {
|
||
logger.Fatalf("服务器健康检查失败: %v", err)
|
||
}
|
||
|
||
// 处理恢复测试
|
||
if recovery {
|
||
if testType == "" {
|
||
testType = "power_loss"
|
||
}
|
||
logger.Infof("开始执行恢复测试: %s", testType)
|
||
if err := client.RequestRecoveryTest(testType); err != nil {
|
||
logger.Fatalf("恢复测试失败: %v", err)
|
||
}
|
||
logger.Info("恢复测试完成")
|
||
return
|
||
}
|
||
|
||
// 执行指定的测试
|
||
if testType != "" {
|
||
if testType == "all" {
|
||
if err := client.RunAllTests(concurrent); err != nil {
|
||
logger.Fatalf("执行所有测试失败: %v", err)
|
||
}
|
||
} else {
|
||
if err := client.RunTest(testType, dataSizeMB, blockSize); err != nil {
|
||
logger.Fatalf("执行测试 %s 失败: %v", testType, err)
|
||
}
|
||
}
|
||
} else {
|
||
logger.Fatal("未指定测试类型,请使用 -test 参数")
|
||
}
|
||
|
||
logger.Info("所有测试完成")
|
||
}
|