Basic Secure HDFS Support [514] #540
@@ -752,6 +752,61 @@ from the other deployment modes. See the [configuration page](configuration.html | |
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.enabled</code></td> | ||
| <td>false</td> | ||
| <td> | ||
| Specify whether your job requires Kerberos authentication to access HDFS. By default, secure | ||
| HDFS access is assumed not to be required. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.keytab</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
| Assuming <code>spark.kubernetes.kerberos.enabled</code> is set to true, this lets you specify | ||
| the location of your Kerberos keytab to be used in order to access secure HDFS. This is optional, as you | ||
| may log in by running <code>kinit</code> before running spark-submit, in which case the submission client | ||
| will look in your local TGT cache to resolve this. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.principal</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
| Assuming <code>spark.kubernetes.kerberos.enabled</code> is set to true, this lets you specify | ||
| the Kerberos principal that you wish to use to access secure HDFS. This is optional, as you | ||
| may log in by running <code>kinit</code> before running spark-submit, in which case the submission client | ||
| will look in your local TGT cache to resolve this. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.renewer.principal</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
| Assuming <code>spark.kubernetes.kerberos.enabled</code> is set to true, this lets you specify | ||
| the principal that you wish to use to handle the renewal of delegation tokens. This is optional, as | ||
| we will set the principal to be the job user's principal by default. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.tokensecret.name</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
| Assuming <code>spark.kubernetes.kerberos.enabled</code> is set to true, this lets you specify | ||
| the name of the secret where your existing delegation token data is stored. You must also specify the | ||
| item key <code>spark.kubernetes.kerberos.tokensecret.itemkey</code> under which your data is stored in the secret. | ||
| This is optional, for the case where you want to use a pre-existing secret; otherwise a new secret will be | ||
| created automatically. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.kubernetes.kerberos.tokensecret.itemkey</code></td> | ||
| <td>spark.kubernetes.kerberos.dt.label</td> | ||
| <td> | ||
| Assuming <code>spark.kubernetes.kerberos.enabled</code> is set to true, this lets you specify | ||
| the data item key within the pre-specified secret where the data of your existing delegation token is stored. | ||
| This defaults to <code>spark.kubernetes.kerberos.dt.label</code> should you not include it, but | ||
| you should always include this if you are providing a pre-existing secret containing the delegation token data. | ||

Review comment: Curious. Is the token refresh server supposed to renew this pre-populated token as well? Or is it supposed to be renewed by the job user? We may want to comment on that.

Reply: The token refresh server is supposed to renew this pre-populated token. The assumption is that if you supply a pre-populated token, it will be automatically updated by either an administrator or the token refresh server. If you think it's needed, this should probably be noted in the later PR.
| <td><code>spark.executorEnv.[EnvironmentVariableName]</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
|
|
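Taken together, these options are set on the submission side like any other Spark property. Below is a minimal sketch in Scala, assuming keytab-based login; the keytab path and principal are illustrative values, not defaults:

```scala
import org.apache.spark.SparkConf

// Hypothetical submission-side settings; all values are illustrative only.
val conf = new SparkConf()
  .set("spark.kubernetes.kerberos.enabled", "true")
  // Option A: supply an explicit keytab and principal.
  .set("spark.kubernetes.kerberos.keytab", "/etc/security/keytabs/alice.keytab")
  .set("spark.kubernetes.kerberos.principal", "alice@EXAMPLE.COM")
// Option B (instead of A): omit both settings and run `kinit` before
// spark-submit; the submission client then resolves credentials from
// the local TGT cache, as described in the table above.
```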
@@ -791,4 +846,3 @@ from the other deployment modes. See the [configuration page](configuration.html | |
| Running Spark on Kubernetes is currently an experimental feature. Some restrictions on the current implementation that | ||
| should be lifted in the future include: | ||
| * Applications can only run in cluster mode. | ||
| * Only Scala and Java applications can be run. | ||
| @@ -0,0 +1,86 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import java.io.File | ||
|
|
||
| import scala.collection.JavaConverters._ | ||
|
Review comment: Imports are not consistent with the rest of the project. Order should be as follows everywhere:
Please look over all files and fix all imports.

Reply: I was curious about the import order. According to http://spark.apache.org/contributing.html, the recommended import order is slightly different: scala.* and other 3rd-party libraries are separated by an empty space. Do we know which one is correct? An example from the same page:

Reply: Actually @kimoonkim and I think our code is incorrect in most places.

Reply: @mccheah Cool. Should we then follow the import order suggested in http://spark.apache.org/contributing.html going forward?

Reply: Yes we should. We can fix the ordering as we merge upstream.
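For reference, here is a sketch of the grouping the contributing guide recommends (java/javax, then scala.*, then third-party libraries, then org.apache.spark.*, with a blank line between groups). This is a reconstruction of the convention, not the elided example from the page:

```scala
// Group 1: java/javax
import java.io.File

// Group 2: scala.*
import scala.collection.JavaConverters._

// Group 3: third-party libraries
import io.fabric8.kubernetes.api.model.PodBuilder

// Group 4: org.apache.spark.*
import org.apache.spark.internal.Logging
```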
||
|
|
||
| import io.fabric8.kubernetes.api.model.{ContainerBuilder, KeyToPathBuilder, PodBuilder} | ||
|
|
||
| import org.apache.spark.deploy.k8s.constants._ | ||
| import org.apache.spark.internal.Logging | ||
|
|
||
| /** | ||
| * This is separated out from the HadoopConf steps API because this component can be reused to | ||
| * set up the Hadoop Configuration for executors as well. | ||
| */ | ||
| private[spark] trait HadoopConfBootstrap { | ||
| /** | ||
| * Bootstraps a main container with the ConfigMaps containing Hadoop config files | ||
| * mounted as volumes and an ENV variable pointing to the mounted file. | ||
| */ | ||
| def bootstrapMainContainerAndVolumes( | ||
| originalPodWithMainContainer: PodWithMainContainer) | ||
| : PodWithMainContainer | ||
| } | ||
|
|
||
| private[spark] class HadoopConfBootstrapImpl( | ||
| hadoopConfConfigMapName: String, | ||
| hadoopConfigFiles: Seq[File], | ||
| hadoopUGI: HadoopUGIUtil) extends HadoopConfBootstrap with Logging { | ||
|
||
|
|
||
| override def bootstrapMainContainerAndVolumes( | ||
| originalPodWithMainContainer: PodWithMainContainer) | ||
| : PodWithMainContainer = { | ||
| logInfo("HADOOP_CONF_DIR defined. Mounting HDFS specific .xml files") | ||
|
||
| val keyPaths = hadoopConfigFiles.map(file => | ||
| new KeyToPathBuilder() | ||
| .withKey(file.toPath.getFileName.toString) | ||
| .withPath(file.toPath.getFileName.toString) | ||
| .build()) | ||
| val hadoopSupportedPod = new PodBuilder(originalPodWithMainContainer.pod) | ||
| .editSpec() | ||
| .addNewVolume() | ||
| .withName(HADOOP_FILE_VOLUME) | ||
| .withNewConfigMap() | ||
| .withName(hadoopConfConfigMapName) | ||
| .withItems(keyPaths.asJava) | ||
| .endConfigMap() | ||

Review comment: Wrong indentation.

| .endVolume() | ||
| .endSpec() | ||
| .build() | ||
| val mainContainerWithMountedHadoopConf = new ContainerBuilder( | ||
| originalPodWithMainContainer.mainContainer) | ||
| .addNewVolumeMount() | ||
| .withName(HADOOP_FILE_VOLUME) | ||
| .withMountPath(HADOOP_CONF_DIR_PATH) | ||
| .endVolumeMount() | ||
| .addNewEnv() | ||
| .withName(ENV_HADOOP_CONF_DIR) | ||
| .withValue(HADOOP_CONF_DIR_PATH) | ||
| .endEnv() | ||
| .addNewEnv() | ||
| .withName(ENV_SPARK_USER) | ||
| .withValue(hadoopUGI.getShortName) | ||
| .endEnv() | ||
| .build() | ||
| originalPodWithMainContainer.copy( | ||

Review comment: nit: put an empty line before the returned value.

| pod = hadoopSupportedPod, | ||
| mainContainer = mainContainerWithMountedHadoopConf) | ||
| } | ||
| } | ||
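For clarity, a hypothetical caller might wire this bootstrap up as below. PodWithMainContainer is assumed to be a case class of (pod, mainContainer), as the copy() call above suggests, and the ConfigMap name and file path are placeholder values:

```scala
import java.io.File

import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Sketch only: names and paths are illustrative, not project defaults.
val bootstrap = new HadoopConfBootstrapImpl(
  hadoopConfConfigMapName = "hadoop-conf-configmap",
  hadoopConfigFiles = Seq(new File("/opt/hadoop/conf/core-site.xml")),
  hadoopUGI = new HadoopUGIUtil)

val bootstrapped = bootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(
    new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
    new ContainerBuilder().withName("driver").build()))
// bootstrapped.pod now carries the ConfigMap-backed volume, and
// bootstrapped.mainContainer the mount plus the HADOOP_CONF_DIR env var.
```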
| @@ -0,0 +1,79 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} | ||
|
|
||
| import scala.util.Try | ||
|
|
||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.fs.FileSystem | ||
| import org.apache.hadoop.security.{Credentials, UserGroupInformation} | ||
| import org.apache.hadoop.security.token.{Token, TokenIdentifier} | ||
| import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier | ||
|
|
||
|
|
||
| // Function of this class is merely for mocking reasons | ||
| private[spark] class HadoopUGIUtil { | ||
|
||
| def getCurrentUser: UserGroupInformation = UserGroupInformation.getCurrentUser | ||
|
|
||
| def getShortName: String = getCurrentUser.getShortUserName | ||
|
||
|
|
||
| def isSecurityEnabled: Boolean = UserGroupInformation.isSecurityEnabled | ||
|
|
||
| def loginUserFromKeytabAndReturnUGI(principal: String, keytab: String): UserGroupInformation = | ||
| UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab) | ||
|
|
||
| def dfsAddDelegationToken(hadoopConf: Configuration, renewer: String, creds: Credentials) | ||
| : Iterable[Token[_ <: TokenIdentifier]] = | ||
| FileSystem.get(hadoopConf).addDelegationTokens(renewer, creds) | ||
|
||
|
|
||
| def getCurrentTime: Long = System.currentTimeMillis() | ||
|
||
|
|
||
| // Functions that should move to core with the rebase to 2.3 | ||
| @deprecated("Moved to core in 2.3", "2.3") | ||
| def getTokenRenewalInterval( | ||

Review comment: Line 30 says "Function of this class is merely for mocking reasons". But it seems this function has real business logic, more than just mocking purpose. Move it to some other class?

| renewedTokens: Iterable[Token[_ <: TokenIdentifier]], | ||
| hadoopConf: Configuration): Option[Long] = { | ||
| val renewIntervals = renewedTokens.filter { | ||
| _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier] | ||
| }.flatMap { token => | ||
| Try { | ||
| val newExpiration = token.renew(hadoopConf) | ||
| val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier] | ||
| val interval = newExpiration - identifier.getIssueDate | ||
| interval | ||
| }.toOption | ||
| } | ||
| renewIntervals.reduceLeftOption(_ min _) | ||
| } | ||
|
|
||
| @deprecated("Moved to core in 2.3", "2.3") | ||
| def serialize(creds: Credentials): Array[Byte] = { | ||

Review comment: Ditto. It has business logic that should be tested rather than just being mocked. Move to some other class?

| val byteStream = new ByteArrayOutputStream | ||
| val dataStream = new DataOutputStream(byteStream) | ||
|
||
| creds.writeTokenStorageToStream(dataStream) | ||
| byteStream.toByteArray | ||
| } | ||
|
|
||
| @deprecated("Moved to core in 2.3", "2.3") | ||
| def deserialize(tokenBytes: Array[Byte]): Credentials = { | ||

Review comment: Same here. Move to some other class?

| val creds = new Credentials() | ||
| creds.readTokenStorageStream(new DataInputStream(new ByteArrayInputStream(tokenBytes))) | ||
|
||
| creds | ||
| } | ||
| } | ||
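Since serialize and deserialize are inverses, a quick round-trip illustrates their contract. This is a sketch; Credentials comes straight from hadoop-common, and the empty credentials object is just for illustration:

```scala
import org.apache.hadoop.security.Credentials

// Round-trip sketch for the (de)serialization helpers above.
val util = new HadoopUGIUtil
val creds = new Credentials() // empty credentials, for illustration only
val bytes: Array[Byte] = util.serialize(creds)
val restored: Credentials = util.deserialize(bytes)
// restored now holds the same token storage contents that creds had.
```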
| @@ -0,0 +1,76 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.spark.deploy.k8s | ||
|
|
||
| import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} | ||
|
|
||
| import org.apache.spark.deploy.k8s.constants._ | ||
| import org.apache.spark.internal.Logging | ||
|
|
||
|
|
||
| /** | ||
| * This is separated out from the HadoopConf steps API because this component can be reused to | ||
| * mount the DT secret for executors as well. | ||
| */ | ||
| private[spark] trait KerberosTokenConfBootstrap { | ||
| // Bootstraps a main container with the Secret mounted as volumes and an ENV variable | ||
| // pointing to the mounted file containing the DT for Secure HDFS interaction | ||
| def bootstrapMainContainerAndVolumes( | ||
| originalPodWithMainContainer: PodWithMainContainer) | ||
| : PodWithMainContainer | ||
| } | ||
|
|
||
| private[spark] class KerberosTokenConfBootstrapImpl( | ||
| secretName: String, | ||
| secretItemKey: String, | ||
| userName: String) extends KerberosTokenConfBootstrap with Logging { | ||

Review comment: Nit. Have one empty line instead of two?

| override def bootstrapMainContainerAndVolumes( | ||
| originalPodWithMainContainer: PodWithMainContainer) | ||
| : PodWithMainContainer = { | ||
| logInfo("Mounting HDFS DT from Secret for Secure HDFS") | ||
| val dtMountedPod = new PodBuilder(originalPodWithMainContainer.pod) | ||
| .editOrNewSpec() | ||
| .addNewVolume() | ||
| .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) | ||
| .withNewSecret() | ||
| .withSecretName(secretName) | ||
| .endSecret() | ||
| .endVolume() | ||
| .endSpec() | ||
| .build() | ||
| val mainContainerWithMountedKerberos = new ContainerBuilder( | ||
| originalPodWithMainContainer.mainContainer) | ||
| .addNewVolumeMount() | ||
| .withName(SPARK_APP_HADOOP_SECRET_VOLUME_NAME) | ||
| .withMountPath(SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR) | ||
| .endVolumeMount() | ||
| .addNewEnv() | ||
| .withName(ENV_HADOOP_TOKEN_FILE_LOCATION) | ||
| .withValue(s"$SPARK_APP_HADOOP_CREDENTIALS_BASE_DIR/$secretItemKey") | ||
| .endEnv() | ||
| .addNewEnv() | ||
| .withName(ENV_SPARK_USER) | ||
| .withValue(userName) | ||
| .endEnv() | ||
| .build() | ||
| originalPodWithMainContainer.copy( | ||
| pod = dtMountedPod, | ||
| mainContainer = mainContainerWithMountedKerberos) | ||
| } | ||
| } | ||
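A hypothetical usage sketch, mirroring the HadoopConfBootstrap example earlier; the secret name and user are placeholders, and the secret is assumed to already hold delegation token data under the given item key:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Sketch only: secret and user names are illustrative.
val kerberosBootstrap = new KerberosTokenConfBootstrapImpl(
  secretName = "hdfs-dt-secret",
  secretItemKey = "spark.kubernetes.kerberos.dt.label",
  userName = "alice")

val podWithToken = kerberosBootstrap.bootstrapMainContainerAndVolumes(
  PodWithMainContainer(
    new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(),
    new ContainerBuilder().withName("driver").build()))
// The container now has HADOOP_TOKEN_FILE_LOCATION pointing at the
// mounted secret item, and SPARK_USER set to the supplied user name.
```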
|
|
@@ -157,6 +157,12 @@ package object config extends Logging { | |
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_SHUFFLE_DIR = | ||
| ConfigBuilder("spark.kubernetes.shuffle.dir") | ||
| .doc("Path to the shared shuffle directories.") | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_SHUFFLE_APISERVER_URI = | ||
| ConfigBuilder("spark.kubernetes.shuffle.apiServer.url") | ||
| .doc("URL to the Kubernetes API server that the shuffle service will monitor for Spark pods.") | ||
|
|
@@ -496,6 +502,49 @@ package object config extends Logging { | |
|
|
||
| private[spark] val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector." | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_SUPPORT = | ||
| ConfigBuilder("spark.kubernetes.kerberos.enabled") | ||
| .doc("Specify whether your job is a job that will require a Delegation Token to access HDFS") | ||
| .booleanConf | ||
| .createWithDefault(false) | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_KEYTAB = | ||
| ConfigBuilder("spark.kubernetes.kerberos.keytab") | ||
| .doc("Specify the location of keytab" + | ||
| " for Kerberos in order to access Secure HDFS") | ||
|
||
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_PRINCIPAL = | ||
| ConfigBuilder("spark.kubernetes.kerberos.principal") | ||
| .doc("Specify the principal" + | ||
| " for Kerberos in order to access Secure HDFS") | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_RENEWER_PRINCIPAL = | ||
| ConfigBuilder("spark.kubernetes.kerberos.rewnewer.principal") | ||
|
||
| .doc("Specify the principal" + | ||
| " you wish to renew and retrieve your Kerberos values with") | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_DT_SECRET_NAME = | ||
| ConfigBuilder("spark.kubernetes.kerberos.tokensecret.name") | ||
| .doc("Specify the name of the secret where " + | ||
| " your existing delegation token is stored. This removes the need" + | ||
| " for the job user to provide any keytab for launching a job") | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
| private[spark] val KUBERNETES_KERBEROS_DT_SECRET_ITEM_KEY = | ||
| ConfigBuilder("spark.kubernetes.kerberos.tokensecret.itemkey") | ||
| .doc("Specify the item key of the data where " + | ||
| " your existing delegation token is stored. This removes the need" + | ||
| " for the job user to provide any keytab for launching a job") | ||
| .stringConf | ||
| .createOptional | ||
|
|
||
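Within this package object, these optional entries would be read back off a SparkConf. A brief sketch, assuming Spark's internal config API, where get on an optional entry yields an Option:

```scala
// Sketch: reading the entries defined above via the internal config API.
val sparkConf = new org.apache.spark.SparkConf()
val kerberosEnabled: Boolean = sparkConf.get(KUBERNETES_KERBEROS_SUPPORT)
val maybeKeytab: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_KEYTAB)
val maybePrincipal: Option[String] = sparkConf.get(KUBERNETES_KERBEROS_PRINCIPAL)
```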
| private[spark] def resolveK8sMaster(rawMasterString: String): String = { | ||
| if (!rawMasterString.startsWith("k8s://")) { | ||
| throw new IllegalArgumentException("Master URL should start with k8s:// in Kubernetes mode.") | ||
|
|
||
Review comment: Typo. s/rewewer/renewer/