spark on kubernetes

the motivation for this post is to document our experience migrating our spark data processing to run on kubernetes.

what we use spark for

it's useful to know what we use spark for because this informs our choices for our spark-on-kubernetes setup. our main use of spark is for data processing, where the data is many times larger than what fits in memory on the spark executors, and we tend to write data that is the same order of magnitude as the data being read. because of this our focus is on efficient out-of-core processing and large shuffles. we are less interested in query performance (e.g. using spark as the back-end for a SQL engine), which has somewhat different requirements such as efficient predicate push-down, query plan optimizations, quick resource allocation, etc.

our deployment of spark before kubernetes

we started out by running spark on hadoop on physical servers, and when we initially migrated to cloud platforms we leveraged hadoop as-a-service offerings like EMR, dataproc and hdinsight, where we would install our own spark version together with our software. our workflows would pop up temporary hadoop clusters, and for each cluster copy data from the bucket store (S3, GCS, ABFS) into HDFS on the cluster, run the jobs, and copy the results from HDFS back to the bucket store, after which the cluster was destroyed.

the main benefits of this setup were that it was similar to what we were used to, we could leverage HDFS (which is still a lot faster than bucket stores) for intermediate results, and we had the spark shuffle service available. the downsides were that dynamic resource scaling capabilities were limited, there was a lot of overhead in these pop-up clusters, and we always had to deal with potential library (jar) conflicts with whatever was installed on these clusters in the hadoop classpath (a problem that also existed on traditional hadoop clusters).

starting out on kubernetes

the first thing to get your head around on kubernetes is that there is no server you launch your jobs from. what i mean by this is that the setup where you launch your job using spark-submit from a semi-permanent virtual machine (on which you can get all comfortable and install all your favorite tools) is gone.

now you might read about --deploy-mode=cluster and think this is your ticket to launching jobs from a virtual machine that sits beside the kubernetes cluster, and you are right, although it is not really the same. for example you cannot use local files anymore. despite the fact that this approach works i think it's an anti-pattern, because what --deploy-mode=cluster does is launch a pod on kubernetes and in that pod launch spark-submit again with --deploy-mode=client. so this big install of spark that you launch from has become a really complex way to launch a single pod, something that can be done more simply using kubectl. another reason to avoid this setup is that the launch process sits outside the kubernetes cluster, making it harder to manage using the really powerful kubernetes tool set.

another tool worth mentioning is the spark operator, which is pretty neat. the operator leverages a custom resource: it creates a new resource type in the kubernetes API called SparkApplication for spark jobs. there is a controller (that nobody needs direct access to, it runs on the cluster as a pod somewhere) that watches for any creation of or changes to SparkApplication resources and takes action to get them to the desired state. to me this is a much better way to go about it than spark-submit --deploy-mode cluster. however now you launch a SparkApplication, not a pod, and it's not like SparkApplication extends pod or can be used in its place easily. why does that matter? well, there are lots of places you can use pods, like in workflows, so we found this SparkApplication to be somewhat limiting.
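
for illustration, here is a minimal sketch of what a SparkApplication could look like, assuming the spark operator's v1beta2 custom resource; the names, image, class and sizes below are placeholders, not our actual setup:

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: my-spark-job           # hypothetical name
  namespace: spark-jobs        # hypothetical namespace
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.0
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.13-3.5.0.jar
  sparkVersion: 3.5.0
  driver:
    cores: 1
    memory: 2g
    serviceAccount: spark
  executor:
    instances: 4
    cores: 2
    memory: 8g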

we decided the best way to go about it is to launch the spark jobs directly in pods ourselves. our main motivations for this choice were that we had full control and that we could use all the tools that leverage pods like workflow engines.
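
as a rough sketch (with hypothetical names, image and application jar), such a pod looks something like this: it simply runs spark-submit in client mode, and the driver then creates its executor pods through the kubernetes API:

apiVersion: v1
kind: Pod
metadata:
  name: my-spark-job            # hypothetical name
spec:
  restartPolicy: Never
  serviceAccountName: spark     # needs RBAC permissions to launch executor pods, see below
  containers:
    - name: driver
      image: tresata/spark-submit:3.5.0   # hypothetical image tag
      command: ["/opt/spark/bin/spark-submit"]
      args:
        - --master
        - k8s://https://kubernetes.default.svc
        - --deploy-mode
        - client
        # lets spark set owner references so executors are cleaned up with the driver pod
        - --conf
        - spark.kubernetes.driver.pod.name=my-spark-job
        - --class
        - org.apache.spark.examples.SparkPi
        - local:///opt/spark/examples/jars/spark-examples_2.13-3.5.0.jar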

kubernetes cluster settings

spark batch jobs are not typical kubernetes applications: they require bulk scheduling of large amounts of compute resources and large and fast local scratch storage in pods.

note that tresata currently deploys kubernetes on AWS EKS and azure AKS so we will focus on those platforms here. since i am more familiar with AWS i will go into more detail for AWS than for azure.

scale horizontally

batch jobs require large amounts of compute resources for limited time periods. we ideally want to allocate resources on demand so we only pay for the resources we use.

on AWS one could use fargate for this, however when we were building this fargate had no good support for mounting additional storage or increasing the default container storage size, and 20GB of disk space is not enough for a spark executor. apparently today fargate supports up to 175GB of storage, see issue 1794. we haven't tested this, but 175GB would still not be enough for many of our use cases either. instead of using fargate we decided to use managed node pools and karpenter to add or remove kubernetes nodes (EC2 instances) as required.
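
as an illustration, a karpenter node pool for spark could look roughly like the sketch below (assuming the karpenter v1beta1 API; the instance categories, limits and timings are made-up examples, not our production values):

apiVersion: karpenter.sh/v1beta1
kind: NodePool
metadata:
  name: spark
spec:
  template:
    spec:
      nodeClassRef:
        name: spark             # references the node class/template that carries settings like the userData shown later
      requirements:
        - key: karpenter.sh/capacity-type
          operator: In
          values: ["on-demand"]
        - key: karpenter.k8s.aws/instance-category
          operator: In
          values: ["m", "r"]
  limits:
    cpu: "2000"                 # cap on total cpu karpenter may provision for this pool
  disruption:
    consolidationPolicy: WhenEmpty
    consolidateAfter: 60s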

on azure we use the cluster autoscaler, which is not as versatile or powerful as karpenter, but it works for now.
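
for reference, enabling the cluster autoscaler on an existing AKS node pool looks something like this (the resource group, cluster and pool names are placeholders):

$ az aks nodepool update \
    --resource-group my-rg --cluster-name my-aks --name sparkpool \
    --enable-cluster-autoscaler --min-count 0 --max-count 50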

large scratch storage

to get the large scratch space in the spark executor pods there are generally three options:

  • use persistent volume claims to attach block storage directly to the spark executor pods.
    this approach is ideal in a multi-tenant setup where we want to use encrypted scratch storage that is fully isolated per executor (and per tenant if an executor is used for only one tenant). the relevant settings for this in spark-defaults.conf are:

    spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName=OnDemand
    spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass=spark-local-dir
    spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit=200Gi
    spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path=/data
    spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly=false
    
  • attach block storage to the kubernetes nodes on which executor pods are launched.
    this is perhaps the simplest approach but it does not give the multi-tenant isolation nor does it give the performance of locally attached disks.
  • use nodes with large and fast local storage to launch executor pods on.
    this involves using virtual machine types that have large locally attached SSD storage. it does not give multi-tenant isolation of scratch space but it gives the best performance. kubernetes scratch space (in /var/lib/containerd) needs to be moved from the root volume to the local disk mount, so that emptyDir volumes in the pods end up on the local storage. AWS EKS optimized AMIs include a script to do this on startup, which can be configured to run using the userData attribute. in a karpenter node template:

    userData: |
      #!/bin/bash
      /bin/setup-local-disks raid0
    

kubernetes RBAC

when spark is launched on kubernetes what is being launched is the driver, which in turn will launch the executor pods. to do so requires the pod that runs the spark driver to be launched with a service account that has the required kubernetes RBAC permissions to launch pods in the same namespace. in kubernetes RBAC the permission to launch pods means the permission to launch pods as any service account within that namespace. because of this it should be assumed that spark jobs can gain access to the permissions of any service account in the namespace, and therefore to avoid unintended privilege escalations spark jobs are usually run in their own dedicated namespace, or at least in a namespace where no service accounts exist with escalated privileges.
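
a minimal sketch of such a setup, with a dedicated namespace and a service account for the driver (the exact lists of resources and verbs may need tweaking for your spark version):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-jobs         # hypothetical dedicated namespace
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: spark-driver
  namespace: spark-jobs
rules:
  # the driver creates and deletes executor pods, and also uses
  # configmaps, services and (optionally) persistentvolumeclaims
  - apiGroups: [""]
    resources: ["pods", "services", "configmaps", "persistentvolumeclaims"]
    verbs: ["get", "list", "watch", "create", "delete", "deletecollection"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: spark-driver
  namespace: spark-jobs
subjects:
  - kind: ServiceAccount
    name: spark
    namespace: spark-jobs
roleRef:
  kind: Role
  name: spark-driver
  apiGroup: rbac.authorization.k8s.io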

IAM on kubernetes

the spark jobs run on kubernetes using kubernetes RBAC permissions, but you will also want access to data that is stored in your cloud provider's bucket store (and perhaps also databases etc.), for which spark will need to use the cloud provider's IAM permissions. this involves setting up an OIDC provider for the kubernetes cluster that makes the identities of the kubernetes service accounts available in IAM.

for AWS see the aws documentation on IAM roles for service accounts. to assign an AWS IAM role to a kubernetes service account the role must have a trust policy that allows the service account to assume that role, and the service account must have the annotation eks.amazonaws.com/role-arn that links back to the role.
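
for example, the service account side of this looks like the snippet below (the role ARN is a placeholder; the matching trust policy on the IAM role is not shown):

apiVersion: v1
kind: ServiceAccount
metadata:
  name: spark
  namespace: spark-jobs
  annotations:
    # hypothetical role ARN; the role's trust policy must allow this
    # service account to assume it via the cluster's OIDC provider
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/spark-jobs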

in azure the same setup is newer: it is called workload identity and uses managed identities. note that hadoop-azure as of version 3.3.6 does not support workload identity, see HADOOP-18610. to enable it you have to build hadoop-azure yourself after merging in this pull request and replace the hadoop-azure jar in your spark install with the patched version.

docker images

we build spark for scala 2.13. to build the spark docker image, first build a spark distribution that is suitable for kubernetes and then build the docker image:

$ git clone https://github.com/apache/spark.git
$ cd spark
$ git checkout v3.5.0
$ dev/change-scala-version.sh 2.13
$ dev/make-distribution.sh --name kubernetes --tgz -Pscala-2.13 -Pkubernetes -Phadoop-cloud
$ tar -xvzf spark-3.5.0-bin-kubernetes.tgz
$ cd spark-3.5.0-bin-kubernetes/
$ docker build -t spark:3.5.0 -f ../resource-managers/kubernetes/docker/src/main/dockerfiles/spark/Dockerfile .

after this we build another docker image that captures our preferred spark settings, using the just created spark docker image as the base (parent) image. this layering doesn’t cost anything in docker and it provides a clear separation of concerns. at tresata our docker image that has our preferred spark settings is called tresata/spark-submit (for historical reasons). it is basically the same as the spark image but with custom /opt/spark/conf/spark-env.sh and /opt/spark/conf/spark-defaults.conf. the next sections will go over the changes to spark-env.sh and spark-defaults.conf.
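
the Dockerfile for such a layered image can be as simple as this sketch (assuming the spark-env.sh and spark-defaults.conf files sit next to the Dockerfile):

# layer our spark configuration on top of the spark image built above
FROM spark:3.5.0
COPY spark-env.sh spark-defaults.conf /opt/spark/conf/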

spark settings

you will want to customize /opt/spark/conf/spark-env.sh and /opt/spark/conf/spark-defaults.conf to provide reasonable defaults for kubernetes on your target cluster provider. tresata currently maintains images with these default settings for AWS EKS and azure AKS.

hostname

you might encounter this error in your spark driver pod:

ERROR Main: Failed to initialize Spark session.
org.apache.hadoop.security.KerberosAuthException: failure to login: javax.security.auth.login.LoginException: java.lang.NullPointerException: invalid null input: name
    at jdk.security.auth/com.sun.security.auth.UnixPrincipal.<init>(UnixPrincipal.java:67)
    at jdk.security.auth/com.sun.security.auth.module.UnixLoginModule.login(UnixLoginModule.java:134)
    at java.base/javax.security.auth.login.LoginContext.invoke(LoginContext.java:754)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:678)
    at java.base/javax.security.auth.login.LoginContext$4.run(LoginContext.java:676)
    at java.base/java.security.AccessController.doPrivileged(AccessController.java:714)
    at java.base/javax.security.auth.login.LoginContext.invokePriv(LoginContext.java:676)
    at java.base/javax.security.auth.login.LoginContext.login(LoginContext.java:587)
    at org.apache.hadoop.security.UserGroupInformation$HadoopLoginContext.login(UserGroupInformation.java:2091)
    at org.apache.hadoop.security.UserGroupInformation.doSubjectLogin(UserGroupInformation.java:2001)
    at org.apache.hadoop.security.UserGroupInformation.createLoginUser(UserGroupInformation.java:722)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:672)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:579)
    at org.apache.spark.util.Utils$.$anonfun$getCurrentUserName$1(Utils.scala:2451)

the fix is to add to spark-env.sh:

SPARK_PUBLIC_DNS=$(hostname -i)
SPARK_LOCAL_HOSTNAME=$(hostname -i)

dynamic allocation

a big appeal of deploying on kubernetes is that resources can be allocated on demand, as discussed in the scale horizontally section. however for this to work spark jobs need to scale their resources up and down as needed, which is known within spark as dynamic allocation. settings we recommend in spark-defaults.conf:

spark.dynamicAllocation.enabled true
spark.dynamicAllocation.executorAllocationRatio 0.1
spark.dynamicAllocation.schedulerBacklogTimeout 1s
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 90s
spark.dynamicAllocation.shuffleTracking.enabled true

shuffleTracking.enabled is there to avoid losing shuffle blocks, which are kept on the executors. this makes scale-down less efficient but it's better than the alternative, which is to lose shuffle blocks and have to re-run tasks. executorAllocationRatio is lowered from its default to avoid overly aggressive scale-up, which risks wasting (and paying for) unnecessary virtual machines.

shuffle

on kubernetes you likely do not have a spark shuffle service available, unless you deployed something like uniffle, in which case you can skip this section. without a shuffle service you have to rely on shuffle blocks being stored in the executor pods, which means these shuffle blocks can get lost when executor pods are shut down, which is not unusual if you use dynamic allocation. to handle this you will have to rely on graceful decommissioning of executor pods. in spark-defaults.conf:

spark.shuffle.service.enabled false
spark.decommission.enabled true
spark.storage.decommission.enabled true
spark.storage.decommission.shuffleBlocks.enabled true

block storage

we considered deploying HDFS on kubernetes but decided against it due to the fixed costs of having an HDFS cluster and because data transfer performance of S3 and ABFS has gotten better in the last few years. however spark still uses hadoop libraries for interacting with filesystems including S3 and ABFS, using hadoop OutputFormat, OutputCommitter and related classes. we chose to use the magic committer for S3, and the manifest committer for ABFS. for S3 we also use version 2 of the fileoutputcommitter algorithm, which is risky, but performance on S3 was otherwise simply not acceptable. ABFS performance on azure is overall much better.

on AWS:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.hadoop.fs.s3a.path.style.access true
spark.hadoop.fs.s3a.committer.name magic
spark.hadoop.fs.s3a.committer.magic.enabled true
spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory

on azure:

spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
spark.hadoop.mapreduce.outputcommitter.factory.scheme.abfs org.apache.hadoop.fs.azurebfs.commit.AzureManifestCommitterFactory

Written on December 9, 2023