测试动态 / 质量专栏 / 软件压力测试工具Gatling虚拟用户注入策略详解:rampUsers、atOnceUsers、constantUsersPerSec与分层人口模型
软件压力测试工具Gatling虚拟用户注入策略详解:rampUsers、atOnceUsers、constantUsersPerSec与分层人口模型
2025-11-24 作者:cwb 浏览次数:1

Gatling虚拟用户注入方式详解:rampUsers、atOnceUsers、constantUsersPerSec和分层人口模型


1. Gatling负载注入架构和主要概念

注入策略执行引擎

Gatling的虚拟用户注入建立在响应式流控制架构上,通过用户注入器和场景调度器的协同工作实现精确的负载控制。


Gatling注入策略内部执行流程

Injection Profile → User Stream Generator → Scenario Scheduler → Virtual User Lifecycle


虚拟用户生命周期状态


scala

trait UserState {

  // 用户状态转换

  case object Waiting extends UserState

  case object Starting extends UserState  

  case object Running extends UserState

  case object Finished extends UserState

  case object Failed extends UserState

}


2. 基础注入深度解析

rampUsers-线性递增负载


scala

class RampUsersInjectionSimulation extends Simulation {

  

  val httpProtocol = http.baseUrl("https://api.example.com")

  

  val scn = scenario("Ramp Users Load Test")

