From 690dca8371349123d5c0cc9a58fff0951eda80bb Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Mon, 22 Apr 2019 16:01:40 -0400
Subject: [PATCH 1/2] Add shuffle file server discovery

---
 .../spark/internal/config/package.scala       |   9 ++
 .../ShuffleServiceAddressProvider.scala       |  25 +--
 ...ShuffleServiceAddressProviderFactory.scala |  25 +++
 .../org/apache/spark/deploy/k8s/Config.scala  |  23 +++
 .../k8s/SparkKubernetesClientFactory.scala    |  33 +++-
 .../cluster/k8s/ExecutorPodsSnapshot.scala    |   8 +-
 .../k8s/KubernetesClusterManager.scala        |  29 +---
 .../cluster/k8s/SparkPodStates.scala          |  67 ++++++++
 ...ernetesShuffleServiceAddressProvider.scala | 147 ++++++++++++++++++
 ...ShuffleServiceAddressProviderFactory.scala |  53 +++++++
 .../src/main/dockerfiles/spark/Dockerfile     |   2 +-
 11 files changed, 371 insertions(+), 50 deletions(-)
 rename resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala => core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProvider.scala (60%)
 create mode 100644 core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProviderFactory.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/SparkPodStates.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProvider.scala
 create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProviderFactory.scala

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 6e996b4c1936f..6b9cee1ec3519 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -360,6 +360,9 @@ package object config {
   private[spark] val SHUFFLE_SERVICE_ENABLED =
     ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)
 
+  private[spark] val K8S_SHUFFLE_SERVICE_ENABLED =
+    ConfigBuilder("spark.k8s.shuffle.service.enabled").booleanConf.createWithDefault(false)
+
   private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
     ConfigBuilder("spark.shuffle.service.db.enabled")
       .doc("Whether to use db in ExternalShuffleService. Note that this only affects " +
@@ -773,6 +776,12 @@ package object config {
     .stringConf
     .createWithDefault(classOf[DefaultShuffleDataIO].getName)
 
+  private[spark] val SHUFFLE_SERVICE_PROVIDER_CLASS =
+    ConfigBuilder("spark.shuffle.provider.plugin.class")
+      .doc("Experimental. Factory class used to discover external shuffle service instances.")
+      .stringConf
+      .createOptional
+
   private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
     ConfigBuilder("spark.shuffle.file.buffer")
       .doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
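Review note: `spark.k8s.shuffle.service.enabled` and `spark.shuffle.provider.plugin.class` are the two switches that activate this feature; the first gates the Kubernetes-specific path, the second names the factory loaded via `Utils.loadExtensions` in PATCH 2/2. A minimal sketch of how an application might set them together (the factory class is the one added later in this patch):

```scala
import org.apache.spark.SparkConf

// Sketch: opting in to shuffle service discovery on Kubernetes.
// Both keys are introduced by this patch series.
val conf = new SparkConf()
  .set("spark.k8s.shuffle.service.enabled", "true")
  .set("spark.shuffle.provider.plugin.class",
    "org.apache.spark.shuffle.KubernetesShuffleServiceAddressProviderFactory")
```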
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProvider.scala
similarity index 60%
rename from resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala
rename to core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProvider.scala
index 83daddf714489..96d529872b306 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodStates.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProvider.scala
@@ -14,24 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.scheduler.cluster.k8s
 
-import io.fabric8.kubernetes.api.model.Pod
+package org.apache.spark.shuffle
 
-sealed trait ExecutorPodState {
-  def pod: Pod
+trait ShuffleServiceAddressProvider {
+  def start(): Unit = {}
+  def getShuffleServiceAddresses(): List[(String, Int)]
+  def stop(): Unit = {}
 }
 
-case class PodRunning(pod: Pod) extends ExecutorPodState
-
-case class PodPending(pod: Pod) extends ExecutorPodState
-
-sealed trait FinalPodState extends ExecutorPodState
-
-case class PodSucceeded(pod: Pod) extends FinalPodState
-
-case class PodFailed(pod: Pod) extends FinalPodState
-
-case class PodDeleted(pod: Pod) extends FinalPodState
-
-case class PodUnknown(pod: Pod) extends ExecutorPodState
+private[spark] object DefaultShuffleServiceAddressProvider extends ShuffleServiceAddressProvider {
+  override def getShuffleServiceAddresses(): List[(String, Int)] = List.empty[(String, Int)]
+}
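Review note: the trait deliberately gives `start()` and `stop()` no-op defaults, so simple implementations only need to supply the address list. As an illustration only (this class is not part of the patch), a provider backed by a static host list could look like:

```scala
import org.apache.spark.shuffle.ShuffleServiceAddressProvider

// Hypothetical implementation: serves a fixed set of (host, port) pairs.
// Useful for tests or deployments where the shuffle service topology is static.
class StaticShuffleServiceAddressProvider(addresses: Seq[(String, Int)])
  extends ShuffleServiceAddressProvider {
  override def getShuffleServiceAddresses(): List[(String, Int)] = addresses.toList
}
```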
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProviderFactory.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProviderFactory.scala
new file mode 100644
index 0000000000000..68adb8e44585c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleServiceAddressProviderFactory.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkConf
+
+trait ShuffleServiceAddressProviderFactory {
+  def canCreate(masterUrl: String): Boolean
+  def create(conf: SparkConf): ShuffleServiceAddressProvider
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 83b5a758f0f5e..483ea9619f064 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -325,6 +325,26 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(true)
 
+  val KUBERNETES_REMOTE_SHUFFLE_SERVICE_PODS_NAMESPACE =
+    ConfigBuilder("spark.kubernetes.shuffle.service.remote.pods.namespace")
+      .doc("Namespace of the pods that run the shuffle service instances to which" +
+        " shuffle data is remotely pushed.")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_REMOTE_SHUFFLE_SERVICE_PORT =
+    ConfigBuilder("spark.kubernetes.shuffle.service.remote.port")
+      .doc("Port on which the external shuffle service pods listen.")
+      .intConf
+      .createWithDefault(7337)
+
+  val KUBERNETES_REMOTE_SHUFFLE_SERVICE_CLEANUP_INTERVAL =
+    ConfigBuilder("spark.kubernetes.shuffle.service.cleanup.interval")
+      .doc("Interval at which the shuffle service cleans up state for terminated applications.")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("30s")
+
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."
@@ -349,4 +369,7 @@ private[spark] object Config extends Logging {
   val KUBERNETES_VOLUMES_OPTIONS_SIZE_LIMIT_KEY = "options.sizeLimit"
 
   val KUBERNETES_DRIVER_ENV_PREFIX = "spark.kubernetes.driverEnv."
+
+  val KUBERNETES_REMOTE_SHUFFLE_SERVICE_LABELS =
+    "spark.kubernetes.shuffle.service.remote.label."
 }
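Review note: `KUBERNETES_REMOTE_SHUFFLE_SERVICE_LABELS` is a prefix rather than a single key; every property under `spark.kubernetes.shuffle.service.remote.label.` becomes one label in the pod selector (the factory later in this patch reads them with `conf.getAllWithPrefix`). A hypothetical configuration selecting pods labeled `app=spark-shuffle-service` in a dedicated namespace:

```scala
import org.apache.spark.SparkConf

// Sketch: discovery settings for shuffle service pods. The namespace and
// label values here are illustrative, not defaults shipped by the patch.
val conf = new SparkConf()
  .set("spark.kubernetes.shuffle.service.remote.pods.namespace", "shuffle-services")
  .set("spark.kubernetes.shuffle.service.remote.port", "7337")
  .set("spark.kubernetes.shuffle.service.remote.label.app", "spark-shuffle-service")
```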
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 459259f77796c..abfbe3f1434a9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -21,12 +21,13 @@ import java.io.File
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
-import io.fabric8.kubernetes.client.Config.autoConfigure
+import io.fabric8.kubernetes.client.Config._
 import io.fabric8.kubernetes.client.utils.HttpClientUtils
 import okhttp3.Dispatcher
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.util.ThreadUtils
@@ -37,6 +38,36 @@ import org.apache.spark.util.ThreadUtils
  * options for different components.
  */
 private[spark] object SparkKubernetesClientFactory extends Logging {
+  def getDriverKubernetesClient(conf: SparkConf, masterURL: String): KubernetesClient = {
+    val wasSparkSubmittedInClusterMode = conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
+    val (authConfPrefix,
+      apiServerUri,
+      defaultServiceAccountToken,
+      defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) {
+      require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined,
+        "If the application is deployed using spark-submit in cluster mode, the driver pod name " +
+        "must be provided.")
+      (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+        KUBERNETES_MASTER_INTERNAL_URL,
+        Some(new File(KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
+        Some(new File(KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
+    } else {
+      (KUBERNETES_AUTH_CLIENT_MODE_PREFIX,
+        KubernetesUtils.parseMasterUrl(masterURL),
+        None,
+        None)
+    }
+
+    val kubernetesClient = createKubernetesClient(
+      apiServerUri,
+      Some(conf.get(KUBERNETES_NAMESPACE)),
+      authConfPrefix,
+      SparkKubernetesClientFactory.ClientType.Driver,
+      conf,
+      defaultServiceAccountToken,
+      defaultServiceAccountCaCrt)
+    kubernetesClient
+  }
 
   def createKubernetesClient(
       master: String,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index 435a5f1461c92..aa4ce28aeb6ba 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 /**
  * An immutable view of the current executor pods that are running in the cluster.
  */
-private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, ExecutorPodState]) {
+private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, SparkPodState]) {
 
   import ExecutorPodsSnapshot._
 
@@ -42,15 +42,15 @@ object ExecutorPodsSnapshot extends Logging {
     ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
   }
 
-  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, ExecutorPodState])
+  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, SparkPodState])
 
-  private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = {
+  private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, SparkPodState] = {
     executorPods.map { pod =>
       (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod))
     }.toMap
   }
 
-  private def toState(pod: Pod): ExecutorPodState = {
+  private def toState(pod: Pod): SparkPodState = {
     if (isDeleted(pod)) {
       PodDeleted(pod)
     } else {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 31ca06b721c5d..9d6ab4a52f4a3 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -42,33 +42,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       sc: SparkContext,
       masterURL: String,
       scheduler: TaskScheduler): SchedulerBackend = {
-    val wasSparkSubmittedInClusterMode = sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK)
-    val (authConfPrefix,
-      apiServerUri,
-      defaultServiceAccountToken,
-      defaultServiceAccountCaCrt) = if (wasSparkSubmittedInClusterMode) {
-      require(sc.conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined,
-        "If the application is deployed using spark-submit in cluster mode, the driver pod name " +
-        "must be provided.")
-      (KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
-        KUBERNETES_MASTER_INTERNAL_URL,
-        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)),
-        Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH)))
-    } else {
-      (KUBERNETES_AUTH_CLIENT_MODE_PREFIX,
-        KubernetesUtils.parseMasterUrl(masterURL),
-        None,
-        None)
-    }
-
-    val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient(
-      apiServerUri,
-      Some(sc.conf.get(KUBERNETES_NAMESPACE)),
-      authConfPrefix,
-      SparkKubernetesClientFactory.ClientType.Driver,
-      sc.conf,
-      defaultServiceAccountToken,
-      defaultServiceAccountCaCrt)
+    val kubernetesClient = SparkKubernetesClientFactory.getDriverKubernetesClient(
+      sc.conf, masterURL)
 
     if (sc.conf.get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE).isDefined) {
       KubernetesUtils.loadPodFromTemplate(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/SparkPodStates.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/SparkPodStates.scala
new file mode 100644
index 0000000000000..e5d9594fc3d5e
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/SparkPodStates.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.Locale
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.internal.Logging
+
+sealed trait SparkPodState {
+  def pod: Pod
+}
+
+case class PodRunning(pod: Pod) extends SparkPodState
+
+case class PodPending(pod: Pod) extends SparkPodState
+
+sealed trait FinalPodState extends SparkPodState
+
+case class PodSucceeded(pod: Pod) extends FinalPodState
+
+case class PodFailed(pod: Pod) extends FinalPodState
+
+case class PodDeleted(pod: Pod) extends FinalPodState
+
+case class PodUnknown(pod: Pod) extends SparkPodState
+
+object SparkPodState extends Logging {
+  def toState(pod: Pod): SparkPodState = {
+    if (isDeleted(pod)) {
+      PodDeleted(pod)
+    } else {
+      val phase = pod.getStatus.getPhase.toLowerCase(Locale.ROOT)
+      phase match {
+        case "pending" =>
+          PodPending(pod)
+        case "running" =>
+          PodRunning(pod)
+        case "failed" =>
+          PodFailed(pod)
+        case "succeeded" =>
+          PodSucceeded(pod)
+        case _ =>
+          logWarning(s"Received unknown phase $phase for pod with name" +
+            s" ${pod.getMetadata.getName} in namespace ${pod.getMetadata.getNamespace}")
+          PodUnknown(pod)
+      }
+    }
+  }
+
+  private def isDeleted(pod: Pod): Boolean = pod.getMetadata.getDeletionTimestamp != null
+}
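Review note: renaming `ExecutorPodState` to `SparkPodState` and exposing `toState` on the companion object lets the executor snapshot code and the shuffle-pod watcher below share one classification routine. A small usage sketch (assuming a fabric8 `Pod` in scope):

```scala
import io.fabric8.kubernetes.api.model.Pod
import org.apache.spark.scheduler.cluster.k8s._

// Sketch: only running pods are usable; any FinalPodState means the pod is gone.
def isUsable(pod: Pod): Boolean = SparkPodState.toState(pod) match {
  case PodRunning(_) => true
  case _: FinalPodState => false
  case _ => false // pending or unknown: not usable yet
}
```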
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProvider.scala
new file mode 100644
index 0000000000000..513ca52583615
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProvider.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.cluster.k8s._
+import org.apache.spark.scheduler.cluster.k8s.SparkPodState
+import org.apache.spark.util.Utils
+
+class KubernetesShuffleServiceAddressProvider(
+    kubernetesClient: KubernetesClient,
+    pollForPodsExecutor: ScheduledExecutorService,
+    podLabels: Map[String, String],
+    namespace: String,
+    portNumber: Int)
+  extends ShuffleServiceAddressProvider with Logging {
+
+  // General implementation remark: this bears a strong resemblance to ExecutorPodsSnapshotsStore,
+  // but we don't need all "in-between" lists of all executor pods, just the latest known list
+  // when we query in getShuffleServiceAddresses.
+
+  private val podsUpdateLock = new ReentrantReadWriteLock()
+
+  private val shuffleServicePods = mutable.HashMap.empty[String, Pod]
+
+  private var shuffleServicePodsWatch: Watch = _
+  private var pollForPodsTask: ScheduledFuture[_] = _
+
+  override def start(): Unit = {
+    pollForPods()
+    pollForPodsTask = pollForPodsExecutor.scheduleWithFixedDelay(
+      () => pollForPods(), 0, 10, TimeUnit.SECONDS)
+    shuffleServicePodsWatch = kubernetesClient
+      .pods()
+      .inNamespace(namespace)
+      .withLabels(podLabels.asJava).watch(new PutPodsInCacheWatcher())
+  }
+
+  override def stop(): Unit = {
+    Utils.tryLogNonFatalError {
+      if (pollForPodsTask != null) {
+        pollForPodsTask.cancel(false)
+      }
+    }
+
+    Utils.tryLogNonFatalError {
+      if (shuffleServicePodsWatch != null) {
+        shuffleServicePodsWatch.close()
+      }
+    }
+
+    Utils.tryLogNonFatalError {
+      kubernetesClient.close()
+    }
+  }
+
+  override def getShuffleServiceAddresses(): List[(String, Int)] = {
+    val readLock = podsUpdateLock.readLock()
+    readLock.lock()
+    try {
+      val addresses = shuffleServicePods.values.map(pod => {
+        (pod.getStatus.getPodIP, portNumber)
+      }).toList
+      logInfo(s"Found remote shuffle service addresses at $addresses.")
+      addresses
+    } finally {
+      readLock.unlock()
+    }
+  }
+
+  private def pollForPods(): Unit = {
+    val writeLock = podsUpdateLock.writeLock()
+    writeLock.lock()
+    try {
+      val allPods = kubernetesClient
+        .pods()
+        .inNamespace(namespace)
+        .withLabels(podLabels.asJava)
+        .list()
+      shuffleServicePods.clear()
+      allPods.getItems.asScala.foreach(updatePod)
+    } finally {
+      writeLock.unlock()
+    }
+  }
+
+  private def updatePod(pod: Pod): Unit = {
+    require(podsUpdateLock.isWriteLockedByCurrentThread, "Should only update pods under lock.")
+    val state = SparkPodState.toState(pod)
+    state match {
+      case PodPending(_) | PodFailed(_) | PodSucceeded(_) | PodDeleted(_) =>
+        shuffleServicePods.remove(pod.getMetadata.getName)
+      case PodRunning(_) =>
+        shuffleServicePods.put(pod.getMetadata.getName, pod)
+      case _ =>
+        logWarning(s"Unknown state $state for pod named ${pod.getMetadata.getName}")
+    }
+  }
+
+  private def deletePod(pod: Pod): Unit = {
+    require(podsUpdateLock.isWriteLockedByCurrentThread, "Should only delete under lock.")
+    shuffleServicePods.remove(pod.getMetadata.getName)
+  }
+
+  private class PutPodsInCacheWatcher extends Watcher[Pod] {
+    override def eventReceived(action: Watcher.Action, pod: Pod): Unit = {
+      val writeLock = podsUpdateLock.writeLock()
+      writeLock.lock()
+      try {
+        updatePod(pod)
+      } finally {
+        writeLock.unlock()
+      }
+    }
+
+    override def onClose(e: KubernetesClientException): Unit = {}
+  }
+
+  private implicit def toRunnable(func: () => Unit): Runnable = {
+    new Runnable {
+      override def run(): Unit = func()
+    }
+  }
+}
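Review note: the provider combines an initial synchronous poll in `start()` (so addresses are available as soon as the component starts), a periodic re-list as a consistency backstop, and a watch for low-latency updates; all three paths funnel through the same write-locked map. A hypothetical consumer on the read side:

```scala
import org.apache.spark.SparkEnv

// Hypothetical consumer: deterministically assign a map task to one of the
// discovered shuffle service instances. Returns None until discovery finds pods.
def pickShuffleService(mapId: Int): Option[(String, Int)] = {
  val addresses = SparkEnv.get.shuffleServiceAddressProvider.getShuffleServiceAddresses()
  if (addresses.isEmpty) None
  else Some(addresses(math.abs(mapId) % addresses.size))
}
```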
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProviderFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProviderFactory.scala
new file mode 100644
index 0000000000000..de2b7eb7a322c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/shuffle/KubernetesShuffleServiceAddressProviderFactory.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.internal.{config => C, Logging}
+import org.apache.spark.util.ThreadUtils
+
+class KubernetesShuffleServiceAddressProviderFactory
+  extends ShuffleServiceAddressProviderFactory with Logging {
+  override def canCreate(masterUrl: String): Boolean = masterUrl.startsWith("k8s://")
+
+  override def create(conf: SparkConf): ShuffleServiceAddressProvider = {
+    if (conf.get(C.K8S_SHUFFLE_SERVICE_ENABLED)) {
+      val kubernetesClient = SparkKubernetesClientFactory.getDriverKubernetesClient(
+        conf, conf.get("spark.master"))
+      val pollForPodsExecutor = ThreadUtils.newDaemonThreadPoolScheduledExecutor(
+        "poll-shuffle-service-pods", 1)
+      logInfo("Beginning to search for K8S pods that act as an External Shuffle Service")
+      val shuffleServiceLabels = conf.getAllWithPrefix(KUBERNETES_REMOTE_SHUFFLE_SERVICE_LABELS)
+      val shuffleServicePodsNamespace = conf.get(KUBERNETES_REMOTE_SHUFFLE_SERVICE_PODS_NAMESPACE)
+      require(shuffleServicePodsNamespace.isDefined, "Namespace for the pods running the external" +
+        s" shuffle service must be defined by" +
+        s" ${KUBERNETES_REMOTE_SHUFFLE_SERVICE_PODS_NAMESPACE.key}")
+      require(shuffleServiceLabels.nonEmpty, "Requires labels for external shuffle service pods")
+
+      val port: Int = conf.get(KUBERNETES_REMOTE_SHUFFLE_SERVICE_PORT)
+      new KubernetesShuffleServiceAddressProvider(
+        kubernetesClient,
+        pollForPodsExecutor,
+        shuffleServiceLabels.toMap,
+        shuffleServicePodsNamespace.get,
+        port)
+    } else DefaultShuffleServiceAddressProvider
+  }
+}
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
index 871d34b11e174..7fddd762fe545 100644
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile
@@ -29,7 +29,7 @@ ARG spark_uid=185
 RUN set -ex && \
     apk upgrade --no-cache && \
     ln -s /lib /lib64 && \
-    apk add --no-cache bash tini libc6-compat linux-pam krb5 krb5-libs nss && \
+    apk add --no-cache bash tini libc6-compat linux-pam krb5 krb5-libs nss procps && \
     mkdir -p /opt/spark && \
    mkdir -p /opt/spark/examples && \
     mkdir -p /opt/spark/work-dir && \

From 3346bf427647b6869f55f9ada3f0e3a2476f9019 Mon Sep 17 00:00:00 2001
From: Ilan Filonenko
Date: Tue, 23 Apr 2019 11:11:10 -0400
Subject: [PATCH 2/2] Integrate shuffle location discovery with executors

---
 .../io/external/ExternalShuffleDataIO.java    | 36 +++++++++++++
 .../ExternalShuffleExecutorComponents.java    | 54 +++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala |  3 ++
 .../scala/org/apache/spark/SparkEnv.scala     | 19 ++++++-
 .../org/apache/spark/executor/Executor.scala  |  1 +
 .../BlockStoreShuffleReaderBenchmark.scala    |  1 +
 .../sort/SortShuffleWriterBenchmark.scala     |  1 +
 7 files changed, 114 insertions(+), 1 deletion(-)
 create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleDataIO.java
 create mode 100644 core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleExecutorComponents.java
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleDataIO.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleDataIO.java
new file mode 100644
index 0000000000000..a9949862133ee
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleDataIO.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort.io.external;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
+import org.apache.spark.api.shuffle.ShuffleDataIO;
+
+public class ExternalShuffleDataIO implements ShuffleDataIO {
+
+  private final SparkConf sparkConf;
+
+  public ExternalShuffleDataIO(SparkConf sparkConf) {
+    this.sparkConf = sparkConf;
+  }
+
+  @Override
+  public ShuffleExecutorComponents executor() {
+    return new ExternalShuffleExecutorComponents(sparkConf);
+  }
+}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleExecutorComponents.java b/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleExecutorComponents.java
new file mode 100644
index 0000000000000..ccdadd6baaf28
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/io/external/ExternalShuffleExecutorComponents.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort.io.external;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkEnv;
+import org.apache.spark.api.shuffle.ShuffleExecutorComponents;
+import org.apache.spark.api.shuffle.ShuffleWriteSupport;
+import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.shuffle.ShuffleServiceAddressProvider;
+import org.apache.spark.storage.BlockManager;
+
+public class ExternalShuffleExecutorComponents implements ShuffleExecutorComponents {
+
+  private final SparkConf sparkConf;
+  private BlockManager blockManager;
+  private IndexShuffleBlockResolver blockResolver;
+  private ShuffleServiceAddressProvider shuffleServiceAddressProvider;
+
+  public ExternalShuffleExecutorComponents(SparkConf sparkConf) {
+    this.sparkConf = sparkConf;
+  }
+
+  @Override
+  public void initializeExecutor(String appId, String execId) {
+    blockManager = SparkEnv.get().blockManager();
+    blockResolver = new IndexShuffleBlockResolver(sparkConf, blockManager);
+    shuffleServiceAddressProvider = SparkEnv.get().shuffleServiceAddressProvider();
+  }
+
+  @Override
+  public ShuffleWriteSupport writes() {
+    if (blockResolver == null) {
+      throw new IllegalStateException(
+        "Executor components must be initialized before getting writers.");
+    }
+    return null;
+  }
+}
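Review note: `ExternalShuffleDataIO` plugs into the `ShuffleDataIO` extension point, so selecting it is purely configuration; `writes()` is still a stub here (it returns null), meaning this commit wires up discovery without yet producing remote writers. Assuming the sort-shuffle IO plugin key whose `DefaultShuffleDataIO` default is visible in PATCH 1/2 (key name assumed, not shown in this diff), selection would look like:

```scala
import org.apache.spark.SparkConf

// Sketch: route shuffle IO through the external-shuffle plugin.
// The key name is an assumption based on the ShuffleDataIO config in PATCH 1/2.
val conf = new SparkConf()
  .set("spark.shuffle.sort.io.plugin.class",
    "org.apache.spark.shuffle.sort.io.external.ExternalShuffleDataIO")
```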
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4abb18d4aaa73..2923f41ebac3a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -536,6 +536,9 @@ class SparkContext(config: SparkConf) extends Logging {
       None
     }
 
+    // Start the ShuffleServiceAddressProvider
+    _env.shuffleServiceAddressProvider.start()
+
     // Optionally scale number of executors dynamically based on workload. Exposed for testing.
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
     _executorAllocationManager =
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index a5064cc25113f..b22d7bc07a05f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -41,7 +41,7 @@ import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
 import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
 import org.apache.spark.security.CryptoStreamUtils
 import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager}
-import org.apache.spark.shuffle.ShuffleManager
+import org.apache.spark.shuffle.{DefaultShuffleServiceAddressProvider, ShuffleManager, ShuffleServiceAddressProvider, ShuffleServiceAddressProviderFactory}
 import org.apache.spark.storage._
 import org.apache.spark.util.{RpcUtils, Utils}
 
@@ -67,6 +67,7 @@ class SparkEnv (
     val metricsSystem: MetricsSystem,
     val memoryManager: MemoryManager,
     val outputCommitCoordinator: OutputCommitCoordinator,
+    val shuffleServiceAddressProvider: ShuffleServiceAddressProvider,
     val conf: SparkConf) extends Logging {
 
   private[spark] var isStopped = false
@@ -90,6 +91,7 @@ class SparkEnv (
       blockManager.master.stop()
       metricsSystem.stop()
       outputCommitCoordinator.stop()
+      shuffleServiceAddressProvider.stop()
       rpcEnv.shutdown()
       rpcEnv.awaitTermination()
 
@@ -365,6 +367,20 @@ object SparkEnv extends Logging {
       new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator))
     outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef)
 
+    // ShuffleServiceAddressProvider initialization
+    val master = conf.get("spark.master")
+    val shuffleProvider = conf.get(SHUFFLE_SERVICE_PROVIDER_CLASS)
+      .map(clazz => Utils.loadExtensions(classOf[ShuffleServiceAddressProviderFactory],
+        Seq(clazz), conf)).getOrElse(Seq())
+    val serviceLoaders = shuffleProvider.filter(_.canCreate(master))
+    if (serviceLoaders.size > 1) {
+      throw new SparkException(
+        s"Multiple shuffle service address providers registered for the url $master: $serviceLoaders")
+    }
+    val shuffleServiceAddressProvider = serviceLoaders.headOption
+      .map(_.create(conf))
+      .getOrElse(DefaultShuffleServiceAddressProvider)
+
     val envInstance = new SparkEnv(
       executorId,
       rpcEnv,
@@ -379,6 +395,7 @@ object SparkEnv extends Logging {
       metricsSystem,
       memoryManager,
       outputCommitCoordinator,
+      shuffleServiceAddressProvider,
       conf)
 
     // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5973158cf739..6b3a35a5a8a17 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -120,6 +120,7 @@ private[spark] class Executor(
     env.metricsSystem.registerSource(executorSource)
     env.metricsSystem.registerSource(new JVMCPUSource())
     env.metricsSystem.registerSource(env.blockManager.shuffleMetricsSource)
+    env.shuffleServiceAddressProvider.start()
   }
 
   // Whether to load classes in user jars before those in Spark jars
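Review note: `SparkEnv` resolves the provider the same way `SparkContext` resolves external cluster managers: load every factory named by the config, keep those whose `canCreate` matches the master URL, and fail if more than one claims it. Any deployment can therefore supply its own factory; an illustrative (non-patch) example for a standalone master, reusing the static provider sketched earlier in this review:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.shuffle.{ShuffleServiceAddressProvider, ShuffleServiceAddressProviderFactory}

// Hypothetical factory for standalone masters, showing the plugin contract.
// StaticShuffleServiceAddressProvider is the sketch from earlier in this review.
class StaticShuffleServiceAddressProviderFactory extends ShuffleServiceAddressProviderFactory {
  override def canCreate(masterUrl: String): Boolean = masterUrl.startsWith("spark://")
  override def create(conf: SparkConf): ShuffleServiceAddressProvider =
    new StaticShuffleServiceAddressProvider(Seq(("shuffle-1.internal", 7337)))
}
```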
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala
index b39e37c1e3842..5fbe6dd7fe751 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/BlockStoreShuffleReaderBenchmark.scala
@@ -177,6 +177,7 @@ object BlockStoreShuffleReaderBenchmark extends BenchmarkBase {
       null,
       null,
       null,
+      null,
       defaultConf
     ))
 
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
index b0ff15cb1f790..f458212e8f860 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterBenchmark.scala
@@ -65,6 +65,7 @@ object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {
       null,
       null,
       null,
+      null,
       defaultConf
     ))
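Review note: the benchmark updates fill the new `SparkEnv` constructor slot with null, which works only because these benchmarks never touch the provider. Where a real value is wanted, the no-op object introduced in PATCH 1/2 is a natural stand-in:

```scala
import org.apache.spark.shuffle.DefaultShuffleServiceAddressProvider

// DefaultShuffleServiceAddressProvider is stateless: start() and stop() are
// no-ops and the address list is always empty, so it is safe as a test default.
assert(DefaultShuffleServiceAddressProvider.getShuffleServiceAddresses().isEmpty)
```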