Slack Data Engineering recently migrated its data workloads from AWS EMR 5 (Spark 2/Hive 2 processing engine) to EMR 6 (Spark 3 processing engine). In this blog post we share our migration journey, the challenges we faced, and the performance gains we observed in the process. It is aimed at Data Engineers, Data Infrastructure Engineers, and Product Managers who may be considering a migration to EMR 6/Spark 3.
In Data Engineering, our primary objective is to support internal teams such as Product Engineering, Machine Learning, and Data Science by providing essential datasets and a reliable data infrastructure they can use to build their own datasets. We ensure the reliability and timeliness of critical billing and usage data for our customers, and maintaining Landing Time SLAs (Service Level Agreements) is how we hold ourselves to those promises.
Over time, the rapid growth of our data volume frequently led to violations of our critical data pipelines' SLAs. As we sought alternatives to Spark 2 and Hive 2, Spark 3 emerged as a compelling solution for all of our data processing needs, particularly because its Adaptive Query Execution (AQE) feature could improve performance on some of our skewed datasets. We embarked on this EMR 6/Spark 3 migration for the improved performance, the improved security (with updated log4j libraries), and the potential for significant cost savings.
This year-long project consisted of two major phases:
- Phase 1: Upgrade EMR from 5.3x to 6.x.
- Phase 2: Upgrade from Hive 2.x/Spark 2.x to Spark 3.x.
Migration journey
Current landscape
We at Slack Data Engineering use a federated AWS EMR cluster model to meet all of our data analytics requirements. The data in our warehouse is physically stored in S3, and its metadata is stored in a Hive Metastore schema on an RDS database. SQL handles most of our use cases; additionally, we rely on Scala/PySpark for certain complex workloads. We use Apache Airflow to orchestrate our workflows and have designed custom Spark Airflow operators that submit SparkSQL, PySpark, and Scala jobs to the EMR cluster through the Livy Batches API using authenticated HTTP requests.
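For context, submitting a job through the Livy Batches API boils down to an authenticated HTTP POST. Below is a minimal sketch of such a call; the endpoint path and JSON fields follow the standard Livy REST API, while the host, credentials, and job details are illustrative assumptions.

```python
# Minimal sketch of a Livy batch submission. Host, auth, and job details are
# illustrative assumptions, not our production values.
import requests

LIVY_URL = "https://emr-cluster.example.com:8998"  # hypothetical Livy endpoint

payload = {
    "file": "s3://example-bucket/jars/spark_jobs.jar",  # job artifact
    "className": "com.example.jobs.DailyUsageRollup",   # hypothetical entry point
    "args": ["--ds", "2023-06-01"],
    "conf": {"spark.sql.shuffle.partitions": "2000"},
}

# Livy Batches API: POST /batches creates a batch session that runs the job.
response = requests.post(
    f"{LIVY_URL}/batches",
    json=payload,
    headers={"Content-Type": "application/json"},
    auth=("svc_airflow", "REDACTED"),  # stand-in for our authenticated HTTP requests
    timeout=30,
)
response.raise_for_status()
print(response.json()["id"])  # batch id, used to poll /batches/{id}/state
```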
Here is an example of our hierarchical custom Airflow Spark operators:
BaseAirflowOperator → SparkBaseAirflowOperator → CustomPySparkAirflowOperator or CustomSparkSqlAirflowOperator
Here is an example of how we use CustomSparkSqlAirflowOperator to schedule an Airflow task:
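A simplified sketch of such a task definition is below; the import path and constructor arguments here are illustrative assumptions, not the actual production interface.

```python
# Hypothetical sketch of scheduling a SparkSQL job with our custom operator.
from datetime import datetime, timedelta

from airflow import DAG
from data_platform.operators import CustomSparkSqlAirflowOperator  # hypothetical module

with DAG(
    dag_id="billing_daily_rollup",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    default_args={"retries": 2, "retry_delay": timedelta(minutes=10)},
    catchup=False,
) as dag:

    build_rollup = CustomSparkSqlAirflowOperator(
        task_id="build_billing_rollup",
        sql="sql/billing_rollup.sql",     # SparkSQL script submitted via Livy
        emr_cluster="analytics-primary",  # logical cluster name (assumed parameter)
    )
```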
Below is a pictorial representation of all of these components working together:
Our data warehouse infrastructure comprises over 60 EMR clusters, catering to the needs of more than 40 teams and supporting thousands of Airflow Directed Acyclic Graphs (DAGs). Prior to this migration, all workloads ran on EMR 5.36, Spark 2.4.8, and Hive 2.3.9.
Migration challenges
Since the majority of our workloads were managed by Hive 2, transitioning to Hive 3 in EMR 6 was the preferred choice for our internal customers because it required minimal changes to the codebase. However, we opted to consolidate onto a single compute engine, Spark 3. This strategic decision was made to leverage Spark 3's Adaptive Query Execution (AQE) feature, develop Spark 3 expertise across our teams, and fine-tune our Hadoop clusters solely for Spark operations for efficiency.
Given the scale of this migration, a phased approach was essential. We therefore decided to support both AWS EMR 5 and EMR 6 until the migration was complete, allowing us to transition workloads without disrupting existing teams' roadmaps.
However, maintaining two different cluster setups (Hive 2.x/Spark 2.x on EMR 5.x and Spark 3.x on EMR 6) presented several challenges:
- How do we support the same Hive catalog across Spark 2 and Spark 3 workloads?
- How do we provision different versions of EMR clusters?
- How do we control cost?
- How do we support different versions of our job libraries across these clusters?
- How do we submit and route jobs across these different cluster versions?
Pre-migration planning
Hive catalog migration
How do we support the same Hive catalog across Spark 2 and Spark 3 workloads?
We needed to use the same Hive Metastore catalog for our workloads across EMR 5/Spark 2 and EMR 6/Spark 3, because migrating our pipelines from Spark 2 to Spark 3 would take several quarters. We solved this problem by migrating our existing HMS 2.3.0 catalog to an HMS 3.1.0 catalog using the Hive Schema Tool, running the upgrade commands on the EMR 5 master host connected to the catalog database.
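A rough sketch of those upgrade steps, assuming a MySQL-backed RDS metastore and a host where the Hive 3.1 Schema Tool and its connection configuration are available (connection flags omitted):

```bash
# Inspect the current metastore schema version.
schematool -dbType mysql -info

# Upgrade the metastore schema from the Hive 2.3.0 layout to the Hive 3.1.0 layout.
schematool -dbType mysql -upgradeSchemaFrom 2.3.0

# Validate the upgraded schema before resuming job processing.
schematool -dbType mysql -validate
```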
Before the migration we took backups of our Hive Metastore database, and we also took some downtime on job processing while the schema upgrade ran.
Post upgrade, both our EMR 5 and EMR 6 clusters could talk to the same upgraded HMS 3 catalog DB, since it is backward compatible with Hive 2 and Spark 2 applications.
EMR cluster provisioning
How do we provision different versions of EMR clusters? How do we control cost?
We use EMR's golang SDK to launch EMR clusters via the RunJobFlow API. This API accepts a JSON-based launch configuration for an EMR cluster. We maintain a base JSON config for all clusters and override custom parameters like InstanceFleets, Capacity, and Release Label at the cluster-configuration level. We created dedicated EMR 6 configurations for the new EMR 6 clusters, with auto-scaling enabled and a low minimum capacity to keep costs under control. Over the course of the migration, we created more such EMR 6 cluster configurations for each new cluster. We regulated capacity and overall cluster utilization costs by progressively shrinking the EMR 5 fleets and growing the EMR 6 fleets based on usage.
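In production this is done with the Go SDK, but the same RunJobFlow call is easy to picture with boto3. The sketch below shows the base-config-plus-overrides idea; instance types, capacities, and names are illustrative assumptions.

```python
# Sketch of launching an EMR 6 cluster from a base JSON config plus per-cluster
# overrides (we use the Go SDK in production; boto3 is shown for illustration).
import copy
import json

import boto3

with open("emr_base_config.json") as f:  # shared base launch config (assumed file)
    base_config = json.load(f)

overrides = {
    "Name": "emr6-analytics-01",
    "ReleaseLabel": "emr-6.9.0",  # EMR 6 release label (example value)
    "Instances": {
        "InstanceFleets": [
            {
                "Name": "core",
                "InstanceFleetType": "CORE",
                "TargetOnDemandCapacity": 4,  # low minimum capacity to control cost
                "InstanceTypeConfigs": [{"InstanceType": "r5.4xlarge"}],
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "ManagedScalingPolicy": {  # auto-scaling keeps idle cost down
        "ComputeLimits": {
            "UnitType": "InstanceFleetUnits",
            "MinimumCapacityUnits": 4,
            "MaximumCapacityUnits": 64,
        }
    },
}

launch_config = copy.deepcopy(base_config)
launch_config.update(overrides)  # shallow merge; real code merges key by key

emr = boto3.client("emr", region_name="us-east-1")
response = emr.run_job_flow(**launch_config)
print(response["JobFlowId"])
```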
Job builds across different Spark versions
How do we support different versions of our job libraries across these clusters?
We use Bazel as the primary tool to build our codebase. Using Bazel, we implemented parallel build streams for Spark JARs across versions 2.x and 3.x, and we propagated all ongoing config changes to both the Spark 2 and Spark 3 JARs for consistency. Enabling the build --config=spark3 flag in the .bazelrc file allowed us to build local JARs with the required version for testing. In our Airflow pipelines, as we migrated jobs to EMR 6, the Airflow operator would pick the Spark 3 JARs automatically based on the flag approach described below.
Airflow operators enhancement
How do we submit and route jobs across these different cluster versions?
We enhanced our custom Airflow Spark operator to route jobs to different cluster versions using a boolean flag. This flag provided the convenience of submitting jobs to either the pre-migration or the post-migration cluster with a simple toggle.
Additionally, we introduced four logical groups of Spark config sizing options (SMALL, DEFAULT, LARGE, and EXTRA_LARGE) embedded in the Airflow Spark operator. Each option has its own executor memory, driver memory, and executor ranges. These sizing options helped some of our end users migrate existing Hive jobs with minimal understanding of Spark configurations.
Here is an example of our enhanced CustomSparkSqlAirflowOperator:
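Continuing the hypothetical DAG sketch from earlier, a task definition after this enhancement might look roughly like the following; the flag and sizing parameter names (use_spark3, sizing) are assumptions for illustration, not the exact production interface.

```python
# Hypothetical sketch of the enhanced operator (inside the same DAG as before):
# a boolean flag routes the job to EMR 6/Spark 3, and a sizing preset fills in the
# executor memory, driver memory, and executor range settings.
build_rollup = CustomSparkSqlAirflowOperator(
    task_id="build_billing_rollup",
    sql="sql/billing_rollup.sql",
    use_spark3=True,  # assumed flag name: submit to the EMR 6 cluster instead of EMR 5
    sizing="LARGE",   # one of SMALL / DEFAULT / LARGE / EXTRA_LARGE
)
```

Flipping the boolean back routes the same task to the EMR 5 cluster, which is what made gradual, reversible cutovers possible.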
Code changes
In most cases, the existing Hive and Spark 2 code ran fine on Spark 3. There were a few cases where we had to make changes to the code to make it Spark 3 compatible.
One example of a code change from Hive to Spark 3 is the use of a salting function for skewed joins. While some code used cumbersome subqueries to generate salt keys, other code used RAND() in the join key as a workaround for handling skew. While RAND() in the join key works in Hive, it throws an error in Spark 3: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in Project, Filter, Aggregate, or Window. We removed all of the skew-handling code and let Spark 3's Adaptive Query Execution (AQE) take care of the data skew. More about AQE in the 'Migration gain and impact' section.
Additionally, Spark 3 threw errors for certain data type casting scenarios that worked fine in Spark 2. We had to change the default value of some Spark 3 configurations; one example is setting spark.sql.storeAssignmentPolicy to 'LEGACY' instead of the Spark 3 default 'ANSI'.
We also faced several instances where a Spark 3 job inferred the schema from the Hive Metastore but failed to reconcile the schemas, erroring out with java.lang.StackOverflowError. This happened due to a lack of synchronization between the underlying Parquet data and the Hive Metastore schema. Setting spark.sql.hive.convertMetastoreParquet to false resolved the issue.
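Both settings are ordinary Spark SQL configurations, so they can be applied at session build time or passed as --conf at job submission. A minimal sketch, with everything apart from the two config keys assumed:

```python
# Sketch: Spark 3 session with the two compatibility settings discussed above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("emr6_compat_example")
    .enableHiveSupport()
    # Keep Spark 2-style (Hive-like) casting on INSERT instead of Spark 3's ANSI default.
    .config("spark.sql.storeAssignmentPolicy", "LEGACY")
    # Read Parquet-backed Hive tables through the Hive serde instead of Spark's native
    # Parquet reader, avoiding the schema-reconciliation failures (StackOverflowError).
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)
```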
Post-migration data validation
We compared two tables:
- prod_table_hive2_or_spark2 (EMR 5 table)
- test_table_spark3 (EMR 6 table)
We aimed for an exact data match between the tables rather than relying on sampling, particularly because some of our data, such as customer billing data, is mission-critical.
We used config files and macros to enable our SQL scripts to read from the production schema and write to the test schema in the test environment. This allowed us to populate exact copies of the prod data in the test schema using Spark 3 for easy comparison. We then ran EXCEPT and COUNT SQL queries between prod_table_hive2_or_spark2 and test_table_spark3 in Trino to speed up the validation process.
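The comparison queries themselves were simple. Below is a rough sketch of the idea using the Trino Python client; the connection details and schema names are assumptions, while the table names match the ones above.

```python
# Sketch of the row-level and row-count comparison between the EMR 5 and EMR 6 tables.
# Connection parameters and schema prefixes are placeholders.
import trino

conn = trino.dbapi.connect(
    host="trino.example.com", port=8080, user="data_validation",
    catalog="hive", schema="test",
)
cur = conn.cursor()

# Rows present in the prod (Hive 2/Spark 2) table but missing from the Spark 3 table.
cur.execute("""
    SELECT * FROM prod.prod_table_hive2_or_spark2
    EXCEPT
    SELECT * FROM test.test_table_spark3
""")
missing_rows = cur.fetchall()

# Row counts on both sides as a quick sanity check.
cur.execute("SELECT count(*) FROM prod.prod_table_hive2_or_spark2")
prod_count = cur.fetchone()[0]
cur.execute("SELECT count(*) FROM test.test_table_spark3")
test_count = cur.fetchone()[0]

assert not missing_rows and prod_count == test_count, "validation mismatch"
```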
In case of a mismatch in the EXCEPT or COUNT query output, we used our in-house Python framework with the Trino engine for detailed analysis. We continuously monitored the post-migration production runtime of our pipelines using the Airflow metadata DB tables and tuned pipelines as required.
There were a few sources of uncertainty in the validation process. For example:
- When the code relied on the current timestamp, it caused differences between production and development runs. We excluded timestamp-related columns while validating these tables.
- Random row ordering appeared when there was no ORDER BY clause in the code that could fully resolve ties. We fixed the code to use a tie-breaking ORDER BY clause going forward.
- Discrepancies appeared in the behavior of certain built-in functions between Hive and Spark. For instance, functions like GREATEST, which returns the greatest value from a list of arguments, behave differently when one of the arguments is NULL (see the sketch after this list). We made code changes to adhere to the correct business logic.
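The GREATEST case is easy to reproduce. A minimal sketch of the semantic difference we had to account for (Hive 2 returns NULL if any argument is NULL, while Spark's greatest skips NULLs):

```python
# Sketch of the GREATEST null-handling difference between Hive 2 and Spark 3.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("greatest_null_example").getOrCreate()

# In Spark 3, greatest() skips NULL arguments and returns the largest non-null value.
spark.sql("SELECT greatest(10, 20, NULL) AS g").show()  # g = 20

# In Hive 2.x, GREATEST(10, 20, NULL) returns NULL because any NULL argument makes
# the result NULL, so queries relying on that behavior needed code changes.
```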
Migration gain and impact
After the migration, we observed substantial runtime performance improvements across the majority of our pipeline tasks. Most of our Airflow tasks showed improvements ranging from 30% to 60%, with some jobs seeing an impressive 90% improvement in runtime. We used the Airflow metadata DB tables (the duration column in the task_instance table) to get the runtime comparison numbers; the runtime of one of our critical tasks, for example, improved significantly post migration.
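As an illustration of where those numbers come from, here is a minimal sketch of the kind of query we run against the Airflow metadata DB; the connection URL, DAG, and task names are placeholders, and a Postgres-backed metadata database is assumed.

```python
# Sketch: pulling task runtimes from the Airflow metadata DB to compare pre- and
# post-migration performance. Connection URL and DAG/task names are placeholders.
from sqlalchemy import create_engine, text

engine = create_engine("postgresql://airflow:REDACTED@airflow-db.example.com/airflow")

query = text("""
    SELECT dag_id,
           task_id,
           date_trunc('week', start_date) AS week,
           avg(duration)                  AS avg_duration_seconds
    FROM task_instance
    WHERE dag_id = :dag_id
      AND task_id = :task_id
      AND state = 'success'
    GROUP BY 1, 2, 3
    ORDER BY week
""")

with engine.connect() as conn:
    rows = conn.execute(query, {"dag_id": "billing_daily_rollup",
                                "task_id": "build_billing_rollup"}).fetchall()

for dag_id, task_id, week, avg_duration in rows:
    print(f"{week:%Y-%m-%d}  {dag_id}.{task_id}  {avg_duration:,.0f}s")
```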
The EMR 6 EMRFS S3-optimized committer fixed the problem of incomplete writes and misleading SUCCESS statuses for some of our Spark jobs that handled text-based input and output formats. It also improves application performance by avoiding the list and rename operations performed in S3 during the job and task commit phases. Prior to EMR 6.4.0, this feature was only available for the Apache Parquet file format; from EMR 6.4.0 on, it was extended to all common formats, including Parquet, ORC, and text-based formats such as CSV and JSON.
As expected, we noticed several Adaptive Query Execution (AQE) improvements in the query execution plans. One of the key improvements was dynamically optimizing skew joins. This let us remove many lines of skew-handling logic from our codebase and replace them with a simple join condition between the keys; the resulting query plans show the AQE skew=true hint on the join.
Another improvement was dynamically coalescing shuffle partitions. This feature simplified tuning of the shuffle partition number by selecting the right number of shuffle partitions at runtime; we only had to provide a large enough initial shuffle partition number using the spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. In one of our query plans, AQE coalesced the partition count from 3000 down to 348.
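The AQE behavior described above is driven by a handful of Spark 3 settings. A minimal sketch of the relevant configuration (values are examples; AQE is enabled by default on recent Spark 3 releases):

```python
# Sketch of the AQE-related settings behind the improvements described above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("aqe_example")
    .config("spark.sql.adaptive.enabled", "true")                    # turn on AQE
    .config("spark.sql.adaptive.skewJoin.enabled", "true")           # split skewed partitions at runtime
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") # merge small shuffle partitions
    # Start with a deliberately large partition number and let AQE coalesce it down
    # (for example, from 3000 to a few hundred) based on actual shuffle sizes.
    .config("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "3000")
    .getOrCreate()
)
```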
Conclusion
The migration to EMR 6 has resulted in significant improvements in the runtime performance, efficiency, and reliability of our data processing pipelines.
AQE improvements, such as dynamically optimizing skew joins and coalescing shuffle partitions, have simplified query optimization and reduced the need for manual parameter tuning. The S3-optimized committer has addressed issues related to incomplete writes and misleading statuses in Spark jobs, leading to improved stability. The entire migration process described here ran smoothly and did not cause any incidents at any step! We improved our pipeline codebase along the way, making it easier for new engineers to onboard onto a clean foundation and work solely with the Spark 3 engine. The migration has also laid the foundation for a more scalable lakehouse, with newer table formats like Iceberg and Hudi available in EMR 6. We recommend that data organizations invest in such long-term modernization initiatives, as they bring efficiencies across the board.
Interested in joining our Data Engineering team? Apply now