    .exec(http("API Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 基础rampUsers用法

  setUp(

    scn.inject(

      rampUsers(100).during(60) // 60秒内线性增加到100用户

    )

  ).protocols(httpProtocol)

  

  // 高级rampUsers配置

  val advancedRampSetup = setUp(

    scn.inject(

      nothingFor(10), // 先等待10秒

      rampUsers(50).during(30), // 30秒内增加到50用户

      nothingFor(20), // 保持50用户20秒

      rampUsers(100).during(45) // 45秒内再增加到100用户

    )

  )

  

  // rampUsers的数学原理

  object RampUsersMath {

    /**

     * rampUsers的线性增长函数

     * @param totalUsers 总用户数

     * @param duration 持续时间(秒)

     * @return 每秒应该启动的用户数序列

     */

    def calculateRampInjection(totalUsers: Int, duration: Int): Seq[Double] = {

      val usersPerSecond = totalUsers.toDouble / duration

      (1 to duration).map(_ => usersPerSecond)

    }

    

    // 实际每秒启动用户数会受系统性能影响

    val expectedRamp = calculateRampInjection(100, 60)

    // 结果: 每秒约启动1.67个用户

  }

}

atOnceUsers-瞬时并发冲击


scala

class AtOnceUsersInjectionSimulation extends Simulation {

  

  val spikeScenario = scenario("Instant User Spike")

    .exec(http("Spike Request")

      .get("/stress-endpoint")

      .check(status.is(200)))

  

  // 基础瞬时并发测试

  setUp(

    spikeScenario.inject(

      atOnceUsers(100) // 立即同时启动100个用户

    )

  ).protocols(httpProtocol)

  

  // 多阶段瞬时冲击

  val multiPhaseSpike = setUp(

    spikeScenario.inject(

      atOnceUsers(50),  // 第一阶段:50用户瞬时冲击

      nothingFor(30),   // 观察30秒

      atOnceUsers(100), // 第二阶段:100用户瞬时冲击  

      nothingFor(30),   // 观察30秒

      atOnceUsers(200)  // 第三阶段:200用户瞬时冲击

    )

  )

  

  // atOnceUsers的技术实现细节

  object AtOnceTechnicalDetails {

    /**

     * atOnceUsers的执行时序

     * 所有用户在同一时刻开始执行,但实际启动时间会有微小差异

     */

    def executeAtOnceUsers(userCount: Int): Unit = {

      // Gatling使用Akka Actor系统并行启动用户

      val userActors = (1 to userCount).map { userId =>

        system.actorOf(UserActor.props, s"user-$userId")

      }

      

      // 所有Actor同时收到启动消息

      userActors.foreach { actor =>

        actor ! StartScenario

      }

    }

  }

  

  // 应用场景:系统容量测试

  val capacityTesting = scenario("System Capacity Test")

    .exec(

      atOnceUsers(500), // 测试系统瞬时处理能力

      atOnceUsers(1000) // 逐步增加找到系统极限

    )

}


constantUsersPerSec-恒定压力负载


scala

class ConstantUsersPerSecSimulation extends Simulation {

  

  val steadyLoadScenario = scenario("Constant Load")

    .exec(http("Steady Request")

      .get("/api")

      .check(status.is(200)))

  

  // 基础恒定负载

  setUp(

    steadyLoadScenario.inject(

      constantUsersPerSec(10).during(300) // 5分钟内保持每秒10用户

    )

  ).protocols(httpProtocol)

  

  // 多级恒定负载

  val multiLevelConstant = setUp(

    steadyLoadScenario.inject(

      constantUsersPerSec(5).during(60),   // 第1分钟:5用户/秒

      constantUsersPerSec(10).during(120), // 第2-3分钟:10用户/秒  

      constantUsersPerSec(15).during(120), // 第4-5分钟:15用户/秒

      constantUsersPerSec(5).during(60)    // 第6分钟:降回5用户/秒

    )

  )

  

  // constantUsersPerSec的调度算法

  object ConstantUsersScheduler {

    /**

     * 恒定用户速率调度算法

     * 使用令牌桶算法保证稳定的用户注入速率

     */

    class ConstantRateScheduler(usersPerSecond: Double, duration: Int) {

      private val tokenBucket = new TokenBucket(usersPerSecond)

      

      def scheduleUsers(): Seq[Double] = {

        val totalUsers = (usersPerSecond * duration).toInt

        val interval = 1.0 / usersPerSecond

        

        (0 until totalUsers).map { userIndex =>

          userIndex * interval

        }

      }

    }

    

    // 实际用户启动时间计算

    val schedule = new ConstantRateScheduler(10, 60)

    val userStartTimes = schedule.scheduleUsers()

    // 用户将在0.0s, 0.1s, 0.2s, ... 59.9s时刻启动

  }

  

  // 高级配置:随机化思考时间

  val realisticConstantLoad = scenario("Realistic Constant Load")

    .during(300) {

      exec(http("API Call")

        .get("/endpoint"))

      .pause(1, 5) // 1-5秒随机思考时间

    }

    .inject(

      constantUsersPerSec(20).during(300) // 考虑思考时间后的实际并发

    )

}


3. 高级注入策略和复合模式

rampUsersPerSec-速率线性递增


scala

class RampUsersPerSecSimulation extends Simulation {

  

  val rampRateScenario = scenario("Ramping Rate Load")

    .exec(http("Ramping Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 基础速率递增

  setUp(

    rampRateScenario.inject(

      rampUsersPerSec(1).to(10).during(60) // 1分钟内从1用户/秒增加到10用户/秒

    )

  ).protocols(httpProtocol)

  

  // 多段速率递增

  val multiStageRamp = setUp(

    rampRateScenario.inject(

      rampUsersPerSec(1).to(5).during(30),   // 0-30秒: 1→5用户/秒

      rampUsersPerSec(5).to(20).during(60),  // 30-90秒: 5→20用户/秒

      rampUsersPerSec(20).to(5).during(30)   // 90-120秒: 20→5用户/秒

    )

  )

  

  // rampUsersPerSec的数学建模

  object RampRateMathematics {

    /**

     * 计算线性递增速率曲线

     */

    case class RampRate(from: Double, to: Double, duration: Int) {

      val slope: Double = (to-from) / duration

      

      def rateAtTime(t: Int): Double = {

        require(t >= 0 && t <= duration, s"Time $t out of range [0, $duration]")

        from + slope * t

      }

      

      def totalUsers: Double = {

        // 计算曲线下面积(积分)

        (from + to) * duration / 2.0

      }

    }

    

    val ramp = RampRate(1, 10, 60)

    println(s"总用户数: ${ramp.totalUsers}") // 330用户

    println(s"30秒时的速率: ${ramp.rateAtTime(30)}") // 5.5用户/秒

  }

}


heavisideUsers-S形增长曲线


scala

class HeavisideUsersSimulation extends Simulation {

  

  val smoothScenario = scenario("Smooth Load Increase")

    .exec(http("Smooth Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // Heaviside函数注入(S形曲线)

  setUp(

    smoothScenario.inject(

      heavisideUsers(1000).during(60) // 60秒内S形曲线增长到1000用户

    )

  ).protocols(httpProtocol)

  

  // Heaviside函数的数学原理

  object HeavisideMathematics {

    /**

     * Heaviside函数的变体-平滑S形曲线

     * 避免线性增长的尖锐拐点,更符合真实用户增长模式

     */

    def smoothHeaviside(t: Double, totalTime: Double): Double = {

      // 使用Sigmoid函数实现平滑过渡

      1.0 / (1.0 + math.exp(-10.0 * (t / totalTime-0.5)))

    }

    

    def calculateHeavisideInjection(totalUsers: Int, duration: Int): Seq[Int] = {

      (0 until duration).map { t =>

        val progress = smoothHeaviside(t, duration)

        (progress * totalUsers).toInt

      }

    }

    

    // 生成用户注入计划

    val injectionPlan = calculateHeavisideInjection(1000, 60)

    // 开始和结束阶段增长缓慢,中间阶段增长迅速

  }

  

  // 应用场景:模拟真实用户登录模式

  val realisticLoginPattern = scenario("Realistic Login Pattern")

    .inject(

      heavisideUsers(500).during(300) // 5分钟内模拟用户陆续登录

    )

}


4. 分层人口模型和用户行为模拟

多用户角色分层


scala

class LayeredPopulationSimulation extends Simulation {

  

  // 定义不同用户角色的行为模式

  val casualUsers = scenario("Casual Users")

    .exec(

      pause(5, 10), // 长时间思考

      http("Browse Action")

        .get("/browse")

        .check(status.is(200)),

      pause(2, 5)

    )

  

  val powerUsers = scenario("Power Users")  

    .exec(

      pause(1, 3), // 短时间思考

      http("Complex Action")

        .get("/complex-operation")

        .check(status.is(200)),

      pause(1, 2)

    )

  

  val adminUsers = scenario("Administrator Users")

    .exec(

      http("Admin Action")

        .get("/admin")

        .check(status.is(200)),

      pause(3, 8)

    )

  

  // 分层人口注入策略

  val layeredInjection = setUp(

    // 普通用户:70%-缓慢增长

    casualUsers.inject(

      rampUsers(70).during(120)

    ),

    

    // 高级用户:25%-中等速度增长  

    powerUsers.inject(

      rampUsers(25).during(90)

    ),

    

    // 管理员:5%-快速启动

    adminUsers.inject(

      atOnceUsers(5)

    )

  ).protocols(httpProtocol)

  

  // 高级分层策略:基于时间的动态比例

  object DynamicPopulationModel {

    

    case class UserSegment(

      name: String,

      scenario: ScenarioBuilder, 

      percentage: Double,

      behavior: String

    )

    

    val segments = Seq(

      UserSegment("Mobile Users", casualUsers, 0.4, "rampUsers(40).during(60)"),

      UserSegment("Desktop Users", powerUsers, 0.5, "rampUsers(50).during(45)"),

      UserSegment("Tablet Users", adminUsers, 0.1, "atOnceUsers(10)")

    )

    

    def createDynamicInjection(): SetUp = {

      val injections = segments.map { segment =>

        segment.scenario.inject(

          rampUsers((segment.percentage * 100).toInt).during(60)

        )

      }

      

      setUp(injections: _*).protocols(httpProtocol)

    }

  }

}


地理分布分层模型


scala

class GeographicDistributionSimulation extends Simulation {

  

  // 模拟不同地理区域的用户行为

  val northAmericaUsers = scenario("North America Users")

    .exec(

      http("NA Request")

        .get("/us-endpoint")

        .header("X-Region", "north-america")

        .check(status.is(200))

    )

  

  val europeUsers = scenario("Europe Users")

    .exec(

      http("EU Request")  

        .get("/eu-endpoint")

        .header("X-Region", "europe")

        .check(status.is(200))

    )

  

  val asiaPacificUsers = scenario("Asia Pacific Users")

    .exec(

      http("APAC Request")

        .get("/apac-endpoint") 

        .header("X-Region", "asia-pacific")

        .check(status.is(200))

    )

  

  // 基于地理时区的分层注入

  val geographicInjection = setUp(

    // 亚洲用户-亚洲工作时间

    asiaPacificUsers.inject(

      rampUsersPerSec(0).to(20).during(10800), // 3小时递增 (08:00-11:00)

      constantUsersPerSec(20).during(14400),   // 4小时稳定 (11:00-15:00)

      rampUsersPerSec(20).to(0).during(10800)  // 3小时递减 (15:00-18:00)

    ),

    

    // 欧洲用户-欧洲工作时间  

    europeUsers.inject(

      rampUsersPerSec(0).to(15).during(10800), // 3小时递增 (08:00-11:00 CET)

      constantUsersPerSec(15).during(14400),   // 4小时稳定 (11:00-15:00 CET)

      rampUsersPerSec(15).to(0).during(10800)  // 3小时递减 (15:00-18:00 CET)

    ),

    

    // 北美用户-北美工作时间

    northAmericaUsers.inject(

      rampUsersPerSec(0).to(25).during(10800), // 3小时递增 (08:00-11:00 EST)

      constantUsersPerSec(25).during(14400),   // 4小时稳定 (11:00-15:00 EST)  

      rampUsersPerSec(25).to(0).during(10800)  // 3小时递减 (15:00-18:00 EST)

    )

  ).protocols(httpProtocol)

}


5. 复合注入策略和高级模式

复合注入


scala

class CompositeInjectionSimulation extends Simulation {

  

  val complexScenario = scenario("Complex Load Pattern")

    .exec(http("Composite Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  // 复杂复合注入策略

  val compositeSetup = setUp(

    complexScenario.inject(

      // 阶段1: 基线负载

      constantUsersPerSec(5).during(60),

      

      // 阶段2: 线性增长

      rampUsersPerSec(5).to(20).during(120),

      

      // 阶段3: 峰值冲击

      atOnceUsers(50),

      nothingFor(10),

      atOnceUsers(50),

      

      // 阶段4: 稳定高压

      constantUsersPerSec(25).during(180),

      

      // 阶段5: 缓慢下降

      rampUsersPerSec(25).to(5).during(120),

      

      // 阶段6: 波动负载

      constantUsersPerSec(5).during(30),

      atOnceUsers(25),

      constantUsersPerSec(5).during(30)

    )

  ).protocols(httpProtocol)

  

  // 基于业务指标的智能注入

  object BusinessDrivenInjection {

    

    case class BusinessHour(hour: Int, expectedUsers: Int)

    

    val businessHours = Seq(

      BusinessHour(9, 100),  // 9AM: 100用户

      BusinessHour(10, 200), // 10AM: 200用户  

      BusinessHour(11, 300), // 11AM: 300用户

      BusinessHour(12, 250), // 12PM: 250用户

      BusinessHour(13, 280), // 1PM: 280用户

      BusinessHour(14, 320), // 2PM: 320用户

      BusinessHour(15, 300), // 3PM: 300用户

      BusinessHour(16, 220), // 4PM: 220用户

      BusinessHour(17, 180)  // 5PM: 180用户

    )

    

    def createBusinessInjection(): Seq[InjectionStep] = {

      businessHours.flatMap { hour =>

        Seq(

          rampUsersPerSec(0).to(hour.expectedUsers / 60.0).during(600), // 10分钟递增

          constantUsersPerSec(hour.expectedUsers / 60.0).during(3000)   // 50分钟稳定

        )

      }

    }

  }

}


自定义注入开发


scala

// 自定义正弦波注入策略

class SineWaveInjection(

  minUsers: Double,

  maxUsers: Double, 

  period: Int,

  duration: Int

) extends InjectionStep {

  

  override def chain(chained: Iterator[FiniteDuration]): Iterator[FiniteDuration] = {

    val amplitude = (maxUsers-minUsers) / 2.0

    val center = minUsers + amplitude

    val angularFrequency = 2.0 * Math.PI / period

    

    val userStream = (0 until duration).iterator.flatMap { second =>

      val rate = center + amplitude * Math.sin(angularFrequency * second)

      val usersThisSecond = Math.max(0, Math.round(rate)).toInt

      

      if (usersThisSecond > 0) {

        val interval = 1.0 / usersThisSecond

        (0 until usersThisSecond).map { userIndex =>

          FiniteDuration((second + userIndex * interval).toLong, TimeUnit.SECONDS)

        }

      } else {

        Seq.empty

      }

    }

    

    userStream

  }

}


// 使用自定义正弦波注入

val sineWaveScenario = scenario("Sine Wave Load")

  .inject(

    new SineWaveInjection(

      minUsers = 10,

      maxUsers = 50, 

      period = 300, // 5分钟周期

      duration = 1800 // 30分钟测试

    )

  )


6. 性能优化

注入性能调优


scala

object InjectionOptimization {

  

  // 基于系统资源的自适应注入

  def adaptiveInjection(maxSystemLoad: Double): InjectionStep = {

    constantUsersPerSec(10).during(60) // 基础负载

      .andThen(

        rampUsersPerSec(10).to(100).during(300) // 渐进增加

          .throttle(

            reachRps(100).in(60),

            holdFor(300) 

          )

      )

  }

  

  // 内存优化的注入策略

  def memoryOptimizedInjection(totalUsers: Int, duration: Int): InjectionStep = {

    // 使用分批次注入避免内存溢出

    val batchSize = 1000

    val batches = (totalUsers + batchSize-1) / batchSize

    

    (0 until batches).foldLeft(nothingFor(0)) { (acc, batch) =>

      val batchUsers = Math.min(batchSize, totalUsers-batch * batchSize)

      acc.andThen(

        rampUsers(batchUsers).during(duration / batches)

      )

    }

  }

}



监控和断言配置


scala

class InjectionMonitoringSimulation extends Simulation {

  

  val monitoredScenario = scenario("Monitored Load Test")

    .exec(http("Monitored Request")

      .get("/endpoint")

      .check(status.is(200)))

  

  setUp(

    monitoredScenario.inject(

      rampUsers(100).during(60)

    )

  ).protocols(httpProtocol)

  .assertions(

    // 注入性能断言

    global.allRequests.percent.is(100),

    global.responseTime.percentile4.lt(800),

    

    // 用户注入速率断言

    details("Monitored Load Test").requestsPerSec.between(1, 2),

    

    // 并发用户数断言  

    details("Monitored Load Test").concurrentUsers.max.lt(110)

  )

  

  // 实时监控配置

  val monitoringConfig = setUp(

    monitoredScenario.inject(

      rampUsersPerSec(1).to(10).during(60)

    )

  ).maxDuration(300)

   .assertions(

     global.successfulRequests.percent.gt(99.5)

   )

}


这种专业的Gatling虚拟用户注入实现了从基础线性增长到复杂分层人口模型,保证了性能测试负载模式的控制和真实用户行为模拟。

文章标签: 测试工具 测评网站并发压力 并发压力测试 压力测试 软件测试
热门标签 换一换
数据库测试 H5应用测试 软件质检机构 第三方质检机构 第三方权威质检机构 信创测评机构 信息技术应用创新测评机构 信创测试 软件信创测试 软件系统第三方测试 软件系统测试 软件测试标准 工业软件测试 软件应用性能测试 应用性能测试 可用性测试 软件可用性测试 软件可靠性测试 可靠性测试 系统应用测试 软件系统应用测试 软件应用测试 软件负载测试 API自动化测试 软件结题测试 软件结题测试报告 软件登记测试 软件登记测试报告 软件测试中心 第三方软件测试中心 应用测试 第三方应用测试 软件测试需求 软件检测报告定制 软件测试外包公司 第三方软件检测报告厂家 CMA资质 软件产品登记测试 软件产品登记 软件登记 CNAS资质 cma检测范围 cma检测报告 软件评审 软件项目评审 软件项目测试报告书 软件项目验收 软件质量测试报告书 软件项目验收测试 软件验收测试 软件测试机构 软件检验 软件检验检测 WEB应用测试 API接口测试 接口性能测试 第三方系统测试 第三方网站系统测试 数据库系统检测 第三方数据库检测 第三方数据库系统检测 第三方软件评估 课题认证 第三方课题认证 小程序测试 app测试 区块链业务逻辑 智能合约代码安全 区块链 区块链智能合约 软件数据库测试 第三方数据库测试 第三方软件数据库测试 软件第三方测试 软件第三方测试方案 软件测试报告内容 网站测试报告 网站测试总结报告 信息系统测试报告 信息系统评估报告 信息系统测评 语言模型安全 语言模型测试 软件报告书 软件测评报告书 第三方软件测评报告 检测报告厂家 软件检测报告厂家 第三方网站检测 第三方网站测评 第三方网站测试 检测报告 软件检测流程 软件检测报告 第三方软件检测 第三方软件检测机构 第三方检测机构 软件产品确认测试 软件功能性测试 功能性测试 软件崩溃 稳定性测试 API测试 API安全测试 网站测试测评 敏感数据泄露测试 敏感数据泄露 敏感数据泄露测试防护 课题软件交付 科研经费申请 软件网站系统竞赛 竞赛CMA资质补办通道 中学生软件网站系统CMA资质 大学生软件网站系统CMA资质 科研软件课题cma检测报告 科研软件课题cma检测 国家级科研软件CMA检测 科研软件课题 国家级科研软件 web测评 网站测试 网站测评 第三方软件验收公司 第三方软件验收 软件测试选题 软件测试课题是什么 软件测试课题研究报告 软件科研项目测评报告 软件科研项目测评内容 软件科研项目测评 长沙第三方软件测评中心 长沙第三方软件测评公司 长沙第三方软件测评机构 软件科研结项强制清单 软件课题验收 软件申报课题 数据脱敏 数据脱敏传输规范 远程测试实操指南 远程测试 易用性专业测试 软件易用性 政府企业软件采购验收 OA系统CMA软件测评 ERP系统CMA软件测评 CMA检测报告的法律价值 代码原创性 软件著作登记 软件著作权登记 教育APP备案 教育APP 信息化软件项目测评 信息化软件项目 校园软件项目验收标准 智慧软件项目 智慧校园软件项目 CSRF漏洞自动化测试 漏洞自动化测试 CSRF漏洞 反序列化漏洞测试 反序列化漏洞原理 反序列化漏洞 命令执行 命令注入 漏洞检测 文件上传漏洞 身份验证 出具CMA测试报告 cma资质认证 软件验收流程 软件招标文件 软件开发招标 卓码软件测评 WEB安全测试 漏洞挖掘 身份验证漏洞 测评网站并发压力 测评门户网站 Web软件测评 XSS跨站脚本 XSS跨站 C/S软件测评 B/S软件测评 渗透测试 网站安全 网络安全 WEB安全 并发压力测试 常见系统验收单 CRM系统验收 ERP系统验收 OA系统验收 软件项目招投 软件项目 软件投标 软件招标 软件验收 App兼容性测试 CNAS软件检测 CNAS软件检测资质 软件检测 软件检测排名 软件检测机构排名 Web安全测试 Web安全 Web兼容性测试 兼容性测试 web测试 黑盒测试 白盒测试 负载测试 软件易用性测试 软件测试用例 软件性能测试 科技项目验收测试 首版次软件 软件鉴定测试 软件渗透测试 软件安全测试 第三方软件测试报告 软件第三方测试报告 第三方软件测评机构 湖南软件测评公司 软件测评中心 软件第三方测试机构 软件安全测试报告 第三方软件测试公司 第三方软件测试机构 CMA软件测试 CNAS软件测试 第三方软件测试 移动app测试 软件确认测试 软件测评 第三方软件测评 软件测试公司 软件测试报告 跨浏览器测试 软件更新 行业资讯 软件测评机构 大数据测试 测试环境 网站优化 功能测试 APP测试 软件兼容测试 安全测评 第三方测试 测试工具 软件测试 验收测试 系统测试 测试外包 压力测试 测试平台 bug管理 性能测试 测试报告 测试框架 CNAS认可 CMA认证 自动化测试
专业测试,找专业团队,请联系我们!
咨询软件测试 400-607-0568