使用Gatling进行HTTP认证测试:Basic、Digest、OAuth 2.0与JWT令牌流程模拟
1. Gatling认证测试架构设计
认证测试的主要组件
Gatling的认证测试架构建立在异步非阻塞I/O模型之上,通过Session状态管理和协议配置层实现复杂的认证流程。主要组件包括:
HTTP协议配置器:管理基础认证参数和全局HTTP头
Session状态机:维护用户认证状态和令牌生命周期
检查点验证器:验证认证响应和令牌有效性
Feeders数据源:提供动态认证凭据和测试数据
认证流程的状态管理
scala
/**
 * Session attribute names used to carry authentication state between steps.
 *
 * Each constant is the string key under which a value is stored in the session.
 */
object AuthSessionKeys {
  // Token keys.
  val ACCESS_TOKEN: String = "accessToken"
  val REFRESH_TOKEN: String = "refreshToken"
  val TOKEN_EXPIRY: String = "tokenExpiry"

  // Flow-state keys.
  val AUTH_STATE: String = "authState"
  val CSRF_TOKEN: String = "csrfToken"
}
2. Basic认证深度实现
基础Basic认证配置
scala
import io.gatling.core.Predef._
import io.gatling.http.Predef._
/**
 * Simulation snippet demonstrating HTTP Basic authentication.
 *
 * Defines a protocol with static credentials and a scenario that overrides
 * them per virtual user with values taken from a feeder.
 */
class BasicAuthSimulation extends Simulation {

  // Protocol-level configuration: base URL, static Basic credentials, JSON headers.
  val httpProtocol =
    http
      .baseUrl("https://api.example.com")
      .basicAuth("username", "password")
      .acceptHeader("application/json")
      .contentTypeHeader("application/json")

  // Scenario using per-user credentials resolved from session attributes.
  val basicAuthScenario = {
    // Request-level basicAuth reads the `username` / `password` session attributes.
    val securedRequest =
      http("Basic Auth Request")
        .get("/secure-endpoint")
        .basicAuth("${username}", "${password}")
        .check(
          status.is(200),
          jsonPath("$.authenticated").is("true")
        )

    // NOTE(review): credentialsFeeder is not defined in this snippet; it must be
    // supplied elsewhere and presumably provides `username` and `password`.
    scenario("Dynamic Basic Authentication")
      .feed(credentialsFeeder)
      .exec(securedRequest)
  }
}
高级Basic认证策略
scala
// Multi-tenant Basic authentication test.
//
// Builds the `Authorization: Basic ...` header by hand from the `username` and
// `password` session attributes, then calls a tenant-scoped endpoint and checks
// that the response echoes the tenant in the `X-Tenant-Id` header.
// NOTE(review): tenantCredentialsFeeder is not defined in this snippet; it is
// presumably expected to provide `tenantId`, `username` and `password`.
val multiTenantBasicAuth = scenario("Multi-tenant Basic Auth")
  .feed(tenantCredentialsFeeder)
  .exec { session =>
    val credentials = s"${session("username").as[String]}:${session("password").as[String]}"
    // Encode with an explicit charset so non-ASCII credentials produce the same
    // header on every platform instead of depending on the JVM default charset.
    val encodedCredentials = java.util.Base64.getEncoder
      .encodeToString(credentials.getBytes(java.nio.charset.StandardCharsets.UTF_8))
    session.set("authHeader", s"Basic $encodedCredentials")
  }
  .exec(http("Tenant-specific Auth")
    // `tenantId` is resolved from the session by the expression language here.
    .get("/${tenantId}/resources")
    .header("Authorization", "${authHeader}")
    .check(header("X-Tenant-Id").is("${tenantId}")))
3. Digest认证详细实现
Digest认证流程自动化
scala
/**
 * Simulation snippet that performs an HTTP Digest challenge/response by hand:
 * request the protected resource, capture the `WWW-Authenticate` challenge,
 * compute a response, and replay the request with an `Authorization` header.
 */
class DigestAuthSimulation extends Simulation {

  // Protocol configuration with Digest credentials and warm-up disabled.
  // The original note gives the reason as avoiding nonce conflicts.
  // NOTE(review): protocol-level digestAuth may already answer the 401 challenge
  // automatically, which would conflict with the manual flow below — confirm.
  val digestProtocol = http
    .baseUrl("https://digest.example.com")
    .digestAuth("username", "password")
    .disableWarmUp

