Spark metrics源码走读

作者 QIFAN 日期 2016-06-10
Spark metrics源码走读

Metric是Apache Spark里用于监测参数的一个package,路径是spark/core/src/main/scala/org/apache/spark/metrics/

大多数的公司都有自己的一套metrics监测系统,Spark,IBM都有,这些都是高度定制化的,别人没法用。像Spark这个其实也是引入了一个外部的包,出自codahale。这个包提供了一系列的“量具”来提供metrics监测的功能,简单来说,我们只需要找出要监测的变量或方法,然后注册到系统中,它就可以提供不同的度量方式如值(Gauge),计数器(Counters),直方图(Histograms),计时器(Timers)和Health check,并可以用不同的形式输出,像JMX,csv,console,graphite等。总之是个好工具,官网 Github

下面来说Spark:

source

source其实就是数据来源,就是这些metrics是从哪里采集来的。在source文件夹下可以看见Source.scala,这是一个trait,有点类似于java中的接口

private[spark] trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}

它包括了source name 和 MetricRegistry,MetricRegistry是codahale的metrics包里来的,理解成用来“装”metrics的
虽然source下面只有JVMsource一个具体的实现类,实际上spark还有各种各样的source实现类来提供metrics。
kivi

sink

sink比较简单,就是metrics的输出形式,spark提供了ConsoleSink, CsvSink, GraphiteSink, JmxSink, Slf4jSink这几种方式来进行metrics的输出。

举ConsoleSink来讲,这是将metrics直接输出到执行页面的形式,用户可以在config里自己设置轮询时间,其中很重要的变量就是:

val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()

这也是codahale包里的类,就是将ConsoleSink里放着的metrics都注册到ConsoleReporter里。
其他Sink也都差不多,就不一一讲了。

MetricsSystem

这里就像是metrics监测的一个总控制台,它有这么几个成员变量:

private[this] val metricsConfig = new MetricsConfig(conf)
private val sinks = new mutable.ArrayBuffer[Sink]
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()

metricConfig就是读取了用户配置文件里的一些参数
sinks就是要输出的sink集合,sources自然就是source集合

来看start()方法:

def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}

先做一个判断,然后注册所有source,注册所有sink,最后将sink一一启动。

registerSources()

private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Utils.classForName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantiated", e)
}
}
}

这里用了一个反射,通过source的类名启动了一个source对象,再将该对象source注册

registerSource()

def registerSource(source: Source) {
sources += source
try {
val regName = buildRegistryName(source)
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}

这个方法是先给source起了个名,然后把这个source注册进MetricsSystem的MetricRetrigy对象里。

再看下一步registerSinks()

private def registerSinks() {
    val instConfig = metricsConfig.getInstance(instance)
    val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
    sinkConfigs.foreach { kv =>
        val classPath = kv._2.getProperty("class")
          if (null != classPath) {
              try {
                  val sink = Utils.classForName(classPath)
                  .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
                  .newInstance(kv._2, registry, securityMgr)
                  if (kv._1 == "servlet") {
                      metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
                  } else {
                    sinks += sink.asInstanceOf[Sink]
                  }            
              } catch {
                  case e: Exception =>
                  logError("Sink class " + classPath + " cannot be instantiated")
                  throw e
              }
          }
    }
}

这边也是通过反射新建了一个sink的对象,然后把这对象放到MetricsSystem的sinks集合里。