Contents
- 1. Run command
- 2. Task submission flow diagram
- 3. Launch script
- 4. The entry class org.apache.spark.deploy.SparkSubmit
- 5. The org.apache.spark.deploy.yarn.YarnClusterApplication class
- 6. The org.apache.spark.deploy.yarn.ApplicationMaster class
1. Run command
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
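For reference, here is the same submission with a few commonly used resource flags spelled out. The flag values and the trailing argument (the number of partitions SparkPi samples over) are illustrative and not part of the original command:
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --num-executors 2 \
  --executor-memory 1g \
  --executor-cores 1 \
  examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar \
  100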
2. Task submission flow diagram
(The flow diagram from the original post is not reproduced here; the step numbers referenced later in the text refer to it.)
3. Launch script
Looking at the spark-submit script, the entry point is:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
Reading "${SPARK_HOME}"/bin/spark-class shows that it runs a java -cp <classpath> <main-class> command, which starts a JVM process named SparkSubmit whose main function lives in the class org.apache.spark.deploy.SparkSubmit.
The concrete command actually executed is:
/etc/alternatives/jre/bin/java -Dhdp.version=3.0.1.0-187 -cp /usr/hdp/3.0.1.0-187/spark2/conf/:/usr/hdp/3.0.1.0-187/spark2/jars/*:/usr/hdp/3.0.1.0-187/hadoop/conf/ -Xmx1g org.apache.spark.deploy.SparkSubmit --master yarn --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
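To see this command on your own cluster, the stock spark-class script prints the final launch command when the SPARK_PRINT_LAUNCH_COMMAND environment variable is set (paths and versions will differ per installation):
SPARK_PRINT_LAUNCH_COMMAND=1 bin/spark-submit --master yarn --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar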
4. The entry class org.apache.spark.deploy.SparkSubmit
This class has a companion object whose main function creates a SparkSubmit instance and calls doSubmit():
override def main(args: Array[String]): Unit = {
  val submit = new SparkSubmit() {...}
  submit.doSubmit(args)
}
doSubmit parses the arguments, wraps them in an appArgs: SparkSubmitArguments object, and then calls submit(appArgs, uninitLog):
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
submit(appArgs, uninitLog) then calls runMain(args: SparkSubmitArguments, uninitLog: Boolean):
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ...
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {...}

  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ...
  try {
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable =>
      throw findCause(t)
  }
}
The mainClass here is what matters. Spark first checks whether mainClass is a subtype of SparkApplication; if so, it instantiates it through its no-argument constructor via reflection.
If not, it wraps the class in a JavaMainApplication (itself a SparkApplication implementation), whose override def start(args: Array[String], conf: SparkConf) uses reflection to invoke the main function of mainClass.
Once the SparkApplication has been created, its start(childArgs.toArray, sparkConf) method is called.
/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
If --deploy-mode is client, mainClass is determined by the --class command-line argument, i.e. org.apache.spark.examples.SparkPi, and the client code runs inside the current JVM; the other combinations are more involved.
With the run command given above (yarn-cluster mode), mainClass is the Class object of org.apache.spark.deploy.yarn.YarnClusterApplication:
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"
...
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
5. The org.apache.spark.deploy.yarn.YarnClusterApplication class
This class lives in the spark-yarn module:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
Its override def start(args: Array[String], conf: SparkConf) method is now executed:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove(JARS)
    conf.remove(FILES)

    new Client(new ClientArguments(args), conf, null).run()
  }

}
Inside the SparkSubmit process a Client is created. It is essentially a proxy class that wraps a YarnClient, and its run() method submits the application to the YARN cluster's ResourceManager, getting back an appId on success.
If spark.submit.deployMode=cluster and spark.yarn.submit.waitAppCompletion=true, the SparkSubmit process keeps logging the appId's status periodically until the application finishes (monitorApplication(appId)); otherwise it logs the application report once and exits.
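As an aside, the fire-and-forget branch can be exercised from the command line by turning off the wait, e.g. (illustrative, reusing the example jar from above):
bin/spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples_2.11-2.3.1.3.0.1.0-187.jar
With that noted, here is run():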
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
Following submitApplication():
def submitApplication(): ApplicationId = {
  ResourceRequestHelper.validateResources(sparkConf)

  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    // The app staging dir based on the STAGING_DIR configuration if configured
    // otherwise based on the users home directory.
    val appStagingBaseDir = sparkConf.get(STAGING_DIR)
      .map { new Path(_, UserGroupInformation.getCurrentUser.getShortUserName) }
      .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (stagingDirPath != null) {
        cleanupStagingDir()
      }
      throw e
  }
}
This method does the following (corresponding to steps 1, 2, 3 in the submission flow diagram):
1. Ask the ResourceManager to create an application and obtain the globally unique appId.
2. Build the application's staging directory stagingDirPath from the configured staging base directory plus the appId.
3. verifyClusterResources checks whether the cluster has enough resources for the AM and throws an exception if it does not.
4. createContainerLaunchContext builds the container launch context, which wraps the launch command of the container process.
5. Submit the appContext.
A stripped-down sketch of the raw YarnClient calls that this method wraps is shown right after this list.
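The sketch below is illustrative code, not Spark source: it only shows the bare Hadoop YarnClient API that Client builds on. The application name is a placeholder, and the AM container spec (what createContainerLaunchContext produces) is only hinted at in a comment:
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnConf = new YarnConfiguration()
val yarnClient = YarnClient.createYarnClient()  // same client type Spark's Client holds
yarnClient.init(yarnConf)
yarnClient.start()

// Item 1 above: ask the RM for a new application and read back its globally unique id
val newApp = yarnClient.createApplication()
val appId = newApp.getNewApplicationResponse.getApplicationId

// Items 4-5 above: fill in the submission context and submit it
val appContext = newApp.getApplicationSubmissionContext
appContext.setApplicationName("demo-app")                // placeholder name
// appContext.setAMContainerSpec(containerLaunchContext) // the AM launch command would go here
yarnClient.submitApplication(appContext)
println(s"Submitted $appId")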
Now look at createContainerLaunchContext(newAppResponse):
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
...
// Command for the ApplicationMaster
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

// TODO: it would be nicer to just make sure there are no null commands here
val printableCommands = commands.map(s => if (s == null) "null" else s).toList
amContainer.setCommands(printableCommands.asJava)
The container launch command is roughly:
bin/java -server org.apache.spark.deploy.yarn.ApplicationMaster --class ...
6. The org.apache.spark.deploy.yarn.ApplicationMaster class
A NodeManager in the YARN cluster receives the command from the ResourceManager and starts the ApplicationMaster process, which corresponds to step 4 in the submission flow diagram.
Look at the main method in the ApplicationMaster companion object:
def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  val sparkConf = new SparkConf()
  if (amArgs.propertiesFile != null) {
    Utils.getPropertiesFromFile(amArgs.propertiesFile).foreach { case (k, v) =>
      sparkConf.set(k, v)
    }
  }
  // Set system properties for each config entry. This covers two use cases:
  // - The default configuration stored by the SparkHadoopUtil class
  // - The user application creating a new SparkConf in cluster mode
  //
  // Both cases create a new SparkConf object which reads these configs from system properties.
  sparkConf.getAll.foreach { case (k, v) =>
    sys.props(k) = v
  }

  val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))
  master = new ApplicationMaster(amArgs, sparkConf, yarnConf)

  val ugi = sparkConf.get(PRINCIPAL) match {
    // We only need to log in with the keytab in cluster mode. In client mode, the driver
    // handles the user keytab.
    case Some(principal) if master.isClusterMode =>
      val originalCreds = UserGroupInformation.getCurrentUser().getCredentials()
      SparkHadoopUtil.get.loginUserFromKeytab(principal, sparkConf.get(KEYTAB).orNull)
      val newUGI = UserGroupInformation.getCurrentUser()

      if (master.appAttemptId == null || master.appAttemptId.getAttemptId > 1) {
        // Re-obtain delegation tokens if this is not a first attempt, as they might be outdated
        // as of now. Add the fresh tokens on top of the original user's credentials (overwrite).
        // Set the context class loader so that the token manager has access to jars
        // distributed by the user.
        Utils.withContextClassLoader(master.userClassLoader) {
          val credentialManager = new HadoopDelegationTokenManager(sparkConf, yarnConf, null)
          credentialManager.obtainDelegationTokens(originalCreds)
        }
      }

      // Transfer the original user's tokens to the new user, since it may contain needed tokens
      // (such as those user to connect to YARN).
      newUGI.addCredentials(originalCreds)
      newUGI
    case _ =>
      SparkHadoopUtil.get.createSparkUser()
  }

  ugi.doAs(new PrivilegedExceptionAction[Unit]() {
    override def run(): Unit = System.exit(master.run())
  })
}
An ApplicationMaster object is created and its run() method is executed:
final def run(): Int = {
  try {
    val attemptID = if (isClusterMode) {
      // Set the web ui port to be ephemeral for yarn so we don't conflict with
      // other spark processes running on the same box
      System.setProperty(UI_PORT.key, "0")

      // Set the master and deploy mode property to match the requested mode.
      System.setProperty("spark.master", "yarn")
      System.setProperty(SUBMIT_DEPLOY_MODE.key, "cluster")

      // Set this internal configuration if it is running on cluster mode, this
      // configuration will be checked in SparkContext to avoid misuse of yarn cluster mode.
      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

      Option(appAttemptId.getAttemptId.toString)
    } else {
      None
    }

    new CallerContext(
      "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()

    logInfo("ApplicationAttemptId: " + appAttemptId)

    // This shutdown hook should run *after* the SparkContext is shut down.
    val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
    ShutdownHookManager.addShutdownHook(priority) { () =>
      val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
      val isLastAttempt = appAttemptId.getAttemptId() >= maxAppAttempts

      if (!finished) {
        // The default state of ApplicationMaster is failed if it is invoked by shut down hook.
        // This behavior is different compared to 1.x version.
        // If user application is exited ahead of time by calling System.exit(N), here mark
        // this application as failed with EXIT_EARLY. For a good shutdown, user shouldn't call
        // System.exit(0) to terminate the application.
        finish(finalStatus,
          ApplicationMaster.EXIT_EARLY,
          "Shutdown hook called before final status was reported.")
      }

      if (!unregistered) {
        // we only want to unregister if we don't want the RM to retry
        if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
          unregister(finalStatus, finalMsg)
          cleanupStagingDir(new Path(System.getenv("SPARK_YARN_STAGING_DIR")))
        }
      }
    }

    if (isClusterMode) {
      runDriver()
    } else {
      runExecutorLauncher()
    }
  } catch {
    case e: Exception =>
      // catch everything else if not specifically handled
      logError("Uncaught exception: ", e)
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_UNCAUGHT_EXCEPTION,
        "Uncaught exception: " + StringUtils.stringifyException(e))
  } finally {
    try {
      metricsSystem.foreach { ms =>
        ms.report()
        ms.stop()
      }
    } catch {
      case e: Exception =>
        logWarning("Exception during stopping of the metric system: ", e)
    }
  }

  exitCode
}
runDriver() is executed next.
userClassThread = startUserApplication() starts a thread named Driver; inside it, reflection is used to invoke the main function of the class given by --class on the command line (org.apache.spark.examples.SparkPi), which initializes the SparkContext. Once the SparkContext is ready, the main thread is woken up and registers the ApplicationMaster with the ResourceManager (step 5 in the flow diagram); a simplified sketch follows this paragraph.
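To make the "driver is just a thread inside the AM" point concrete, here is a stand-alone simplified sketch. It is not Spark code; the class name comes from the example command, and the "10" argument (SparkPi's slice count) is purely illustrative:
// Load the user class and look up its static main(String[]) via reflection
val mainMethod = Class.forName("org.apache.spark.examples.SparkPi")
  .getMethod("main", classOf[Array[String]])

// Run it on a thread named "Driver" inside the current JVM
val driverThread = new Thread(new Runnable {
  override def run(): Unit = {
    mainMethod.invoke(null, Array[String]("10"))  // pass the String[] args to main
  }
}, "Driver")
driverThread.start()
driverThread.join()
The real implementation lives in runDriver and startUserApplication: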
private def runDriver(): Unit = {
  addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      val rpcEnv = sc.env.rpcEnv

      val userConf = sc.getConf
      val host = userConf.get(DRIVER_HOST_ADDRESS)
      val port = userConf.get(DRIVER_PORT)
      registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
         "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
private def startUserApplication(): Thread = {
  logInfo("Starting the user application in a separate Thread")

  var userArgs = args.userArgs
  if (args.primaryPyFile != null && args.primaryPyFile.endsWith(".py")) {
    // When running pyspark, the app is run using PythonRunner. The second argument is the list
    // of files to add to PYTHONPATH, which Client.scala already handles, so it's empty.
    userArgs = Seq(args.primaryPyFile, "") ++ userArgs
  }
  if (args.primaryRFile != null &&
      (args.primaryRFile.endsWith(".R") || args.primaryRFile.endsWith(".r"))) {
    // TODO(davies): add R dependencies here
  }

  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]])

  val userThread = new Thread {
    override def run(): Unit = {
      try {
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          logError(s"Could not find static main method in object ${args.userClass}")
          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
        } else {
          mainMethod.invoke(null, userArgs.toArray)
          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
          logDebug("Done running user class")
        }
      } catch {
        case e: InvocationTargetException =>
          e.getCause match {
            case _: InterruptedException =>
              // Reporter thread can interrupt to stop user class
            case SparkUserAppException(exitCode) =>
              val msg = s"User application exited with status $exitCode"
              logError(msg)
              finish(FinalApplicationStatus.FAILED, exitCode, msg)
            case cause: Throwable =>
              logError("User class threw exception: " + cause, cause)
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_EXCEPTION_USER_CLASS,
                "User class threw exception: " + StringUtils.stringifyException(cause))
          }
          sparkContextPromise.tryFailure(e.getCause())
      } finally {
        // Notify the thread waiting for the SparkContext, in case the application did not
        // instantiate one. This will do nothing when the user code instantiates a SparkContext
        // (with the correct master), or when the user code throws an exception (due to the
        // tryFailure above).
        sparkContextPromise.trySuccess(null)
      }
    }
  }
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start()
  userThread
}
After registration, the main thread handles the resources returned by YARN in createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf):
private def createAllocator(
    driverRef: RpcEndpointRef,
    _sparkConf: SparkConf,
    rpcEnv: RpcEnv,
    appAttemptId: ApplicationAttemptId,
    distCacheConf: SparkConf): Unit = {
  // In client mode, the AM may be restarting after delegation tokens have reached their TTL. So
  // always contact the driver to get the current set of valid tokens, so that local resources can
  // be initialized below.
  if (!isClusterMode) {
    val tokens = driverRef.askSync[Array[Byte]](RetrieveDelegationTokens)
    if (tokens != null) {
      SparkHadoopUtil.get.addDelegationTokens(tokens, _sparkConf)
    }
  }

  val appId = appAttemptId.getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
  val localResources = prepareLocalResources(distCacheConf)

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources,
      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
    dummyRunner.launchContextDebugInfo()
  }

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    appAttemptId,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
    sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  // do not register static sources in this case as per SPARK-25277
  ms.start(false)
  metricsSystem = Some(ms)
  reporterThread = launchReporterThread()
}
The key call is allocator.allocateResources(), which processes the allocated resources:
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))

    handleAllocatedContainers(allocatedContainers.asScala)
  }

  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}
If the number of allocated containers is greater than 0, handleAllocatedContainers(allocatedContainers.asScala) is called:
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
  val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)

  // Match incoming requests by host
  val remainingAfterHostMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- allocatedContainers) {
    matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
      containersToUse, remainingAfterHostMatches)
  }

  // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
  // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
  // a separate thread to perform the operation.
  val remainingAfterRackMatches = new ArrayBuffer[Container]
  if (remainingAfterHostMatches.nonEmpty) {
    var exception: Option[Throwable] = None
    val thread = new Thread("spark-rack-resolver") {
      override def run(): Unit = {
        try {
          for (allocatedContainer <- remainingAfterHostMatches) {
            val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
            matchContainerToRequest(allocatedContainer, rack, containersToUse,
              remainingAfterRackMatches)
          }
        } catch {
          case e: Throwable =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(true)
    thread.start()

    try {
      thread.join()
    } catch {
      case e: InterruptedException =>
        thread.interrupt()
        throw e
    }

    if (exception.isDefined) {
      throw exception.get
    }
  }

  // Assign remaining that are neither node-local nor rack-local
  val remainingAfterOffRackMatches = new ArrayBuffer[Container]
  for (allocatedContainer <- remainingAfterRackMatches) {
    matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
      remainingAfterOffRackMatches)
  }

  if (remainingAfterOffRackMatches.nonEmpty) {
    logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
      s"allocated to us")
    for (container <- remainingAfterOffRackMatches) {
      internalReleaseContainer(container)
    }
  }

  runAllocatedContainers(containersToUse)

  logInfo("Received %d containers from YARN, launching executors on %d of them."
    .format(allocatedContainers.size, containersToUse.size))
}
Containers are matched to outstanding requests by host, then by rack, and finally without locality. Once matching is done, the selected containers are launched via runAllocatedContainers(containersToUse).
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
  "ContainerLauncher", sparkConf.get(CONTAINER_LAUNCH_MAX_THREADS))
This creates the launcherPool thread pool used to launch containers; a rough JDK-only equivalent is sketched below.
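What a "daemon cached thread pool" amounts to, in plain JDK terms, is roughly the following. This is an illustrative sketch, not Spark code; Spark's ThreadUtils helper additionally bounds the pool size using the max-threads config referenced above:
import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

// Cached pool whose worker threads are daemons, so they never keep the AM JVM alive on their own
val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "ContainerLauncher")
    t.setDaemon(true)
    t
  }
}
val pool: ExecutorService = Executors.newCachedThreadPool(daemonFactory)
runAllocatedContainers then hands each selected container to this pool: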
/**
 * Launches executors in the allocated containers.
 */
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  for (container <- containersToUse) {
    executorIdCounter += 1
    val executorHostname = container.getNodeId.getHost
    val containerId = container.getId
    val executorId = executorIdCounter.toString
    assert(container.getResource.getMemory >= resource.getMemory)
    logInfo(s"Launching container $containerId on host $executorHostname " +
      s"for executor with ID $executorId")

    def updateInternalState(): Unit = synchronized {
      runningExecutors.add(executorId)
      numExecutorsStarting.decrementAndGet()
      executorIdToContainer(executorId) = container
      containerIdToExecutorId(container.getId) = executorId

      val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
        new HashSet[ContainerId])
      containerSet += containerId
      allocatedContainerToHostMap.put(containerId, executorHostname)
    }

    if (runningExecutors.size() < targetNumExecutors) {
      numExecutorsStarting.incrementAndGet()
      if (launchContainers) {
        launcherPool.execute(() => {
          try {
            new ExecutorRunnable(
              Some(container),
              conf,
              sparkConf,
              driverUrl,
              executorId,
              executorHostname,
              executorMemory,
              executorCores,
              appAttemptId.getApplicationId.toString,
              securityMgr,
              localResources,
              ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID // use until fully supported
            ).run()
            updateInternalState()
          } catch {
            case e: Throwable =>
              numExecutorsStarting.decrementAndGet()
              if (NonFatal(e)) {
                logError(s"Failed to launch executor $executorId on container $containerId", e)
                // Assigned container should be released immediately
                // to avoid unnecessary resource occupation.
                amClient.releaseAssignedContainer(containerId)
              } else {
                throw e
              }
          }
        })
      } else {
        // For test only
        updateInternalState()
      }
    } else {
      logInfo(("Skip launching executorRunnable as running executors count: %d " +
        "reached target executors count: %d.").format(
        runningExecutors.size, targetNumExecutors))
    }
  }
}
Looking at the ExecutorRunnable class: nmClient = NMClient.createNMClient() is the NodeManager client responsible for talking to NodeManagers, and its prepareCommand() method assembles a process launch command of roughly this form:
bin/java -server org.apache.spark.executor.YarnCoarseGrainedExecutorBackend ...
The launcherPool in the ApplicationMaster process runs one ExecutorRunnable per container; each ExecutorRunnable's nmClient sends the assembled JVM launch command to the corresponding NodeManager, which starts the container process, named YarnCoarseGrainedExecutorBackend.
The full ExecutorRunnable code:
private[yarn] class ExecutorRunnable(
    container: Option[Container],
    conf: YarnConfiguration,
    sparkConf: SparkConf,
    masterAddress: String,
    executorId: String,
    hostname: String,
    executorMemory: Int,
    executorCores: Int,
    appId: String,
    securityMgr: SecurityManager,
    localResources: Map[String, LocalResource],
    resourceProfileId: Int) extends Logging {

  var rpc: YarnRPC = YarnRPC.create(conf)
  var nmClient: NMClient = _

  def run(): Unit = {
    logDebug("Starting Executor Container")
    nmClient = NMClient.createNMClient()
    nmClient.init(conf)
    nmClient.start()
    startContainer()
  }

  def launchContextDebugInfo(): String = {
    val commands = prepareCommand()
    val env = prepareEnvironment()

    s"""
    |===============================================================================
    |Default YARN executor launch context:
    |  env:
    |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |  command:
    |    ${Utils.redactCommandLineArgs(sparkConf, commands).mkString(" \\ \n      ")}
    |
    |  resources:
    |${localResources.map { case (k, v) => s"    $k -> $v\n" }.mkString}
    |===============================================================================""".stripMargin
  }

  def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava

    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)

    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))

    val commands = prepareCommand()

    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }

    // Send the start request to the ContainerManager
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

  private def prepareCommand(): List[String] = {
    // Extra options for the JVM
    val javaOpts = ListBuffer[String]()

    // Set the JVM memory
    val executorMemoryString = executorMemory + "m"
    javaOpts += "-Xmx" + executorMemoryString

    // Set extra Java options for the executor, if defined
    sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
      val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId)
      javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell)
    }

    // Set the library path through a command prefix to append to the existing value of the
    // env variable.
    val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath =>
      Client.createLibraryPathPrefix(libPath, sparkConf)
    }

    javaOpts += "-Djava.io.tmpdir=" +
      new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)

    // Certain configs need to be passed here because they are needed before the Executor
    // registers with the Scheduler and transfers the spark configs. Since the Executor backend
    // uses RPC to connect to the scheduler, the RPC settings are needed as well as the
    // authentication settings.
    sparkConf.getAll
      .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) }
      .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") }

    // Commenting it out for now - so that people can refer to the properties if required. Remove
    // it once cpuset version is pushed out.
    // The context is, default gc for server class machines end up using all cores to do gc - hence
    // if there are multiple containers in same node, spark gc effects all other containers
    // performance (which can also be other spark containers)

    // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in
    // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset
    // of cores on a node.
    /*
        else {
          // If no java_opts specified, default to using -XX:+CMSIncrementalMode
          // It might be possible that other modes/config is being done in
          // spark.executor.extraJavaOptions, so we don't want to mess with it.
          // In our expts, using (default) throughput collector has severe perf ramifications in
          // multi-tenant machines
          // The options are based on
          // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use
          // %20the%20Concurrent%20Low%20Pause%20Collector|outline
          javaOpts += "-XX:+UseConcMarkSweepGC"
          javaOpts += "-XX:+CMSIncrementalMode"
          javaOpts += "-XX:+CMSIncrementalPacing"
          javaOpts += "-XX:CMSIncrementalDutyCycleMin=0"
          javaOpts += "-XX:CMSIncrementalDutyCycle=10"
        }
    */

    // For log4j configuration to reference
    javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR)

    val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
      val absPath =
        if (new File(uri.getPath()).isAbsolute()) {
          Client.getClusterPath(sparkConf, uri.getPath())
        } else {
          Client.buildPath(Environment.PWD.$(), uri.getPath())
        }
      Seq("--user-class-path", "file:" + absPath)
    }.toSeq

    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId,
        "--resourceProfileId", resourceProfileId.toString) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    // TODO: it would be nicer to just make sure there are no null commands here
    commands.map(s => if (s == null) "null" else s).toList
  }

  private def prepareEnvironment(): HashMap[String, String] = {
    val env = new HashMap[String, String]()
    Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))

    System.getenv().asScala.filterKeys(_.startsWith("SPARK"))
      .foreach { case (k, v) => env(k) = v }

    sparkConf.getExecutorEnv.foreach { case (key, value) =>
      if (key == Environment.CLASSPATH.name()) {
        // If the key of env variable is CLASSPATH, we assume it is a path and append it.
        // This is kept for backward compatibility and consistency with hadoop
        YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
      } else {
        // For other env variables, simply overwrite the value.
        env(key) = value
      }
    }

    env
  }
}
That concludes this walkthrough of how a yarn-cluster mode job is submitted, as seen from the Spark source code.