  val digestScenario = scenario("Digest Authentication Flow")
    // Step 1: unauthenticated request; expect 401 and keep the challenge header.
    .exec(http("Initial Challenge Request")
      .get("/protected")
      .check(status.is(401))
      .check(header("WWW-Authenticate").saveAs("authChallenge")))
    // Step 2: parse the challenge and compute the Digest response.
    .exec(session => {
      val challenge = session("authChallenge").as[String]
      val realm = extractDigestParam(challenge, "realm")
      val nonce = extractDigestParam(challenge, "nonce")
      // NOTE(review): calculateDigestResponse is not defined in this snippet, and no
      // visible step stores `username` / `password` in the session — confirm both.
      val digestResponse = calculateDigestResponse(
        session("username").as[String],
        session("password").as[String],
        realm, nonce, "GET", "/protected"
      )
      session.set("digestResponse", digestResponse)
    })
    // Step 3: replay the request with the computed Authorization header.
    .exec(http("Authenticated Request")
      .get("/protected")
      .header("Authorization", "${digestResponse}")
      .check(status.is(200)))

  /**
   * Extracts the value of a quoted parameter (`name="value"`) from a challenge header.
   *
   * @param challenge raw `WWW-Authenticate` header value
   * @param param     parameter name to look up, matched literally
   * @return the parameter value without quotes, or an empty string when absent
   */
  private def extractDigestParam(challenge: String, param: String): String = {
    // Quote the name so regex metacharacters in it are treated literally, and anchor
    // on a word boundary so "nonce" does not match inside "cnonce" or "nextnonce".
    val quotedName = java.util.regex.Pattern.quote(param)
    val pattern = ("\\b" + quotedName + "\\s*=\\s*\"([^\"]+)\"").r
    pattern.findFirstMatchIn(challenge).map(_.group(1)).getOrElse("")
  }
}
4. OAuth 2.0完整流程模拟
OAuth 2.0授权码流程
scala
/**
 * Simulation snippet for the OAuth 2.0 authorization-code flow: authorization
 * request, code extraction from the redirect, code-for-token exchange, token
 * refresh, and an API call with the refreshed bearer token.
 *
 * Endpoints, client credentials, scope, state and the expected user id are read
 * from session attributes; no visible step in this snippet populates them.
 */
class OAuth2AuthorizationCodeSimulation extends Simulation {

  val oauth2Scenario = scenario("OAuth 2.0 Authorization Code Flow")
    .exec(initiateAuthorization)
    .exec(handleAuthorizationCallback)
    .exec(exchangeCodeForToken)
    .exec(refreshAccessToken)
    .exec(apiCallsWithToken)

  // Step 1: send the authorization request and capture the redirect target.
  private def initiateAuthorization =
    http("Initiate OAuth Authorization")
      .get("${authorizationEndpoint}")
      .queryParam("response_type", "code")
      .queryParam("client_id", "${clientId}")
      .queryParam("redirect_uri", "${redirectUri}")
      .queryParam("scope", "${scope}")
      .queryParam("state", "${state}")
      // Redirects are followed automatically by default; turn that off for this
      // request so the 302 and its Location header are visible to the checks.
      .disableFollowRedirect
      .check(status.is(302))
      .check(header("Location").saveAs("redirectLocation"))

  // Step 2: handle the authorization callback (simulates user consent) by
  // pulling the authorization code out of the saved redirect URL.
  private def handleAuthorizationCallback =
    exec(session => {
      val location = session("redirectLocation").as[String]
      // NOTE(review): extractAuthorizationCode is not defined in this snippet.
      val code = extractAuthorizationCode(location)
      session.set("authorizationCode", code)
    })

  // Step 3: exchange the authorization code for tokens and store them.
  private def exchangeCodeForToken =
    http("Exchange Code for Token")
      .post("${tokenEndpoint}")
      .formParam("grant_type", "authorization_code")
      .formParam("code", "${authorizationCode}")
      .formParam("redirect_uri", "${redirectUri}")
      .formParam("client_id", "${clientId}")
      .formParam("client_secret", "${clientSecret}")
      .check(status.is(200))
      .check(jsonPath("$.access_token").saveAs("accessToken"))
      .check(jsonPath("$.refresh_token").saveAs("refreshToken"))
      .check(jsonPath("$.expires_in").saveAs("tokenExpiry"))

  // Step 4: use the refresh token to obtain a new access token.
  private def refreshAccessToken =
    http("Refresh Access Token")
      .post("${tokenEndpoint}")
      .formParam("grant_type", "refresh_token")
      .formParam("refresh_token", "${refreshToken}")
      .formParam("client_id", "${clientId}")
      .formParam("client_secret", "${clientSecret}")
      .check(status.is(200))
      .check(jsonPath("$.access_token").saveAs("newAccessToken"))

