成人无码视频,亚洲精品久久久久av无码,午夜精品久久久久久毛片,亚洲 中文字幕 日韩 无码

資訊專欄INFORMATION COLUMN

Dubbo壓測(cè)插件的實(shí)現(xiàn)——基于Gatling

BigTomato / 2702人閱讀

摘要:為了控制壓測(cè)時(shí)的,則需要實(shí)現(xiàn)邏輯。則是獲取屬性并初始化客戶端客戶端配置則提供了設(shè)置泛化調(diào)用入?yún)⒌囊约敖酉聛硪榻B的部分的全鏈路壓測(cè)中,我們都使用校驗(yàn)請(qǐng)求結(jié)果,壓測(cè)插件中,我們也實(shí)現(xiàn)了基于的校驗(yàn)。

Dubbo 壓測(cè)插件已開源,本文涉及代碼詳見gatling-dubbo

Gatling 是一個(gè)開源的基于 Scala、Akka、Netty 實(shí)現(xiàn)的高性能壓測(cè)框架,較之其他基于線程實(shí)現(xiàn)的壓測(cè)框架,Gatling 基于 AKKA Actor 模型實(shí)現(xiàn),請(qǐng)求由事件驅(qū)動(dòng),在系統(tǒng)資源消耗上低于其他壓測(cè)框架(如內(nèi)存、連接池等),使得單臺(tái)施壓機(jī)可以模擬更多的用戶。此外,Gatling 提供了一套簡(jiǎn)單高效的 DSL(領(lǐng)域特定語言)方便我們編排業(yè)務(wù)場(chǎng)景,同時(shí)也具備流量控制、壓力控制的能力并提供了良好的壓測(cè)報(bào)告,所以有贊選擇在 Gatling 基礎(chǔ)上擴(kuò)展分布式能力,開發(fā)了自己的全鏈路壓測(cè)引擎 MAXIM。全鏈路壓測(cè)中我們主要模擬用戶實(shí)際使用場(chǎng)景,使用 HTTP 接口作為壓測(cè)入口,但有贊目前后端服務(wù)中 Dubbo 應(yīng)用比重越來越高,如果可以知道 Dubbo 應(yīng)用單機(jī)水位將對(duì)我們把控系統(tǒng)后端服務(wù)能力大有裨益。基于 Gatling 的優(yōu)勢(shì)和在有贊的使用基礎(chǔ),我們擴(kuò)展 Gatling 開發(fā)了 gatling-dubbo 壓測(cè)插件。

插件主要結(jié)構(gòu)

實(shí)現(xiàn) Dubbo 壓測(cè)插件,需實(shí)現(xiàn)以下四部分內(nèi)容:

Protocol 和 ProtocolBuild

協(xié)議部分,這里主要定義 Dubbo 客戶端相關(guān)內(nèi)容,如協(xié)議、泛化調(diào)用、服務(wù) URL、注冊(cè)中心等內(nèi)容,ProtocolBuild 則為 DSL 使用 Protocol 的輔助類

Action 和 ActionBuild

執(zhí)行部分,這里的作用是發(fā)起 Dubbo 請(qǐng)求,校驗(yàn)請(qǐng)求結(jié)果并記錄日志以便后續(xù)生成壓測(cè)報(bào)告。ActionBuild 則為 DSL 使用 Action 的輔助類

Check 和 CheckBuild

檢查部分,全鏈路壓測(cè)中我們都使用Json Path檢查請(qǐng)求結(jié)果,這里我們實(shí)現(xiàn)了一樣的檢查邏輯。CheckBuild 則為 DSL 使用 Check 的輔助類

DSL

Dubbo 插件的領(lǐng)域特定語言,我們提供了一套簡(jiǎn)單易用的 API 方便編寫 Duboo 壓測(cè)腳本,風(fēng)格上與原生 HTTP DSL 保持一致

Protocol

協(xié)議部分由 5 個(gè)屬性組成,這些屬性將在 Action 初始化 Dubbo 客戶端時(shí)使用,分別是:

protocol

協(xié)議,設(shè)置為dubbo

generic

泛化調(diào)用設(shè)置,Dubbo 壓測(cè)插件使用泛化調(diào)用發(fā)起請(qǐng)求,所以這里設(shè)置為true,有贊優(yōu)化了泛化調(diào)用的性能,為了使用該特性,引入了一個(gè)新值result_no_change(去掉優(yōu)化前泛化調(diào)用的序列化開銷以提升性能)

