From 3d152c9672292a51f56fe2e593f3ccccd6fb1e06 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 29 Dec 2017 16:22:19 -0500 Subject: [PATCH 01/11] initial logic needed for porting PR 521 --- README.md | 4 + integration-test/pom.xml | 32 +-- .../k8s/integrationtest/KubernetesSuite.scala | 15 +- .../deploy/k8s/integrationtest/Utils.scala | 54 +++++ .../backend/GCE/GCETestBackend.scala | 6 +- .../backend/IntegrationTestBackend.scala | 5 +- .../backend/minikube/Minikube.scala | 49 +--- .../minikube/MinikubeTestBackend.scala | 37 +-- .../deploy/k8s/integrationtest/config.scala | 24 ++ .../docker/KubernetesSuiteDockerManager.scala | 212 ++++++++++++++++++ .../docker/SparkDockerImageBuilder.scala | 77 ------- 11 files changed, 343 insertions(+), 172 deletions(-) create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala create mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala delete mode 100644 integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala diff --git a/README.md b/README.md index b376a04..ed6d378 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ is subject to change. Note that currently the integration tests only run with Java 8.
+Integration tests first require installing [Minikube](https://kubernetes.io/docs/getting-started-guides/minikube/) on
+your machine and having the `minikube` binary on your `PATH`. Refer to the Minikube documentation for instructions
+on how to install it. It is recommended to allocate at least 8 CPUs and 8GB of memory to the Minikube cluster.
+
 Running the integration tests requires a Spark distribution package tarball that contains Spark jars, submission clients, etc. You can download a tarball from http://spark.apache.org/downloads.html. 
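For reference, the Minikube requirement added in the README hunk above can be satisfied by starting a local cluster with the recommended resources before running the tests. The sketch below is illustrative only and is not part of the patch; the flags are standard Minikube options, but the exact values simply mirror the README's recommendation (memory is given in MB) and may need adjusting for your machine or Minikube version.

```
$ minikube start --cpus 8 --memory 8192
$ minikube status
```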
Or, you can create a distribution from diff --git a/integration-test/pom.xml b/integration-test/pom.xml index bf48318..f65398d 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -39,6 +39,7 @@ 1.0 1.7.24 kubernetes-integration-tests + YOUR-SPARK-DISTRO-TARBALL-HERE YOUR-DOCKERFILES-DIR-HERE @@ -141,37 +142,6 @@ - - com.googlecode.maven-download-plugin - download-maven-plugin - ${download-maven-plugin.version} - - - download-minikube-linux - pre-integration-test - - wget - - - https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-linux-amd64 - ${project.build.directory}/minikube-bin/linux-amd64 - minikube - - - - download-minikube-darwin - pre-integration-test - - wget - - - https://storage.googleapis.com/minikube/releases/v0.22.0/minikube-darwin-amd64 - ${project.build.directory}/minikube-bin/darwin-amd64 - minikube - - - - diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 97fcec7..bbbe34c 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -27,8 +27,10 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackendFactory -import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND -import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend +import org.apache.spark.deploy.k8s.integrationtest.constants._ +import org.apache.spark.deploy.k8s.integrationtest.config._ + private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAfter { @@ -50,6 +52,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit before { sparkAppConf = kubernetesTestComponents.newSparkAppConf() .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) + .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init")) + .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver")) + .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor")) kubernetesTestComponents.createNamespace() } @@ -58,10 +63,12 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } test("Run SparkPi with no resources") { + doMinikubeCheck runSparkPiAndVerifyCompletion() } test("Run SparkPi with a very long application name.") { + doMinikubeCheck sparkAppConf.set("spark.app.name", "long" * 40) runSparkPiAndVerifyCompletion() } @@ -100,6 +107,10 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } } + private def doMinikubeCheck(): Unit = { + assume(testBackend == MinikubeTestBackend) + } + private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}" } private[spark] object KubernetesSuite { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 911b3a9..062e94f 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ 
b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -19,6 +19,8 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.Closeable import java.net.URI +import java.io.{IOException,InputStream,OutputStream} + object Utils extends Logging { def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { @@ -26,6 +28,32 @@ object Utils extends Logging { try f.apply(resource) finally resource.close() } + def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = { + var originalThrowable: Throwable = null + try { + block + } catch { + case t: Throwable => + // Purposefully not using NonFatal, because even fatal exceptions + // we don't want to have our finallyBlock suppress + originalThrowable = t + throw originalThrowable + } finally { + try { + finallyBlock + } catch { + case t: Throwable => + if (originalThrowable != null) { + originalThrowable.addSuppressed(t) + logWarning(s"Suppressing exception in finally: " + t.getMessage, t) + throw originalThrowable + } else { + throw t + } + } + } + } + def checkAndGetK8sMasterUrl(rawMasterURL: String): String = { require(rawMasterURL.startsWith("k8s://"), "Kubernetes master URL must start with k8s://.") @@ -57,4 +85,30 @@ object Utils extends Logging { s"k8s://$resolvedURL" } + + class RedirectThread( + in: InputStream, + out: OutputStream, + name: String, + propagateEof: Boolean = false) extends Thread(name) { + setDaemon(true) + override def run() { + scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + Utils.tryWithSafeFinally { + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + out.write(buf, 0, len) + out.flush() + len = in.read(buf) + } + } { + if (propagateEof) { + out.close() + } + } + } + } + } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala index cbb98fa..345ccc8 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/GCE/GCETestBackend.scala @@ -20,7 +20,7 @@ import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} import org.apache.spark.deploy.k8s.integrationtest.Utils import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend -import org.apache.spark.deploy.k8s.integrationtest.constants.GCE_TEST_BACKEND +import org.apache.spark.deploy.k8s.integrationtest.config._ private[spark] class GCETestBackend(val master: String) extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ @@ -37,5 +37,7 @@ private[spark] class GCETestBackend(val master: String) extends IntegrationTestB defaultClient } - override def name(): String = GCE_TEST_BACKEND + override def dockerImageTag(): String = { + return System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY, "latest") + } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala index 51f8b96..d3a834b 100644 --- 
a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -21,11 +21,12 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend +import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager private[spark] trait IntegrationTestBackend { - def name(): String def initialize(): Unit def getKubernetesClient(): DefaultKubernetesClient + def dockerImageTag(): String def cleanUp(): Unit = {} } @@ -33,6 +34,6 @@ private[spark] object IntegrationTestBackendFactory { def getTestBackend(): IntegrationTestBackend = { Option(System.getProperty("spark.kubernetes.test.master")) .map(new GCETestBackend(_)) - .getOrElse(new MinikubeTestBackend()) + .getOrElse(MinikubeTestBackend) } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index c04bd75..6fb7f4e 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -20,39 +20,13 @@ import java.nio.file.Paths import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} -import org.apache.commons.lang3.SystemUtils import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils} // TODO support windows private[spark] object Minikube extends Logging { - private val MINIKUBE_EXECUTABLE_DEST = if (SystemUtils.IS_OS_MAC_OSX) { - Paths.get("target", "minikube-bin", "darwin-amd64", "minikube").toFile - } else if (SystemUtils.IS_OS_WINDOWS) { - throw new IllegalStateException("Executing Minikube based integration tests not yet " + - " available on Windows.") - } else { - Paths.get("target", "minikube-bin", "linux-amd64", "minikube").toFile - } - - private val EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE = "Minikube is not downloaded, expected at " + - s"${MINIKUBE_EXECUTABLE_DEST.getAbsolutePath}" - private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 - // NOTE: This and the following methods are synchronized to prevent deleteMinikube from - // destroying the minikube VM while other methods try to use the VM. - // Such a race condition can corrupt the VM or some VM provisioning tools like VirtualBox. 
- def startMinikube(): Unit = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) - if (getMinikubeStatus != MinikubeStatus.RUNNING) { - executeMinikube("start", "--memory", "6000", "--cpus", "8") - } else { - logInfo("Minikube is already started.") - } - } - def getMinikubeIp: String = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) val outputs = executeMinikube("ip") .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) assert(outputs.size == 1, "Unexpected amount of output from minikube ip") @@ -60,17 +34,16 @@ private[spark] object Minikube extends Logging { } def getMinikubeStatus: MinikubeStatus.Value = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) val statusString = executeMinikube("status") - .filter(_.contains("minikube: ")) + .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:")) .head + .replaceFirst("minikubeVM: ", "") .replaceFirst("minikube: ", "") MinikubeStatus.unapply(statusString) .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) } def getDockerEnv: Map[String, String] = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists(), EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) executeMinikube("docker-env", "--shell", "bash") .filter(_.startsWith("export")) .map(_.replaceFirst("export ", "").split('=')) @@ -78,15 +51,6 @@ private[spark] object Minikube extends Logging { .toMap } - def deleteMinikube(): Unit = synchronized { - assert(MINIKUBE_EXECUTABLE_DEST.exists, EXPECTED_DOWNLOADED_MINIKUBE_MESSAGE) - if (getMinikubeStatus != MinikubeStatus.NONE) { - executeMinikube("delete") - } else { - logInfo("Minikube was already not running.") - } - } - def getKubernetesClient: DefaultKubernetesClient = synchronized { val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") @@ -105,13 +69,8 @@ private[spark] object Minikube extends Logging { } private def executeMinikube(action: String, args: String*): Seq[String] = { - if (!MINIKUBE_EXECUTABLE_DEST.canExecute) { - if (!MINIKUBE_EXECUTABLE_DEST.setExecutable(true)) { - throw new IllegalStateException("Failed to make the Minikube binary executable.") - } - } - ProcessUtils.executeProcess(Array(MINIKUBE_EXECUTABLE_DEST.getAbsolutePath, action) ++ args, - MINIKUBE_STARTUP_TIMEOUT_SECONDS) + ProcessUtils.executeProcess( + Array("minikube", action) ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS) } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 7a1433e..2c21e87 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -16,32 +16,43 @@ */ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube +import java.util.UUID + import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend -import org.apache.spark.deploy.k8s.integrationtest.constants.MINIKUBE_TEST_BACKEND -import org.apache.spark.deploy.k8s.integrationtest.docker.SparkDockerImageBuilder +import org.apache.spark.deploy.k8s.integrationtest.config._ 
+import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager -private[spark] class MinikubeTestBackend extends IntegrationTestBackend { +private[spark] object MinikubeTestBackend extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ - + private val userProvidedDockerImageTag = Option( + System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY)) + private val resolvedDockerImageTag = + userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) + private val dockerManager = new KubernetesSuiteDockerManager( + Minikube.getDockerEnv, resolvedDockerImageTag) override def initialize(): Unit = { - Minikube.startMinikube() - if (!System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean) { - new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages() + val minikubeStatus = Minikube.getMinikubeStatus + require(minikubeStatus == MinikubeStatus.RUNNING, + s"Minikube must be running before integration tests can execute. Current status" + + s" is: $minikubeStatus") + if (userProvidedDockerImageTag.isEmpty) { + dockerManager.buildSparkDockerImages() } defaultClient = Minikube.getKubernetesClient } - override def getKubernetesClient(): DefaultKubernetesClient = { - defaultClient - } override def cleanUp(): Unit = { - if (!System.getProperty("spark.docker.test.persistMinikube", "false").toBoolean) { - Minikube.deleteMinikube() + super.cleanUp() + if (userProvidedDockerImageTag.isEmpty) { + dockerManager.deleteImages() } } - override def name(): String = MINIKUBE_TEST_BACKEND + override def getKubernetesClient(): DefaultKubernetesClient = { + defaultClient + } + override def dockerImageTag(): String = resolvedDockerImageTag } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala new file mode 100644 index 0000000..a0c973c --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s.integrationtest + +package object config { + val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag" + val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.docker.image" + val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.docker.image" + val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.docker.image" +} diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala new file mode 100644 index 0000000..66c85b2 --- /dev/null +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.docker + +import java.io.{File, PrintWriter} +import java.net.URI +import java.nio.file.Paths + +import com.google.common.base.Charsets +import com.google.common.io.Files +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import com.spotify.docker.client.DockerClient.{ListContainersParam, ListImagesParam, RemoveContainerParam} +import com.spotify.docker.client.messages.Container +import org.apache.http.client.utils.URIBuilder +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite +import org.apache.spark.deploy.k8s.integrationtest.Logging +import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWithResource} + +private[spark] class KubernetesSuiteDockerManager( + dockerEnv: Map[String, String], dockerTag: String) extends Logging { + + private val DOCKER_BUILD_PATH = Paths.get("target", "docker") + // Dockerfile paths must be relative to the build path. 
+ private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" + private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" + private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" + private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile" + private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" + private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" + private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile" + private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" + private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" + private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" + private val STATIC_ASSET_SERVER_DOCKER_FILE = + "dockerfiles/integration-test-asset-server/Dockerfile" + private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", + throw new IllegalStateException("DOCKER_HOST env not found.")) + + private val originalDockerUri = URI.create(dockerHost) + private val httpsDockerUri = new URIBuilder() + .setHost(originalDockerUri.getHost) + .setPort(originalDockerUri.getPort) + .setScheme("https") + .build() + + private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + + private val dockerClient = new DefaultDockerClient.Builder() + .uri(httpsDockerUri) + .dockerCertificates(DockerCertificates + .builder() + .dockerCertPath(Paths.get(dockerCerts)) + .build().get()) + .build() + + def buildSparkDockerImages(): Unit = { + Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + // Building Python distribution environment + val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") + .orElse(sys.env.get("PYSPARK_PYTHON")) + .getOrElse("/usr/bin/python") + val builder = new ProcessBuilder( + Seq(pythonExec, "setup.py", "sdist").asJava) + builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) + builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize + val process = builder.start() + new RedirectThread(process.getInputStream, System.out, "redirect output").start() + val exitCode = process.waitFor() + if (exitCode != 0) { + logInfo(s"exitCode: $exitCode") + } + buildImage("spark-base", BASE_DOCKER_FILE) + buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) + buildImage("spark-driver-r", DRIVERR_DOCKER_FILE) + buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) + buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE) + buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) + buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) + buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) + } + + def deleteImages(): Unit = { + removeRunningContainers() + deleteImage("spark-driver") + deleteImage("spark-driver-py") + deleteImage("spark-driver-r") + deleteImage("spark-executor") + deleteImage("spark-executor-py") + deleteImage("spark-executor-r") + deleteImage("spark-shuffle") + deleteImage("spark-resource-staging-server") + deleteImage("spark-init") + deleteImage("spark-integration-test-asset-server") + deleteImage("spark-base") + } + + private def 
buildImage(name: String, dockerFile: String): Unit = { + logInfo(s"Building Docker image - $name:$dockerTag") + val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve( + s"$dockerFile-$dockerTag").toAbsolutePath.toString) + dockerFileWithBaseTag.deleteOnExit() + try { + val originalDockerFileText = Files.readLines( + DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala + val dockerFileTextWithProperBaseImage = originalDockerFileText.map( + _.replace("FROM spark-base", s"FROM spark-base:$dockerTag")) + tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter => + tryWithResource(new PrintWriter(fileWriter)) { printWriter => + for (line <- dockerFileTextWithProperBaseImage) { + // scalastyle:off println + printWriter.println(line) + // scalastyle:on println + } + } + } + dockerClient.build( + DOCKER_BUILD_PATH, + s"$name:$dockerTag", + s"$dockerFile-$dockerTag", + new LoggingBuildHandler()) + } finally { + dockerFileWithBaseTag.delete() + } + } + + /** + * Forces all containers running an image with the configured tag to halt and be removed. + */ + private def removeRunningContainers(): Unit = { + val imageIds = dockerClient.listImages(ListImagesParam.allImages()) + .asScala + .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$dockerTag"))) + .map(_.id()) + .toSet + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val runningContainersWithImageTag = stopRunningContainers(imageIds) + require( + runningContainersWithImageTag.isEmpty, + s"${runningContainersWithImageTag.size} containers found still running" + + s" with the image tag $dockerTag") + } + dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + .foreach(container => dockerClient.removeContainer( + container.id(), RemoveContainerParam.forceKill(true))) + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val containersWithImageTag = dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" + + s" found with image tag $dockerTag.") + } + + } + + private def stopRunningContainers(imageIds: Set[String]): Iterable[Container] = { + val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds) + if (runningContainersWithImageTag.nonEmpty) { + logInfo(s"Found ${runningContainersWithImageTag.size} containers running with" + + s" an image with the tag $dockerTag. Attempting to remove these containers," + + s" and then will stall for 2 seconds.") + runningContainersWithImageTag.foreach { container => + dockerClient.stopContainer(container.id(), 5) + } + } + runningContainersWithImageTag + } + + private def getRunningContainersWithImageIds(imageIds: Set[String]): Iterable[Container] = { + dockerClient + .listContainers( + ListContainersParam.allContainers(), + ListContainersParam.withStatusRunning()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + } + + private def deleteImage(name: String): Unit = { + try { + dockerClient.removeImage(s"$name:$dockerTag") + } catch { + case e: RuntimeException => + logWarning(s"Failed to delete image $name:$dockerTag. 
There may be images leaking in the" + + s" docker environment which are now stale and unused.", e) + } + } +} \ No newline at end of file diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala deleted file mode 100644 index b3a359f..0000000 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.integrationtest.docker - -import java.net.URI -import java.nio.file.Paths - -import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} -import org.apache.http.client.utils.URIBuilder -import org.scalatest.concurrent.{Eventually, PatienceConfiguration} -import org.scalatest.time.{Minutes, Seconds, Span} - -import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH -import org.apache.spark.deploy.k8s.integrationtest.Logging - -private[spark] class SparkDockerImageBuilder - (private val dockerEnv: Map[String, String]) extends Logging{ - - private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH - // Dockerfile paths must be relative to the build path. 
- private val DOCKERFILES_DIR = "kubernetes/dockerfiles/" - private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile" - private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile" - private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile" - private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) - private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) - private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", - throw new IllegalStateException("DOCKER_HOST env not found.")) - - private val originalDockerUri = URI.create(dockerHost) - private val httpsDockerUri = new URIBuilder() - .setHost(originalDockerUri.getHost) - .setPort(originalDockerUri.getPort) - .setScheme("https") - .build() - - private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", - throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) - - private val dockerClient = new DefaultDockerClient.Builder() - .uri(httpsDockerUri) - .dockerCertificates(DockerCertificates.builder() - .dockerCertPath(Paths.get(dockerCerts)) - .build() - .get()) - .build() - - def buildSparkDockerImages(): Unit = { - Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } - buildImage("spark-base", BASE_DOCKER_FILE) - buildImage("spark-driver", DRIVER_DOCKER_FILE) - buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - } - - private def buildImage(name: String, dockerFile: String): Unit = { - dockerClient.build( - DOCKER_BUILD_PATH, - name, - dockerFile, - new LoggingBuildHandler()) - logInfo(s"Built $name docker image") - } -} From 22668e3deb1c800ee83ae700e5733dbb0cd0631f Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 29 Dec 2017 16:27:09 -0500 Subject: [PATCH 02/11] remove spark.version in pom --- integration-test/pom.xml | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-test/pom.xml b/integration-test/pom.xml index f65398d..2b19640 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -39,7 +39,6 @@ 1.0 1.7.24 kubernetes-integration-tests - YOUR-SPARK-DISTRO-TARBALL-HERE YOUR-DOCKERFILES-DIR-HERE From a59339c440afe91cbacd08a81275e4a06d1ffd25 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 29 Dec 2017 16:30:21 -0500 Subject: [PATCH 03/11] minor styling --- .../k8s/integrationtest/backend/IntegrationTestBackend.scala | 1 - .../integrationtest/docker/KubernetesSuiteDockerManager.scala | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala index d3a834b..aabb81b 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -21,7 +21,6 @@ import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.GCE.GCETestBackend import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend -import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager private[spark] trait IntegrationTestBackend { def initialize(): Unit diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala 
b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala index 66c85b2..28d3fa2 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala @@ -209,4 +209,4 @@ private[spark] class KubernetesSuiteDockerManager( s" docker environment which are now stale and unused.", e) } } -} \ No newline at end of file +} From 21fc0d145c660bf273f7354bff582c0c5ee8e29c Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Fri, 29 Dec 2017 17:17:29 -0500 Subject: [PATCH 04/11] handling flags, readmes, and POM changes --- README.md | 23 +++---------------- integration-test/pom.xml | 1 - .../minikube/MinikubeTestBackend.scala | 13 +++++------ .../deploy/k8s/integrationtest/config.scala | 1 - 4 files changed, 9 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index ed6d378..58fd366 100644 --- a/README.md +++ b/README.md @@ -53,34 +53,17 @@ $ mvn clean integration-test \ -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" ``` -# Preserve the Minikube VM - -The integration tests make use of -[Minikube](https://github.com/kubernetes/minikube), which fires up a virtual -machine and setup a single-node kubernetes cluster within it. By default the vm -is destroyed after the tests are finished. If you want to preserve the vm, e.g. -to reduce the running time of tests during development, you can pass the -property `spark.docker.test.persistMinikube` to the test process: - -``` -$ mvn clean integration-test \ - -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - -DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -``` - # Reuse the previous Docker images The integration tests build a number of Docker images, which takes some time. By default, the images are built every time the tests run. You may want to skip re-building those images during development, if the distribution package did not change since the last run. You can pass the property -`spark.docker.test.skipBuildImages` to the test process. This will work only if -you have been setting the property `spark.docker.test.persistMinikube`, in the -previous run since the docker daemon run inside the minikube environment. Here -is an example: +`spark.docker.test.skipBuildImages` to the test process. 
+Here is an example: ``` $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - "-DextraScalaTestArgs=-Dspark.docker.test.persistMinikube=true -Dspark.docker.test.skipBuildImages=true" + "-Dspark.docker.test.skipBuildImages=true" ``` diff --git a/integration-test/pom.xml b/integration-test/pom.xml index 2b19640..9375d91 100644 --- a/integration-test/pom.xml +++ b/integration-test/pom.xml @@ -40,7 +40,6 @@ 1.7.24 kubernetes-integration-tests YOUR-SPARK-DISTRO-TARBALL-HERE - YOUR-DOCKERFILES-DIR-HERE jar diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 2c21e87..82d94b1 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -21,15 +21,14 @@ import java.util.UUID import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend -import org.apache.spark.deploy.k8s.integrationtest.config._ import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager private[spark] object MinikubeTestBackend extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ - private val userProvidedDockerImageTag = Option( - System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY)) + private val userSkipBuildImages = + System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean private val resolvedDockerImageTag = - userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) + UUID.randomUUID().toString.replaceAll("-", "") private val dockerManager = new KubernetesSuiteDockerManager( Minikube.getDockerEnv, resolvedDockerImageTag) override def initialize(): Unit = { @@ -37,16 +36,15 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend { require(minikubeStatus == MinikubeStatus.RUNNING, s"Minikube must be running before integration tests can execute. 
Current status" + s" is: $minikubeStatus") - if (userProvidedDockerImageTag.isEmpty) { + if (!userSkipBuildImages) { dockerManager.buildSparkDockerImages() } defaultClient = Minikube.getKubernetesClient } - override def cleanUp(): Unit = { super.cleanUp() - if (userProvidedDockerImageTag.isEmpty) { + if (!userSkipBuildImages) { dockerManager.deleteImages() } } @@ -54,5 +52,6 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend { override def getKubernetesClient(): DefaultKubernetesClient = { defaultClient } + override def dockerImageTag(): String = resolvedDockerImageTag } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala index a0c973c..2e0b38d 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.k8s.integrationtest package object config { - val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag" val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.docker.image" val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.docker.image" val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.docker.image" From c777d2cb48836c0ebadda05b1b891efe81b03404 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Sat, 30 Dec 2017 17:48:21 -0500 Subject: [PATCH 05/11] resolve issues with minikube and some comment resolution --- .../minikube/MinikubeTestBackend.scala | 12 +++-- .../deploy/k8s/integrationtest/config.scala | 1 + .../docker/KubernetesSuiteDockerManager.scala | 50 +++---------------- 3 files changed, 16 insertions(+), 47 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 82d94b1..91ab923 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -21,22 +21,24 @@ import java.util.UUID import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend +import org.apache.spark.deploy.k8s.integrationtest.config._ import org.apache.spark.deploy.k8s.integrationtest.docker.KubernetesSuiteDockerManager private[spark] object MinikubeTestBackend extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ - private val userSkipBuildImages = - System.getProperty("spark.docker.test.skipBuildImages", "false").toBoolean + private val userProvidedDockerImageTag = Option( + System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY)) private val resolvedDockerImageTag = - UUID.randomUUID().toString.replaceAll("-", "") + userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) private val dockerManager = new KubernetesSuiteDockerManager( Minikube.getDockerEnv, resolvedDockerImageTag) + override def initialize(): Unit = { val minikubeStatus = Minikube.getMinikubeStatus require(minikubeStatus == MinikubeStatus.RUNNING, s"Minikube must be running before integration tests can 
execute. Current status" + s" is: $minikubeStatus") - if (!userSkipBuildImages) { + if (userProvidedDockerImageTag.isEmpty) { dockerManager.buildSparkDockerImages() } defaultClient = Minikube.getKubernetesClient @@ -44,7 +46,7 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend { override def cleanUp(): Unit = { super.cleanUp() - if (!userSkipBuildImages) { + if (userProvidedDockerImageTag.isEmpty) { dockerManager.deleteImages() } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala index 2e0b38d..a0c973c 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.integrationtest package object config { + val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag" val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.docker.image" val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.docker.image" val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.docker.image" diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala index 28d3fa2..8b72bc5 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala @@ -30,6 +30,7 @@ import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} import scala.collection.JavaConverters._ +import org.apache.spark.deploy.k8s.integrationtest.constants._ import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite import org.apache.spark.deploy.k8s.integrationtest.Logging import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWithResource} @@ -37,20 +38,13 @@ import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWit private[spark] class KubernetesSuiteDockerManager( dockerEnv: Map[String, String], dockerTag: String) extends Logging { - private val DOCKER_BUILD_PATH = Paths.get("target", "docker") + private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH // Dockerfile paths must be relative to the build path. 
- private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" - private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" - private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" - private val DRIVERR_DOCKER_FILE = "dockerfiles/driver-r/Dockerfile" - private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" - private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" - private val EXECUTORR_DOCKER_FILE = "dockerfiles/executor-r/Dockerfile" - private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" - private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" - private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" - private val STATIC_ASSET_SERVER_DOCKER_FILE = - "dockerfiles/integration-test-asset-server/Dockerfile" + private val DOCKERFILES_DIR = "kubernetes/dockerfiles/" + private val BASE_DOCKER_FILE = DOCKERFILES_DIR + "spark-base/Dockerfile" + private val DRIVER_DOCKER_FILE = DOCKERFILES_DIR + "driver/Dockerfile" + private val EXECUTOR_DOCKER_FILE = DOCKERFILES_DIR + "executor/Dockerfile" + private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile" private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", @@ -76,46 +70,18 @@ private[spark] class KubernetesSuiteDockerManager( def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } - // Building Python distribution environment - val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") - .orElse(sys.env.get("PYSPARK_PYTHON")) - .getOrElse("/usr/bin/python") - val builder = new ProcessBuilder( - Seq(pythonExec, "setup.py", "sdist").asJava) - builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) - builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize - val process = builder.start() - new RedirectThread(process.getInputStream, System.out, "redirect output").start() - val exitCode = process.waitFor() - if (exitCode != 0) { - logInfo(s"exitCode: $exitCode") - } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) - buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) - buildImage("spark-driver-r", DRIVERR_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) - buildImage("spark-executor-r", EXECUTORR_DOCKER_FILE) - buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) - buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) - buildImage("spark-integration-test-asset-server", STATIC_ASSET_SERVER_DOCKER_FILE) } def deleteImages(): Unit = { removeRunningContainers() + deleteImage("spark-base") deleteImage("spark-driver") - deleteImage("spark-driver-py") - deleteImage("spark-driver-r") deleteImage("spark-executor") - deleteImage("spark-executor-py") - deleteImage("spark-executor-r") - deleteImage("spark-shuffle") - deleteImage("spark-resource-staging-server") deleteImage("spark-init") - deleteImage("spark-integration-test-asset-server") - deleteImage("spark-base") } private def buildImage(name: String, dockerFile: String): Unit = { From 55e97ea0834de13e81c5129af202d6be26d9aa9b Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Sat, 30 Dec 2017 17:57:30 -0500 Subject: [PATCH 
06/11] updated readme --- README.md | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 58fd366..1aee870 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,8 @@ invoked if the `integration-test` phase is run. With Maven, the integration test can be run using the following command: ``` +$ mvn clean pre-integration-test \ + -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz ``` @@ -50,20 +52,23 @@ In order to run against any cluster, use the following: ```sh $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// -Dspark.docker.test.driverImage= -Dspark.docker.test.executorImage=" + -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// \ + -Dspark.docker.test.driverImage= \ + -Dspark.docker.test.executorImage= ``` -# Reuse the previous Docker images +# Specify existing docker images via image:tag The integration tests build a number of Docker images, which takes some time. By default, the images are built every time the tests run. You may want to skip re-building those images during development, if the distribution package did not change since the last run. You can pass the property -`spark.docker.test.skipBuildImages` to the test process. +`spark.kubernetes.test.imageDockerTag` to the test process and specify the Docker +image tag that is appropriate. Here is an example: ``` $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - "-Dspark.docker.test.skipBuildImages=true" + "-Dspark.kubernetes.test.imageDockerTag=latest" ``` From 7369772ba0876872ef638b06a3058edf074ec974 Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Sat, 30 Dec 2017 22:00:04 -0500 Subject: [PATCH 07/11] config values --- README.md | 10 +++------- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 2 +- .../spark/deploy/k8s/integrationtest/config.scala | 6 +++--- 3 files changed, 7 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 1aee870..31dd132 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ http://spark.apache.org/downloads.html. Or, you can create a distribution from source code using `make-distribution.sh`. For example: ``` -$ git clone git@github.com:apache/spark.git +$ https://github.com/apache/spark.git $ cd spark $ ./dev/make-distribution.sh --tgz \ -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver @@ -40,8 +40,6 @@ invoked if the `integration-test` phase is run. 
With Maven, the integration test can be run using the following command: ``` -$ mvn clean pre-integration-test \ - -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz ``` @@ -52,9 +50,7 @@ In order to run against any cluster, use the following: ```sh $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// \ - -Dspark.docker.test.driverImage= \ - -Dspark.docker.test.executorImage= + -DextraScalaTestArgs="-Dspark.kubernetes.test.master=k8s://https:// ``` # Specify existing docker images via image:tag @@ -70,5 +66,5 @@ Here is an example: ``` $ mvn clean integration-test \ -Dspark-distro-tgz=spark/spark-2.3.0-SNAPSHOT-bin.tgz \ - "-Dspark.kubernetes.test.imageDockerTag=latest" + -Dspark.kubernetes.test.imageDockerTag=latest ``` diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index b845330..ec87d6b 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -54,9 +54,9 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit before { sparkAppConf = kubernetesTestComponents.newSparkAppConf() .set("spark.kubernetes.driver.label.spark-app-locator", APP_LOCATOR_LABEL) - .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init")) .set(DRIVER_DOCKER_IMAGE, tagImage("spark-driver")) .set(EXECUTOR_DOCKER_IMAGE, tagImage("spark-executor")) + .set(INIT_CONTAINER_DOCKER_IMAGE, tagImage("spark-init")) kubernetesTestComponents.createNamespace() } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala index a0c973c..d82a1de 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest package object config { val KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY = "spark.kubernetes.test.imageDockerTag" - val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.docker.image" - val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.docker.image" - val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.docker.image" + val DRIVER_DOCKER_IMAGE = "spark.kubernetes.driver.container.image" + val EXECUTOR_DOCKER_IMAGE = "spark.kubernetes.executor.container.image" + val INIT_CONTAINER_DOCKER_IMAGE = "spark.kubernetes.initcontainer.container.image" } From 7470472ed1e2f6d4ad4e9407c7164b8949ee7299 Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Jan 2018 15:34:01 -0800 Subject: [PATCH 08/11] Modify SparkDockerImageBuilder so it can delete docker images --- .../docker/SparkDockerImageBuilder.scala | 138 ++++++++++++++---- 1 file changed, 111 insertions(+), 27 deletions(-) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala index 9cce325..5d94224 100644 --- 
a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala @@ -16,20 +16,27 @@ */ package org.apache.spark.deploy.k8s.integrationtest.docker +import java.io.{File, PrintWriter} import java.net.URI -import java.net.URLEncoder import java.nio.file.Paths -import com.spotify.docker.client.{DockerClient, DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import com.google.common.base.Charsets +import com.google.common.io.Files +import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} +import com.spotify.docker.client.DockerClient.{ListContainersParam, ListImagesParam, RemoveContainerParam} +import com.spotify.docker.client.messages.Container import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} +import scala.collection.JavaConverters._ -import org.apache.spark.deploy.k8s.integrationtest.constants.SPARK_DISTRO_PATH +import org.apache.spark.deploy.k8s.integrationtest.constants._ +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite import org.apache.spark.deploy.k8s.integrationtest.Logging +import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWithResource} -private[spark] class SparkDockerImageBuilder - (private val dockerEnv: Map[String, String]) extends Logging { +private[spark] class KubernetesSuiteDockerManager( + dockerEnv: Map[String, String], dockerTag: String) extends Logging { private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH // Dockerfile paths must be relative to the build path. 
@@ -41,7 +48,7 @@ private[spark] class SparkDockerImageBuilder private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", - throw new IllegalStateException("DOCKER_HOST env not found.")) + throw new IllegalStateException("DOCKER_HOST env not found.")) private val originalDockerUri = URI.create(dockerHost) private val httpsDockerUri = new URIBuilder() @@ -51,44 +58,121 @@ private[spark] class SparkDockerImageBuilder .build() private val dockerCerts = dockerEnv.getOrElse("DOCKER_CERT_PATH", - throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) + throw new IllegalStateException("DOCKER_CERT_PATH env not found.")) private val dockerClient = new DefaultDockerClient.Builder() .uri(httpsDockerUri) - .dockerCertificates(DockerCertificates.builder() + .dockerCertificates(DockerCertificates + .builder() .dockerCertPath(Paths.get(dockerCerts)) - .build() - .get()) + .build().get()) .build() def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } - buildImage("spark-base", BASE_DOCKER_FILE, - Some("{\"spark_jars\":\"jars\",\"img_path\":\"kubernetes/dockerfiles\"}")) + buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) } - private def buildImage( - name: String, - dockerFile: String, - buildArgs: Option[String] = None): Unit = { - if (buildArgs.nonEmpty) { - dockerClient.build( - DOCKER_BUILD_PATH, - name, - dockerFile, - new LoggingBuildHandler(), - DockerClient.BuildParam.create("buildargs", URLEncoder.encode(buildArgs.get, "UTF-8"))) - } else { + def deleteImages(): Unit = { + removeRunningContainers() + deleteImage("spark-base") + deleteImage("spark-driver") + deleteImage("spark-executor") + deleteImage("spark-init") + } + + private def buildImage(name: String, dockerFile: String): Unit = { + logInfo(s"Building Docker image - $name:$dockerTag") + val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve( + s"$dockerFile-$dockerTag").toAbsolutePath.toString) + dockerFileWithBaseTag.deleteOnExit() + try { + val originalDockerFileText = Files.readLines( + DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala + val dockerFileTextWithProperBaseImage = originalDockerFileText.map( + _.replace("FROM spark-base", s"FROM spark-base:$dockerTag")) + tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter => + tryWithResource(new PrintWriter(fileWriter)) { printWriter => + for (line <- dockerFileTextWithProperBaseImage) { + // scalastyle:off println + printWriter.println(line) + // scalastyle:on println + } + } + } dockerClient.build( DOCKER_BUILD_PATH, - name, - dockerFile, + s"$name:$dockerTag", + s"$dockerFile-$dockerTag", new LoggingBuildHandler()) + } finally { + dockerFileWithBaseTag.delete() + } + } + + /** + * Forces all containers running an image with the configured tag to halt and be removed. 
+ */ + private def removeRunningContainers(): Unit = { + val imageIds = dockerClient.listImages(ListImagesParam.allImages()) + .asScala + .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$dockerTag"))) + .map(_.id()) + .toSet + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val runningContainersWithImageTag = stopRunningContainers(imageIds) + require( + runningContainersWithImageTag.isEmpty, + s"${runningContainersWithImageTag.size} containers found still running" + + s" with the image tag $dockerTag") + } + dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + .foreach(container => dockerClient.removeContainer( + container.id(), RemoveContainerParam.forceKill(true))) + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val containersWithImageTag = dockerClient.listContainers(ListContainersParam.allContainers()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" + + s" found with image tag $dockerTag.") } - logInfo(s"Built $name docker image") + } + + private def stopRunningContainers(imageIds: Set[String]): Iterable[Container] = { + val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds) + if (runningContainersWithImageTag.nonEmpty) { + logInfo(s"Found ${runningContainersWithImageTag.size} containers running with" + + s" an image with the tag $dockerTag. Attempting to remove these containers," + + s" and then will stall for 2 seconds.") + runningContainersWithImageTag.foreach { container => + dockerClient.stopContainer(container.id(), 5) + } + } + runningContainersWithImageTag + } + + private def getRunningContainersWithImageIds(imageIds: Set[String]): Iterable[Container] = { + dockerClient + .listContainers( + ListContainersParam.allContainers(), + ListContainersParam.withStatusRunning()) + .asScala + .filter(container => imageIds.contains(container.imageId())) + } + + private def deleteImage(name: String): Unit = { + try { + dockerClient.removeImage(s"$name:$dockerTag") + } catch { + case e: RuntimeException => + logWarning(s"Failed to delete image $name:$dockerTag. 
There may be images leaking in the" + + s" docker environment which are now stale and unused.", e) + } } } From 047b2a247a65dd2a97b49f544399d8aa369e4c0b Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Jan 2018 15:34:31 -0800 Subject: [PATCH 09/11] Move the docker manager in git --- ...ockerImageBuilder.scala => KubernetesSuiteDockerManager.scala} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/{SparkDockerImageBuilder.scala => KubernetesSuiteDockerManager.scala} (100%) diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala similarity index 100% rename from integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/SparkDockerImageBuilder.scala rename to integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala From 792476f3367f51f7984363f5ca762e91c64f9ccd Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Jan 2018 15:58:14 -0800 Subject: [PATCH 10/11] Address more comments. --- README.md | 2 +- .../deploy/k8s/integrationtest/Utils.scala | 29 +-------- .../minikube/MinikubeTestBackend.scala | 16 ++--- .../docker/KubernetesSuiteDockerManager.scala | 59 +++++++++++-------- 4 files changed, 42 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 737dc81..8d97900 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ http://spark.apache.org/downloads.html. Or, you can create a distribution from source code using `make-distribution.sh`. For example: ``` -$ https://github.com/apache/spark.git +$ git clone git@github.com/apache/spark.git $ cd spark $ ./dev/make-distribution.sh --tgz \ -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 062e94f..970186e 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -18,8 +18,9 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.Closeable import java.net.URI +import java.io.{IOException, InputStream, OutputStream} -import java.io.{IOException,InputStream,OutputStream} +import com.google.common.io.ByteStreams object Utils extends Logging { @@ -85,30 +86,4 @@ object Utils extends Logging { s"k8s://$resolvedURL" } - - class RedirectThread( - in: InputStream, - out: OutputStream, - name: String, - propagateEof: Boolean = false) extends Thread(name) { - setDaemon(true) - override def run() { - scala.util.control.Exception.ignoring(classOf[IOException]) { - // FIXME: We copy the stream on the level of bytes to avoid encoding problems. 
- Utils.tryWithSafeFinally { - val buf = new Array[Byte](1024) - var len = in.read(buf) - while (len != -1) { - out.write(buf, 0, len) - out.flush() - len = in.read(buf) - } - } { - if (propagateEof) { - out.close() - } - } - } - } - } } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala index 91ab923..89db42f 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube -import java.util.UUID - import io.fabric8.kubernetes.client.DefaultKubernetesClient import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend @@ -28,32 +26,26 @@ private[spark] object MinikubeTestBackend extends IntegrationTestBackend { private var defaultClient: DefaultKubernetesClient = _ private val userProvidedDockerImageTag = Option( System.getProperty(KUBERNETES_TEST_DOCKER_TAG_SYSTEM_PROPERTY)) - private val resolvedDockerImageTag = - userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) private val dockerManager = new KubernetesSuiteDockerManager( - Minikube.getDockerEnv, resolvedDockerImageTag) + Minikube.getDockerEnv, userProvidedDockerImageTag) override def initialize(): Unit = { val minikubeStatus = Minikube.getMinikubeStatus require(minikubeStatus == MinikubeStatus.RUNNING, s"Minikube must be running before integration tests can execute. 
Current status" + s" is: $minikubeStatus") - if (userProvidedDockerImageTag.isEmpty) { - dockerManager.buildSparkDockerImages() - } + dockerManager.buildSparkDockerImages() defaultClient = Minikube.getKubernetesClient } override def cleanUp(): Unit = { super.cleanUp() - if (userProvidedDockerImageTag.isEmpty) { - dockerManager.deleteImages() - } + dockerManager.deleteImages() } override def getKubernetesClient(): DefaultKubernetesClient = { defaultClient } - override def dockerImageTag(): String = resolvedDockerImageTag + override def dockerImageTag(): String = dockerManager.dockerImageTag() } diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala index 8b72bc5..0163d33 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/docker/KubernetesSuiteDockerManager.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.k8s.integrationtest.docker import java.io.{File, PrintWriter} import java.net.URI import java.nio.file.Paths +import java.util.UUID import com.google.common.base.Charsets import com.google.common.io.Files @@ -33,10 +34,10 @@ import scala.collection.JavaConverters._ import org.apache.spark.deploy.k8s.integrationtest.constants._ import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite import org.apache.spark.deploy.k8s.integrationtest.Logging -import org.apache.spark.deploy.k8s.integrationtest.Utils.{RedirectThread, tryWithResource} +import org.apache.spark.deploy.k8s.integrationtest.Utils.tryWithResource private[spark] class KubernetesSuiteDockerManager( - dockerEnv: Map[String, String], dockerTag: String) extends Logging { + dockerEnv: Map[String, String], userProvidedDockerImageTag: Option[String]) extends Logging { private val DOCKER_BUILD_PATH = SPARK_DISTRO_PATH // Dockerfile paths must be relative to the build path. 
@@ -47,9 +48,11 @@ private[spark] class KubernetesSuiteDockerManager( private val INIT_CONTAINER_DOCKER_FILE = DOCKERFILES_DIR + "init-container/Dockerfile" private val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) private val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + + private val resolvedDockerImageTag = + userProvidedDockerImageTag.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) private val dockerHost = dockerEnv.getOrElse("DOCKER_HOST", throw new IllegalStateException("DOCKER_HOST env not found.")) - private val originalDockerUri = URI.create(dockerHost) private val httpsDockerUri = new URIBuilder() .setHost(originalDockerUri.getHost) @@ -69,31 +72,39 @@ private[spark] class KubernetesSuiteDockerManager( .build() def buildSparkDockerImages(): Unit = { - Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } - buildImage("spark-base", BASE_DOCKER_FILE) - buildImage("spark-driver", DRIVER_DOCKER_FILE) - buildImage("spark-executor", EXECUTOR_DOCKER_FILE) - buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + if (userProvidedDockerImageTag.isEmpty) { + Eventually.eventually(TIMEOUT, INTERVAL) { + dockerClient.ping() + } + buildImage("spark-base", BASE_DOCKER_FILE) + buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE) + } } def deleteImages(): Unit = { - removeRunningContainers() - deleteImage("spark-base") - deleteImage("spark-driver") - deleteImage("spark-executor") - deleteImage("spark-init") + if (userProvidedDockerImageTag.isEmpty) { + removeRunningContainers() + deleteImage("spark-base") + deleteImage("spark-driver") + deleteImage("spark-executor") + deleteImage("spark-init") + } } + def dockerImageTag(): String = resolvedDockerImageTag + private def buildImage(name: String, dockerFile: String): Unit = { - logInfo(s"Building Docker image - $name:$dockerTag") + logInfo(s"Building Docker image - $name:$resolvedDockerImageTag") val dockerFileWithBaseTag = new File(DOCKER_BUILD_PATH.resolve( - s"$dockerFile-$dockerTag").toAbsolutePath.toString) + s"$dockerFile-$resolvedDockerImageTag").toAbsolutePath.toString) dockerFileWithBaseTag.deleteOnExit() try { val originalDockerFileText = Files.readLines( DOCKER_BUILD_PATH.resolve(dockerFile).toFile, Charsets.UTF_8).asScala val dockerFileTextWithProperBaseImage = originalDockerFileText.map( - _.replace("FROM spark-base", s"FROM spark-base:$dockerTag")) + _.replace("FROM spark-base", s"FROM spark-base:$resolvedDockerImageTag")) tryWithResource(Files.newWriter(dockerFileWithBaseTag, Charsets.UTF_8)) { fileWriter => tryWithResource(new PrintWriter(fileWriter)) { printWriter => for (line <- dockerFileTextWithProperBaseImage) { @@ -105,8 +116,8 @@ private[spark] class KubernetesSuiteDockerManager( } dockerClient.build( DOCKER_BUILD_PATH, - s"$name:$dockerTag", - s"$dockerFile-$dockerTag", + s"$name:$resolvedDockerImageTag", + s"$dockerFile-$resolvedDockerImageTag", new LoggingBuildHandler()) } finally { dockerFileWithBaseTag.delete() @@ -119,7 +130,7 @@ private[spark] class KubernetesSuiteDockerManager( private def removeRunningContainers(): Unit = { val imageIds = dockerClient.listImages(ListImagesParam.allImages()) .asScala - .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$dockerTag"))) + .filter(image => image.repoTags().asScala.exists(_.endsWith(s":$resolvedDockerImageTag"))) .map(_.id()) .toSet Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { @@ 
-127,7 +138,7 @@ private[spark] class KubernetesSuiteDockerManager( require( runningContainersWithImageTag.isEmpty, s"${runningContainersWithImageTag.size} containers found still running" + - s" with the image tag $dockerTag") + s" with the image tag $resolvedDockerImageTag") } dockerClient.listContainers(ListContainersParam.allContainers()) .asScala @@ -139,7 +150,7 @@ private[spark] class KubernetesSuiteDockerManager( .asScala .filter(container => imageIds.contains(container.imageId())) require(containersWithImageTag.isEmpty, s"${containersWithImageTag.size} containers still" + - s" found with image tag $dockerTag.") + s" found with image tag $resolvedDockerImageTag.") } } @@ -148,7 +159,7 @@ private[spark] class KubernetesSuiteDockerManager( val runningContainersWithImageTag = getRunningContainersWithImageIds(imageIds) if (runningContainersWithImageTag.nonEmpty) { logInfo(s"Found ${runningContainersWithImageTag.size} containers running with" + - s" an image with the tag $dockerTag. Attempting to remove these containers," + + s" an image with the tag $resolvedDockerImageTag. Attempting to remove these containers," + s" and then will stall for 2 seconds.") runningContainersWithImageTag.foreach { container => dockerClient.stopContainer(container.id(), 5) @@ -168,10 +179,10 @@ private[spark] class KubernetesSuiteDockerManager( private def deleteImage(name: String): Unit = { try { - dockerClient.removeImage(s"$name:$dockerTag") + dockerClient.removeImage(s"$name:$resolvedDockerImageTag") } catch { case e: RuntimeException => - logWarning(s"Failed to delete image $name:$dockerTag. There may be images leaking in the" + + logWarning(s"Failed to delete image $name:$resolvedDockerImageTag. There may be images leaking in the" + s" docker environment which are now stale and unused.", e) } } From 103d507b8f3a370411fe882b394c98bb65ba6fee Mon Sep 17 00:00:00 2001 From: mcheah Date: Mon, 8 Jan 2018 16:04:31 -0800 Subject: [PATCH 11/11] Address more comments --- README.md | 2 +- .../deploy/k8s/integrationtest/KubernetesSuite.scala | 6 ------ .../apache/spark/deploy/k8s/integrationtest/Utils.scala | 3 --- .../k8s/integrationtest/backend/minikube/Minikube.scala | 8 ++++---- 4 files changed, 5 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 8d97900..ce8999e 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ http://spark.apache.org/downloads.html. Or, you can create a distribution from source code using `make-distribution.sh`. 
For example: ``` -$ git clone git@github.com/apache/spark.git +$ git clone git@github.com:apache/spark.git $ cd spark $ ./dev/make-distribution.sh --tgz \ -Phadoop-2.7 -Pkubernetes -Pkinesis-asl -Phive -Phive-thriftserver diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c169769..3b60af8 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -73,18 +73,15 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } test("Run SparkPi with no resources") { - doMinikubeCheck runSparkPiAndVerifyCompletion() } test("Run SparkPi with a very long application name.") { - doMinikubeCheck sparkAppConf.set("spark.app.name", "long" * 40) runSparkPiAndVerifyCompletion() } test("Run SparkPi with a master URL without a scheme.") { - doMinikubeCheck val url = kubernetesTestComponents.kubernetesClient.getMasterUrl val k8sMasterUrl = if (url.getPort < 0) { s"k8s://${url.getHost}" @@ -226,9 +223,6 @@ private[spark] class KubernetesSuite extends FunSuite with BeforeAndAfterAll wit } } } - private def doMinikubeCheck(): Unit = { - assume(testBackend == MinikubeTestBackend) - } private def tagImage(image: String): String = s"$image:${testBackend.dockerImageTag()}" private def doBasicDriverPodCheck(driverPod: Pod): Unit = { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala index 970186e..c300ca4 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -18,9 +18,6 @@ package org.apache.spark.deploy.k8s.integrationtest import java.io.Closeable import java.net.URI -import java.io.{IOException, InputStream, OutputStream} - -import com.google.common.io.ByteStreams object Utils extends Logging { diff --git a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index 6fb7f4e..8204852 100644 --- a/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ b/integration-test/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -26,14 +26,14 @@ import org.apache.spark.deploy.k8s.integrationtest.{Logging, ProcessUtils} private[spark] object Minikube extends Logging { private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 - def getMinikubeIp: String = synchronized { + def getMinikubeIp: String = { val outputs = executeMinikube("ip") .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) assert(outputs.size == 1, "Unexpected amount of output from minikube ip") outputs.head } - def getMinikubeStatus: MinikubeStatus.Value = synchronized { + def getMinikubeStatus: MinikubeStatus.Value = { val statusString = executeMinikube("status") .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:")) .head @@ -43,7 +43,7 @@ private[spark] object Minikube extends Logging { .getOrElse(throw new IllegalStateException(s"Unknown status 
$statusString")) } - def getDockerEnv: Map[String, String] = synchronized { + def getDockerEnv: Map[String, String] = { executeMinikube("docker-env", "--shell", "bash") .filter(_.startsWith("export")) .map(_.replaceFirst("export ", "").split('=')) @@ -51,7 +51,7 @@ private[spark] object Minikube extends Logging { .toMap } - def getKubernetesClient: DefaultKubernetesClient = synchronized { + def getKubernetesClient: DefaultKubernetesClient = { val kubernetesMaster = s"https://${getMinikubeIp}:8443" val userHome = System.getProperty("user.home") val kubernetesConf = new ConfigBuilder()
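For reference, `getDockerEnv` above parses the `export VAR="value"` lines emitted by `minikube docker-env --shell bash`. A rough sketch of that transformation (the sample output lines and the final tuple/unquoting step are assumed for illustration; only the filter/split pipeline comes from this patch):

```
// Hypothetical `minikube docker-env --shell bash` output; real values differ per machine.
val sampleOutput = Seq(
  "export DOCKER_TLS_VERIFY=\"1\"",
  "export DOCKER_HOST=\"tcp://192.168.99.100:2376\"",
  "export DOCKER_CERT_PATH=\"/home/user/.minikube/certs\"",
  "# Run this command to configure your shell:")

val dockerEnv: Map[String, String] = sampleOutput
  .filter(_.startsWith("export"))
  .map(_.replaceFirst("export ", "").split('='))
  .map(arr => (arr(0), arr(1).replaceAll("\"", "")))  // assumed completion of the elided step
  .toMap

assert(dockerEnv("DOCKER_HOST") == "tcp://192.168.99.100:2376")
```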