  // Step 5: call the API with the refreshed token and verify the returned user id.
  private def apiCallsWithToken =
    http("API Call with Bearer Token")
      .get("${apiEndpoint}")
      .header("Authorization", "Bearer ${newAccessToken}")
      .check(status.is(200))
      .check(jsonPath("$.user.id").is("${userId}"))
}
OAuth 2.0客户端凭证流程
scala
/**
 * Simulation snippet for the OAuth 2.0 client-credentials flow: acquire a token
 * once, then call the API repeatedly for 300 seconds, re-acquiring the token
 * whenever the stored expiry time has passed.
 */
class OAuth2ClientCredentialsSimulation extends Simulation {

  // NOTE(review): clientCredentialsFeeder is not defined in this snippet; it is
  // presumably expected to provide the endpoint, client and scope attributes.
  val clientCredentialsScenario = scenario("OAuth 2.0 Client Credentials Flow")
    .feed(clientCredentialsFeeder)
    .exec(acquireClientCredentialsToken)
    .during(300.seconds) {
      exec(makeAuthenticatedApiCall)
        .pause(1.second, 5.seconds)
    }

  // Requests a token, then converts the relative `expires_in` (seconds) into an
  // absolute expiry timestamp in epoch milliseconds stored as `tokenExpiry`.
  // The request is wrapped in exec(...) so the session step can be chained after
  // it; an HTTP request builder on its own has no `exec` method.
  private def acquireClientCredentialsToken =
    exec(
      http("Get Client Credentials Token")
        .post("${tokenEndpoint}")
        .formParam("grant_type", "client_credentials")
        .formParam("client_id", "${clientId}")
        .formParam("client_secret", "${clientSecret}")
        .formParam("scope", "${scope}")
        .check(status.is(200))
        .check(jsonPath("$.access_token").saveAs("accessToken"))
        .check(jsonPath("$.expires_in").saveAs("expiresIn"))
    ).exec(session => {
      val expiryTime = System.currentTimeMillis() + session("expiresIn").as[Long] * 1000
      session.set("tokenExpiry", expiryTime)
    })

  // Re-acquires the token if it has expired, then performs the API call.
  private def makeAuthenticatedApiCall =
    doIf(session => System.currentTimeMillis() > session("tokenExpiry").as[Long]) {
      exec(acquireClientCredentialsToken)
    }.exec(
      http("Service-to-Service API Call")
        .get("${apiBaseUrl}/resources")
        .header("Authorization", "Bearer ${accessToken}")
        .check(status.in(200, 204))
    )
}
5. JWT令牌流程高级实现
JWT生成和验证流程
scala
/**
 * Simulation snippet for a JWT-based flow: log in to obtain a token, decode its
 * claims, then call a protected API for 10 minutes, refreshing the token when it
 * is within 30 seconds of expiry.
 */
class JWTFlowSimulation extends Simulation {

  // NOTE(review): handleJWTExpiration is referenced here but not defined in this
  // snippet; it must be provided elsewhere for the class to compile.
  val jwtScenario = scenario("JWT Authentication Flow")
    .exec(loginAndAcquireJWT)
    .exec(verifyJWTStructure)
    .exec(makeJWTProtectedCalls)
    .exec(handleJWTExpiration)

  // Logs in and stores the JWT, plus the refresh token when the header is present.
  private def loginAndAcquireJWT =
    http("Login to Get JWT")
      .post("${authEndpoint}/login")
      .body(StringBody("""{"username":"${username}","password":"${password}"}"""))
      .check(status.is(200))
      .check(jsonPath("$.token").saveAs("jwtToken"))
      .check(header("X-Refresh-Token").optional.saveAs("refreshToken"))

  // Decodes the JWT payload and stores `exp`, `sub` and `iss` in the session.
  // The signature is not verified (test use only). Any decoding or parsing
  // failure marks the session as failed.
  private def verifyJWTStructure =
    exec { session =>
      val jwtToken = session("jwtToken").as[String]
      try {
        val parts = jwtToken.split("\\.")
        // Decode with an explicit charset so the result does not depend on the
        // JVM default.
        val payload = new String(
          java.util.Base64.getUrlDecoder.decode(parts(1)),
          java.nio.charset.StandardCharsets.UTF_8
        )
        val claims = ujson.read(payload)
        session
          .set("jwtExpiry", claims("exp").num.toLong)
          .set("jwtSubject", claims("sub").str)
          .set("jwtIssuer", claims("iss").str)
      } catch {
        // NonFatal lets fatal JVM errors and interrupts propagate.
        case scala.util.control.NonFatal(_) => session.markAsFailed
      }
    }