url

Dubbo 服務(wù)的地址:dubbo://IP地址:端口

registryProtocol

Dubbo 注冊(cè)中心的協(xié)議,設(shè)置為ETCD3

registryAddress

Dubbo 注冊(cè)中心的地址

如果是測(cè)試 Dubbo 單機(jī)水位,則設(shè)置 url,注冊(cè)中心設(shè)置為空;如果是測(cè)試 Dubbo 集群水位,則設(shè)置注冊(cè)中心(目前支持 ETCD3),url 設(shè)置為空。由于目前注冊(cè)中心只支持 ETCD3,插件在 Dubbo 集群上使用缺乏靈活性,所以我們又實(shí)現(xiàn)了客戶端層面的負(fù)載均衡,如此便可拋開特定的注冊(cè)中心來測(cè)試 Dubbo 集群水位。該特性目前正在內(nèi)測(cè)中。

object DubboProtocol {
  val DubboProtocolKey = new ProtocolKey {
    type Protocol = DubboProtocol
    type Components = DubboComponents

    def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[DubboProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]

    def defaultProtocolValue(configuration: GatlingConfiguration): DubboProtocol = throw new IllegalStateException("Can"t provide a default value for DubboProtocol")

    def newComponents(system: ActorSystem, coreComponents: CoreComponents): DubboProtocol => DubboComponents = {
      dubboProtocol => DubboComponents(dubboProtocol)
    }
  }
}

case class DubboProtocol(
    protocol: String, //dubbo
    generic:  String, //泛化調(diào)用?
    url:      String, //use url or
    registryProtocol: String,  //use registry
    registryAddress:  String   //use registry
) extends Protocol {
  type Components = DubboComponents
}

為了方便 Action 中使用上面這些屬性,我們將其裝進(jìn)了 Gatling 的 ProtocolComponents:

case class DubboComponents(dubboProtocol: DubboProtocol) extends ProtocolComponents {
  def onStart: Option[Session => Session] = None
  def onExit: Option[Session => Unit] = None
}

以上就是關(guān)于 Protocol 的定義。為了能在 DSL 中配置上述 Protocol,我們定義了 DubboProtocolBuilder,包含了 5 個(gè)方法分別設(shè)置 Protocol 的 protocol、generic、url、registryProtocol、registryAddress 5 個(gè)屬性。

object DubboProtocolBuilderBase {
  def protocol(protocol: String) = DubboProtocolBuilderGenericStep(protocol)
}

case class DubboProtocolBuilderGenericStep(protocol: String) {
  def generic(generic: String) = DubboProtocolBuilderUrlStep(protocol, generic)
}

case class DubboProtocolBuilderUrlStep(protocol: String, generic: String) {
  def url(url: String) = DubboProtocolBuilderRegistryProtocolStep(protocol, generic, url)
}

case class DubboProtocolBuilderRegistryProtocolStep(protocol: String, generic: String, url: String) {
  def registryProtocol(registryProtocol: String) = DubboProtocolBuilderRegistryAddressStep(protocol, generic, url, registryProtocol)
}

case class DubboProtocolBuilderRegistryAddressStep(protocol: String, generic: String, url: String, registryProtocol: String) {
  def registryAddress(registryAddress: String) = DubboProtocolBuilder(protocol, generic, url, registryProtocol, registryAddress)
}

case class DubboProtocolBuilder(protocol: String, generic: String, url: String, registryProtocol: String, registryAddress: String) {
  def build = DubboProtocol(
    protocol = protocol,
    generic = generic,
    url = url,
    registryProtocol = registryProtocol,
    registryAddress = registryAddress
  )
}
Action

DubboAction 包含了 Duboo 請(qǐng)求邏輯、請(qǐng)求結(jié)果校驗(yàn)邏輯以及壓力控制邏輯,需要擴(kuò)展 ExitableAction 并實(shí)現(xiàn) execute 方法。

DubboAction 類的域 argTypes、argValues 分別是泛化調(diào)用請(qǐng)求參數(shù)類型和請(qǐng)求參數(shù)值,需為 Expression[] 類型,這樣當(dāng)使用數(shù)據(jù) Feeder 作為壓測(cè)腳本參數(shù)輸入時(shí),可以使用類似 ${args_types}、${args_values}這樣的表達(dá)式從數(shù)據(jù) Feeder 中解析對(duì)應(yīng)字段的值。

execute 方法必須以異步方式執(zhí)行 Dubbo 請(qǐng)求,這樣前一個(gè) Dubbo 請(qǐng)求執(zhí)行后但還未等響應(yīng)返回時(shí)虛擬用戶就可以通過 AKKA Message 立即發(fā)起下一個(gè)請(qǐng)求,如此一個(gè)虛擬用戶可以在很短的時(shí)間內(nèi)構(gòu)造大量請(qǐng)求。請(qǐng)求方式方面,相比于泛化調(diào)用,原生 API 調(diào)用需要客戶端載入 Dubbo 服務(wù)相應(yīng)的 API 包,但有時(shí)候卻拿不到,此外,當(dāng)被測(cè) Dubbo 應(yīng)用多了,客戶端需要載入多個(gè) API 包,所以出于使用上的便利性,Dubbo 壓測(cè)插件使用泛化調(diào)用發(fā)起請(qǐng)求。

異步請(qǐng)求響應(yīng)后會(huì)執(zhí)行 onComplete 方法,校驗(yàn)請(qǐng)求結(jié)果,并根據(jù)校驗(yàn)結(jié)果記錄請(qǐng)求成功或失敗日志,壓測(cè)報(bào)告就是使用這些日志統(tǒng)計(jì)計(jì)算的。

為了控制壓測(cè)時(shí)的 RPS,則需要實(shí)現(xiàn) throttle 邏輯。實(shí)踐中發(fā)現(xiàn),高并發(fā)情況下,泛化調(diào)用性能遠(yuǎn)不如原生 API 調(diào)用性能,且響應(yīng)時(shí)間成倍增長(zhǎng)(如此不能表征 Dubbo 應(yīng)用的真正性能),導(dǎo)致 Dubbo 壓測(cè)插件壓力控制不準(zhǔn),解決辦法是優(yōu)化泛化調(diào)用性能,使之與原生 API 調(diào)用的性能相近,請(qǐng)參考dubbo 泛化調(diào)用性能優(yōu)化

class DubboAction(
    interface:        String,
    method:           String,
    argTypes:         Expression[Array[String]],
    argValues:        Expression[Array[Object]],
    genericService:   GenericService,
    checks:           List[DubboCheck],
    coreComponents:   CoreComponents,
    throttled:        Boolean,
    val objectMapper: ObjectMapper,
    val next:         Action
) extends ExitableAction with NameGen {

  override def statsEngine: StatsEngine = coreComponents.statsEngine

  override def name: String = genName("dubboRequest")

  override def execute(session: Session): Unit = recover(session) {
    argTypes(session) flatMap { argTypesArray =>
      argValues(session) map { argValuesArray =>
        val startTime = System.currentTimeMillis()
        val f = Future {
          try {
            genericService.$invoke(method, argTypes(session).get, argValues(session).get)
          } finally {
          }
        }

        f.onComplete {
          case Success(result) =>
            val endTime = System.currentTimeMillis()
            val resultMap = result.asInstanceOf[JMap[String, Any]]
            val resultJson = objectMapper.writeValueAsString(resultMap)
            val (newSession, error) = Check.check(resultJson, session, checks)
            error match {
              case None =>
                statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("OK"), None, None)
                throttle(newSession(session))
              case Some(Failure(errorMessage)) =>
                statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(errorMessage))
                throttle(newSession(session).markAsFailed)
            }
          case FuFailure(e) =>
            val endTime = System.currentTimeMillis()
            statsEngine.logResponse(session, interface + "." + method, ResponseTimings(startTime, endTime), Status("KO"), None, Some(e.getMessage))
            throttle(session.markAsFailed)
        }
      }
    }
  }

  private def throttle(s: Session): Unit = {
    if (throttled) {
      coreComponents.throttler.throttle(s.scenario, () => next ! s)
    } else {
      next ! s
    }
  }
}

