Gatling中进行数据库性能关联测试将JDBC协议和HTTP协议集成在同一场景中,建立前端用户压力和后端数据库负载之间的因果关系,精准定位从应用层到数据层的性能瓶颈。
Gatling JDBC协议集成
Gatling JDBC模块允许您直接对数据库进行负载测试,和HTTP请求混合编排,模拟真实业务中应用服务器和数据库的交互。
1. 基础环境配置
在build.sbt或pom.xml中必须显式引入JDBC依赖:
scala
// build.sbt
libraryDependencies += "io.gatling" % "gatling-jdbc" % "3.9.5"
在Scala测试类中导入:
scala
import io.gatling.jdbc.Predef._
import java.sql.PreparedStatement
2. JDBC连接池配置
连接池配置直接决定测试的真实性和数据库压力模式:
scala
val jdbcConfig = jdbc
.url("jdbc:mysql://${DB_HOST}:3306/${DB_NAME}")
.username("${DB_USER}")
.password("${DB_PASSWORD}")
.driver("com.mysql.cj.jdbc.Driver")
// 连接池核心配置(建议和生产环境对齐)
.maximumPoolSize(50) // 最大连接数
.minimumIdle(10) // 最小空闲连接
.connectionTimeout(30000) // 连接超时(ms)
.idleTimeout(600000) // 空闲连接超时
.maxLifetime(1800000) // 连接最大生命周期
.validationQuery("SELECT 1") // 连接健康检查SQL
SQL执行时间监控的实现方式
1. 基础SQL执行和计时
Gatling会自动记录每条SQL的执行时间,但需要写好检查点:
scala
val scn = scenario("数据库性能测试")
.exec(
jdbc("查询用户订单")
.select("SELECT * FROM orders WHERE user_id = ?")
.params("${userId}") // 参数化查询
.check(
// 检查执行结果
jdbcResponseTime.mean.lt(50), // 平均响应时间<50ms
jdbcResponseTime.max.lt(200), // 最大响应时间<200ms
jdbcResponseTime.percentile4.lt(100), // P95<100ms
// 验证数据正确性
jdbcColumn("order_id").count.gt(0)
)
)
2. 监控SQL执行
要实现更详细的监控,需要捕获执行计划:
scala
.exec(
jdbc("复杂查询剖析")
.select("""
EXPLAIN ANALYZE
SELECT o.*, u.username
FROM orders o
JOIN users u ON o.user_id = u.id
WHERE o.created_at > ?
ORDER BY o.total_amount DESC
LIMIT 100
""")
.params("2024-01-01")
.check(
jdbcColumn("Execution Time").findAll.saveAs("execution_plan")
)
)
.exec { session =>
// 解析执行计划中的主要指标
val plan = session("execution_plan").as[Seq[String]]
val planningTime = extractTime(plan, "Planning Time")
val executionTime = extractTime(plan, "Execution Time")
println(s"SQL剖析结果 - 规划时间: ${planningTime}ms, 执行时间: ${executionTime}ms")
session
}
数据库和HTTP请求的关联测试
1. 混合场景设计模拟完整事务
scala
val mixedScenario = scenario("完整事务流程")
.feed(userIds.feeder)
.exec(
http("用户登录API")
.post("/api/login")
.body(StringBody("""{"username":"${username}"}"""))
.check(jsonPath("$.userId").saveAs("loggedInUserId"))
)
.pause(1.second)
// 关键:验证API调用是否触发预期数据库操作
.exec(
jdbc("验证登录记录")
.select("SELECT COUNT(*) as cnt FROM login_log WHERE user_id = ?")
.params("${loggedInUserId}")
.check(jdbcColumn("cnt").is(1)) // 断言恰好一条记录
)
.exec(
http("提交订单API")
.post("/api/order")
.body(ElFileBody("templates/order.json"))
.check(jsonPath("$.orderId").saveAs("apiOrderId"))
)
// 验证订单是否持久化且数据一致
.exec(
jdbc("验证订单数据一致性")
.select("""
SELECT o.status, o.total_amount, oi.item_count
FROM orders o
JOIN order_items oi ON o.id = oi.order_id
WHERE o.id = ?
""")
.params("${apiOrderId}")
.check(
jdbcColumn("status").is("PENDING"),
jdbcColumn("total_amount").notNull
)
)
2. 竞态条件和并发问题测试
scala
val concurrencyScenario = scenario("库存扣减并发测试")
.feed(productIds.feeder)
.exec(
jdbc("读取初始库存")
.select("SELECT stock FROM products WHERE id = ?")
.params("${productId}")
.check(jdbcColumn("stock").saveAs("initialStock"))
)
// 模拟100个并发用户同时扣减库存
.exec(
jdbc("并发扣减库存")
.update("""
UPDATE products
SET stock = stock - 1
WHERE id = ? AND stock > 0
""")
.params("${productId}")
.check(jdbcRowsUpdated.is(1)) // 断言只影响一行
)
.exec(
jdbc("验证最终库存")
.select("SELECT stock FROM products WHERE id = ?")
.params("${productId}")
.check(jdbcColumn("stock").is(session => {
val initial = session("initialStock").as[Int]
initial - 1 // 应正好减少1
}))
)
监控诊断高级配置
1. 慢SQL捕获和告警
scala
val slowSqlThreshold = 100 // 定义慢SQL阈值(ms)
val scn = scenario("慢SQL监控")
.exec(
jdbc("潜在慢查询")
.select("""
SELECT * FROM large_table
WHERE created_at BETWEEN ? AND ?
ORDER BY complex_calculation(column)
""")
.params("${startDate}", "${endDate}")
.check(
jdbcResponseTime.max.saveAs("sqlDuration")
)
)
.doIf(session => session("sqlDuration").as[Int] > slowSqlThreshold) {
exec(session => {
val duration = session("sqlDuration").as[Int]
// 触发告警逻辑
println(s"慢SQL告警: 执行时间 ${duration}ms")
// 可集成到外部监控系统
sendToMonitoringSystem(session)
session
})
}
2. 连接池性能监控
scala
.exec(session => {
// 定期采样连接池状态
val jdbcStats = jdbcConfig.connectionPool.getStats
println(s"""
|连接池状态报告:
|活跃连接: ${jdbcStats.getActiveConnections}
|空闲连接: ${jdbcStats.getIdleConnections}
|等待线程: ${jdbcStats.getThreadsAwaitingConnection}
|连接获取平均等待时间: ${jdbcStats.getConnectionTimeout}
""".stripMargin)
// 连接泄漏检测
if (jdbcStats.getActiveConnections > jdbcStats.getMaxConnections * 0.8) {
println("连接池接近饱和,可能存在连接泄漏")
}
session
})
3. 和APM工具集成
通过自定义检查点将数据发送到New Relic、Datadog等APM:
scala
.check(
jdbcResponseTime.max.transform(duration => {
// 发送自定义指标到APM
val tags = Map(
"sql_operation" -> "select_user_orders",
"test_scenario" -> "checkout_flow"
)
apmClient.sendMetric("database.query.duration", duration, tags)
duration
})
)
数据库压力测试示例
scala
class EcommerceDbTest extends Simulation {
val jdbcConfig = jdbc
.url("jdbc:mysql://localhost:3306/ecommerce")
.username("perf_test")
.password("test123")
.maximumPoolSize(30)
val httpProtocol = http.baseUrl("http://localhost:8080")
val dbFeeder = csv("data/product_ids.csv").circular
val orderCheckoutScenario = scenario("数据库密集型下单流程")
.feed(dbFeeder)
.exec(
http("浏览商品")
.get("/api/products/${productId}")
.check(jsonPath("$.price").saveAs("productPrice"))
)
.exec(
jdbc("获取实时库存")
.select("""
SELECT available_stock, warehouse_id
FROM inventory
WHERE product_id = ?
FOR UPDATE NOWAIT
""")
.params("${productId}")
.check(
jdbcColumn("available_stock").gt(0),
jdbcColumn("warehouse_id").saveAs("warehouseId")
)
)
.tryMax(3) { // 库存更新重试机制
exec(
jdbc("原子性扣减库存")
.update("""
UPDATE inventory
SET available_stock = available_stock - 1,
locked_stock = locked_stock + 1
WHERE product_id = ?
AND available_stock > 0
RETURNING updated_rows
""")
.params("${productId}")
.check(jdbcRowsUpdated.is(1))
)
}
.exec(
http("确认下单")
.post("/api/orders")
.body(StringBody(
"""{"productId":${productId},"quantity":1}"""
))
.check(status.is(201))
)
.exec(
jdbc("事务完整性验证")
.select("""
SELECT
(SELECT available_stock FROM inventory WHERE product_id = ?) as final_stock,
(SELECT COUNT(*) FROM orders WHERE product_id = ?) as order_count
FROM dual
""")
.params("${productId}", "${productId}")
.check(
jdbcColumn("final_stock").transform(_ == session("initialStock").as[Int] - 1),
jdbcColumn("order_count").is(1)
)
)
// 注入策略:重点测试数据库并发处理能力
setUp(
orderCheckoutScenario.inject(
rampUsersPerSec(10).to(100).during(5.minutes)
)
).protocols(jdbcConfig, httpProtocol)
.assertions(
global.jdbcResponseTime.percentile4.lt(150), // P95<150ms
jdbcAllRequests.percentile4.lt(200), // 所有SQL的P95<200ms
jdbcFailedRequests.percent.is(0) // 数据库零失败
)
}
指标诊断
数据库指标:
SQL响应时间分布:P50/P95/P99值,识别长尾效应
连接池使用率:活跃连接数/最大连接数,>80%需告警
事务成功率:提交和回滚比例
锁等待时间:特别是FOR UPDATE查询
关联分析:
比较HTTP P95响应时间和对应SQL的P95执行时间
识别N+1查询问题:单个HTTP请求触发的SQL数量
验证数据库操作是否和业务事务边界一致
这种测试方法能精确暴露连接池配置不当、缺失索引、事务隔离问题、锁竞争等数据库层瓶颈,实现真正的全链路性能可见。