  // Calls the protected API repeatedly, refreshing the token shortly before expiry.
  private def makeJWTProtectedCalls =
    during(10.minutes) {
      exec { session =>
        val nowSeconds = System.currentTimeMillis() / 1000
        val expirySeconds = session("jwtExpiry").as[Long]
        // Refresh when fewer than 30 seconds remain before expiry.
        session.set("shouldRefresh", expirySeconds - nowSeconds < 30)
      }
        .doIf("${shouldRefresh}") {
          exec(refreshJWTToken)
            // Re-read the claims of the new token; otherwise `jwtExpiry` keeps the
            // old value and every following iteration would refresh again.
            .exec(verifyJWTStructure)
        }
        .exec(
          http("JWT Protected API")
            .get("${apiBaseUrl}/user/${jwtSubject}/profile")
            .header("Authorization", "Bearer ${jwtToken}")
            .check(status.is(200))
            .check(jsonPath("$.user.id").is("${jwtSubject}"))
        )
        .pause(5.seconds, 15.seconds)
    }

  // Exchanges the refresh token for a new JWT and overwrites `jwtToken`.
  private def refreshJWTToken =
    http("Refresh JWT Token")
      .post("${authEndpoint}/refresh")
      .header("Authorization", "Bearer ${jwtToken}")
      .body(StringBody("""{"refresh_token":"${refreshToken}"}"""))
      .check(status.is(200))
      .check(jsonPath("$.token").saveAs("jwtToken"))
}
自定义JWT生成器(用于测试)
scala
/** Builds unsigned, test-only JWTs and a scenario that sends one to an API. */
object JWTGenerator {

  /**
   * Generates a JWT-shaped string for testing. The signature segment is the
   * literal text "signature" and is NOT valid.
   *
   * @param username      value of the `sub` claim
   * @param roles         value of the `roles` claim
   * @param expiryMinutes token lifetime in minutes, used to compute `exp`
   * @return `header.payload.signature`, with header and payload base64url-encoded
   */
  def generateTestJWT(username: String, roles: List[String], expiryMinutes: Int = 60): String = {
    // JWT segments use base64url without '=' padding, encoded from UTF-8 bytes.
    val encoder = java.util.Base64.getUrlEncoder.withoutPadding()
    val utf8 = java.nio.charset.StandardCharsets.UTF_8

    // Read the clock once so `iat` and `exp` are derived from the same instant.
    val issuedAt = System.currentTimeMillis() / 1000

    val header = """{"alg":"HS256","typ":"JWT"}"""
    val payload = ujson.Obj(
      "sub" -> username,
      "iss" -> "test-issuer",
      // Long arithmetic avoids Int overflow for very large lifetimes.
      "exp" -> (issuedAt + expiryMinutes * 60L),
      "iat" -> issuedAt,
      "roles" -> roles
    )

    val encodedHeader = encoder.encodeToString(header.getBytes(utf8))
    val encodedPayload = encoder.encodeToString(payload.toString().getBytes(utf8))
    s"$encodedHeader.$encodedPayload.signature" // test-only, invalid signature
  }

  // Scenario that generates a custom JWT for the session's `username` and sends it.
  val customJwtScenario = scenario("Custom JWT Testing")
    .exec(session => {
      val customJWT = generateTestJWT(
        session("username").as[String],
        List("user", "admin")
      )
      session.set("customJWT", customJWT)
    })
    .exec(
      http("API with Custom JWT")
        .get("${apiBaseUrl}/admin/resources")
        .header("Authorization", "Bearer ${customJWT}")
        .check(status.in(200, 403)) // 403 is accepted because the signature is invalid
    )
}
6. 混合认证策略和性能测试
多认证类型混合负载测试
scala
/**
 * Load test that mixes four authentication types, each as its own population
 * ramped up over 60 seconds against a shared HTTP protocol.
 */
class MixedAuthLoadTest extends Simulation {

  val httpProtocol = http
    .baseUrl("https://api.example.com")
    .acceptEncodingHeader("gzip, deflate")
    .userAgentHeader("Gatling-Auth-Test/1.0")

  // Number of virtual users injected per authentication type. These are absolute
  // counts passed to rampUsers; they sum to 100, so they also read as percentages.
  val userWeights = Map(
    "basic" -> 30,  // Basic auth users
    "digest" -> 20, // Digest auth users
    "oauth2" -> 40, // OAuth2 users
    "jwt" -> 10     // JWT users
  )