DubboActionBuilder 則是獲取 Protocol 屬性并初始化 Dubbo 客戶端:

case class DubboActionBuilder(interface: String, method: String, argTypes: Expression[Array[String]], argValues: Expression[Array[Object]], checks: List[DubboCheck]) extends ActionBuilder {
  private def components(protocolComponentsRegistry: ProtocolComponentsRegistry): DubboComponents =
    protocolComponentsRegistry.components(DubboProtocol.DubboProtocolKey)

  override def build(ctx: ScenarioContext, next: Action): Action = {
    import ctx._
    val protocol = components(protocolComponentsRegistry).dubboProtocol
    //Dubbo客戶端配置
    val reference = new ReferenceConfig[GenericService]
    val application = new ApplicationConfig
    application.setName("gatling-dubbo")
    reference.setApplication(application)
    reference.setProtocol(protocol.protocol)
    reference.setGeneric(protocol.generic)
    if (protocol.url == "") {
      val registry = new RegistryConfig
      registry.setProtocol(protocol.registryProtocol)
      registry.setAddress(protocol.registryAddress)
      reference.setRegistry(registry)
    } else {
      reference.setUrl(protocol.url)
    }
    reference.setInterface(interface)
    val cache = ReferenceConfigCache.getCache
    val genericService = cache.get(reference)
    val objectMapper: ObjectMapper = new ObjectMapper()
    new DubboAction(interface, method, argTypes, argValues, genericService, checks, coreComponents, throttled, objectMapper, next)
  }
}

LambdaProcessBuilder 則提供了設(shè)置 Dubbo 泛化調(diào)用入?yún)⒌?DSL 以及接下來要介紹的 Check 部分的 DSL

case class DubboProcessBuilder(interface: String, method: String, argTypes: Expression[Array[String]] = _ => Success(Array.empty[String]), argValues: Expression[Array[Object]] = _ => Success(Array.empty[Object]), checks: List[DubboCheck] = Nil) extends DubboCheckSupport {

  def argTypes(argTypes: Expression[Array[String]]): DubboProcessBuilder = copy(argTypes = argTypes)

  def argValues(argValues: Expression[Array[Object]]): DubboProcessBuilder = copy(argValues = argValues)

  def check(dubboChecks: DubboCheck*): DubboProcessBuilder = copy(checks = checks ::: dubboChecks.toList)

  def build(): ActionBuilder = DubboActionBuilder(interface, method, argTypes, argValues, checks)
}
Check

全鏈路壓測(cè)中,我們都使用Json Path校驗(yàn) HTTP 請(qǐng)求結(jié)果,Dubbo 壓測(cè)插件中,我們也實(shí)現(xiàn)了基于Json Path的校驗(yàn)。實(shí)現(xiàn) Check,必須實(shí)現(xiàn) Gatling check 中的 Extender 和 Preparer:

package object dubbo {
  type DubboCheck = Check[String]

  val DubboStringExtender: Extender[DubboCheck, String] =
    (check: DubboCheck) => check

  val DubboStringPreparer: Preparer[String, String] =
    (result: String) => Success(result)
}

基于Json Path的校驗(yàn)邏輯:

trait DubboJsonPathOfType {
  self: DubboJsonPathCheckBuilder[String] =>

  def ofType[X: JsonFilter](implicit extractorFactory: JsonPathExtractorFactory) = new DubboJsonPathCheckBuilder[X](path, jsonParsers)
}

object DubboJsonPathCheckBuilder {
  val CharsParsingThreshold = 200 * 1000

  def preparer(jsonParsers: JsonParsers): Preparer[String, Any] =
    response => {
      if (response.length() > CharsParsingThreshold || jsonParsers.preferJackson)
        jsonParsers.safeParseJackson(response)
      else
        jsonParsers.safeParseBoon(response)
    }

  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    new DubboJsonPathCheckBuilder[String](path, jsonParsers) with DubboJsonPathOfType
}

class DubboJsonPathCheckBuilder[X: JsonFilter](
    private[check] val path:        Expression[String],
    private[check] val jsonParsers: JsonParsers
)(implicit extractorFactory: JsonPathExtractorFactory)
  extends DefaultMultipleFindCheckBuilder[DubboCheck, String, Any, X](
    DubboStringExtender,
    DubboJsonPathCheckBuilder.preparer(jsonParsers)
  ) {
  import extractorFactory._

  def findExtractor(occurrence: Int) = path.map(newSingleExtractor[X](_, occurrence))
  def findAllExtractor = path.map(newMultipleExtractor[X])
  def countExtractor = path.map(newCountExtractor)
}

DubboCheckSupport 則提供了設(shè)置 jsonPath 表達(dá)式的 DSL