  /**
   * Builds the four populations and registers them with `setUp`.
   *
   * NOTE(review): the scenarios are referenced through `BasicAuthSimulation`,
   * `DigestAuthSimulation`, etc. as if they were objects; the snippets in view
   * declare them as classes, so companion objects presumably exist elsewhere.
   */
  def apply(): SetUp = {
    val basicUsers = scenario("Basic Auth Users")
      .exec(BasicAuthSimulation.basicAuthScenario)
    val digestUsers = scenario("Digest Auth Users")
      .exec(DigestAuthSimulation.digestScenario)
    val oauth2Users = scenario("OAuth2 Users")
      .exec(OAuth2AuthorizationCodeSimulation.oauth2Scenario)
    val jwtUsers = scenario("JWT Users")
      .exec(JWTFlowSimulation.jwtScenario)

    setUp(
      basicUsers.inject(rampUsers(userWeights("basic")).during(60)),
      digestUsers.inject(rampUsers(userWeights("digest")).during(60)),
      oauth2Users.inject(rampUsers(userWeights("oauth2")).during(60)),
      jwtUsers.inject(rampUsers(userWeights("jwt")).during(60))
    ).protocols(httpProtocol)
  }

  // setUp must run while the simulation is being constructed; without this call
  // apply() is never invoked and the simulation registers no scenarios.
  apply()
}
认证性能指标和断言
scala
/** Collections of Gatling assertions for the authentication scenarios. */
class AuthPerformanceAssertions {

  // Request names targeted by the per-request assertions below.
  private val tokenExchangeRequest = "Exchange Code for Token"
  private val tokenRefreshRequest = "Refresh Access Token"
  private val jwtLoginRequest = "Login to Get JWT"
  private val bearerApiRequest = "API Call with Bearer Token"

  // Assertions over all requests combined.
  val globalAssertions = Seq(
    // percentile4 response time below 1000 ms (the original note calls this the
    // 99th percentile; the exact percentile comes from Gatling configuration).
    global.responseTime.percentile4.lt(1000),
    // Failure rate below 1%.
    global.failedRequests.percent.lt(1.0),
    // Success rate above 99%.
    global.successfulRequests.percent.gt(99)
  )

  // Assertions scoped to individual authentication requests.
  val authSpecificAssertions = Seq(
    details(tokenExchangeRequest).responseTime.percentile4.lt(2000),
    details(tokenRefreshRequest).responseTime.max.lt(3000),
    details(jwtLoginRequest).failedRequests.percent.lt(0.5),
    // Applied through forAll: maximum response time below 5000 ms.
    forAll.responseTime.max.lt(5000),
    // Throughput of bearer-token API calls above 50 requests per second.
    details(bearerApiRequest).requestsPerSec.gt(50)
  )
}
7. 高级配置和优化
认证测试环境配置
scala
/** Environment endpoints and HTTP settings for the authentication tests. */
object AuthTestConfig {

  // Endpoint set for the development environment; also the fallback in getConfig.
  private val devEndpoints = Map(
    "baseUrl" -> "https://dev-api.example.com",
    "authEndpoint" -> "https://dev-auth.example.com",
    "tokenEndpoint" -> "https://dev-auth.example.com/oauth/token"
  )

  // Endpoint set for the staging environment.
  private val stagingEndpoints = Map(
    "baseUrl" -> "https://staging-api.example.com",
    "authEndpoint" -> "https://staging-auth.example.com",
    "tokenEndpoint" -> "https://staging-auth.example.com/oauth/token"
  )

  // Endpoint set for the production environment.
  private val prodEndpoints = Map(
    "baseUrl" -> "https://api.example.com",
    "authEndpoint" -> "https://auth.example.com",
    "tokenEndpoint" -> "https://auth.example.com/oauth/token"
  )

  // Environment name -> endpoint settings.
  val environments = Map(
    "dev" -> devEndpoints,
    "staging" -> stagingEndpoints,
    "prod" -> prodEndpoints
  )

  /**
   * Returns the endpoint settings for `env`.
   *
   * @param env environment name ("dev", "staging" or "prod")
   * @return the matching settings, or the "dev" settings when `env` is unknown
   */
  def getConfig(env: String): Map[String, String] =
    environments.getOrElse(env, devEndpoints)

  // Timeout and connection settings for authentication requests.
  // NOTE(review): confirm that requestTimeout and connectTimeout exist on the HTTP
  // protocol builder in the Gatling version in use.
  val authTimeouts =
    http
      .requestTimeout(30000)
      .connectTimeout(10000)
      .maxConnectionsPerHost(100)
}
这种专业的Gatling认证测试实现提供了完整的HTTP认证覆盖,从简单的Basic认证到复杂的OAuth 2.0和JWT流程,保证了在各种认证场景下的性能测试准确性和可靠性。