trait DubboCheckSupport {
  def jsonPath(path: Expression[String])(implicit extractorFactory: JsonPathExtractorFactory, jsonParsers: JsonParsers) =
    DubboJsonPathCheckBuilder.jsonPath(path)
}

Dubbo 壓測(cè)腳本中可以設(shè)置一個(gè)或多個(gè) check 校驗(yàn)請(qǐng)求結(jié)果,使用 DSL check 方法*

DSL

trait AwsDsl提供頂層 DSL。我們還定義了 dubboProtocolBuilder2DubboProtocol、dubboProcessBuilder2ActionBuilder 兩個(gè) Scala 隱式方法,以自動(dòng)構(gòu)造 DubboProtocol 和 ActionBuilder。
此外,泛化調(diào)用中使用的參數(shù)類型為 Java 類型,而我們的壓測(cè)腳本使用 Scala 編寫,所以這里需要做兩種語言間的類型轉(zhuǎn)換,所以我們定義了 transformJsonDubboData 方法

trait DubboDsl extends DubboCheckSupport {
  val Dubbo = DubboProtocolBuilderBase

  def dubbo(interface: String, method: String) = DubboProcessBuilder(interface, method)

  implicit def dubboProtocolBuilder2DubboProtocol(builder: DubboProtocolBuilder): DubboProtocol = builder.build

  implicit def dubboProcessBuilder2ActionBuilder(builder: DubboProcessBuilder): ActionBuilder = builder.build()
  
  def transformJsonDubboData(argTypeName: String, argValueName: String, session: Session): Session = {
    session.set(argTypeName, toArray(session(argTypeName).as[JList[String]]))
      .set(argValueName, toArray(session(argValueName).as[JList[Any]]))
  }

  private def toArray[T:ClassTag](value: JList[T]): Array[T] = {
    value.asScala.toArray
  }
}
object Predef extends DubboDsl
Dubbo 壓測(cè)腳本和數(shù)據(jù) Feeder 示例

壓測(cè)腳本示例:

import io.gatling.core.Predef._
import io.gatling.dubbo.Predef._

import scala.concurrent.duration._

class DubboTest extends Simulation {
  val dubboConfig = Dubbo
    .protocol("dubbo")
    .generic("true")
    //直連某臺(tái)Dubbo機(jī)器,只多帶帶壓測(cè)一臺(tái)機(jī)器的水位
    .url("dubbo://IP地址:端口")
    //或設(shè)置注冊(cè)中心,壓測(cè)該Dubbo應(yīng)用集群的水位,支持ETCD3注冊(cè)中心
    .registryProtocol("")
    .registryAddress("")

  val jsonFileFeeder = jsonFile("data.json").circular  //數(shù)據(jù)Feeder
  val dubboScenario = scenario("load test dubbo")
    .forever("repeated") {
      feed(jsonFileFeeder)
        .exec(session => transformJsonDubboData("args_types1", "args_values1", session))
        .exec(dubbo("com.xxx.xxxService", "methodName")
          .argTypes("${args_types1}")
          .argValues("${args_values1}")
          .check(jsonPath("$.code").is("200"))
        )
    }

  setUp(
    dubboScenario.inject(atOnceUsers(10))
      .throttle(
        reachRps(10) in (1 seconds),
        holdFor(30 seconds))
  ).protocols(dubboConfig)
}

data.json 示例:

[
  {
  "args_types1": ["com.xxx.xxxDTO"],
  "args_values1": [{
    "field1": "111",
    "field2": "222",
    "field3": "333"
  }]
  }
]
Dubbo 壓測(cè)報(bào)告示例


我的系列博客
混沌工程 - 軟件系統(tǒng)高可用、彈性化的必由之路
異步系統(tǒng)的兩種測(cè)試方法

我的其他測(cè)試相關(guān)開源項(xiàng)目
捉蟲記:方便產(chǎn)品、開發(fā)、測(cè)試三方協(xié)同自測(cè)的管理工具

招聘
有贊測(cè)試組在持續(xù)招人中,大量崗位空缺,只要你來,就能幫你點(diǎn)亮全棧開發(fā)技能樹,有意向換工作的同學(xué)可以發(fā)簡(jiǎn)歷到 sunjun【@】youzan.com

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://m.hztianpu.com/yun/11449.html

Failed to recv the data from server completely (SIZE:0/8, REASON:closed)