Commit cc9e11c

[KYUUBI #6920] Spark SQL engine supports Spark 4.0
### Why are the changes needed?

Spark 4.0 has continued to receive breaking changes since 4.0.0-preview2, and the 4.0.0 RC1 is scheduled for 2025-02-15. This PR fixes all compatibility issues of the Spark SQL engine against the latest Spark 4.0.0-SNAPSHOT.

### How was this patch tested?

Pass GHA with `spark-master`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #6920 from pan3793/spark4.

Closes #6920

170430e [Cheng Pan] Revert "ci"
c6d8893 [Cheng Pan] fix
86ff7ea [Cheng Pan] fix
75d0bf5 [Cheng Pan] ci
9d88c86 [Cheng Pan] fix spark 4.0 compatibility

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
1 parent 51b8e7b commit cc9e11c

4 files changed (+96, -22 lines)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala

Lines changed: 2 additions & 2 deletions

```diff
@@ -23,7 +23,7 @@ import java.time.ZoneId
 import org.apache.spark.kyuubi.{SparkProgressMonitor, SQLOperationListener}
 import org.apache.spark.kyuubi.SparkUtilsHelper.redact
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.SparkSQLExecutionHelper
 import org.apache.spark.sql.types.{BinaryType, StructField, StructType}
 import org.apache.spark.ui.SparkUIUtils.formatDuration
 
@@ -155,7 +155,7 @@ abstract class SparkOperation(session: Session)
    spark.sparkContext.setLocalProperty
 
  protected def withLocalProperties[T](f: => T): T = {
-    SQLExecution.withSQLConfPropagated(spark) {
+    SparkSQLExecutionHelper.withSQLConfPropagated(spark) {
      val originalSession = SparkSession.getActiveSession
      try {
        SparkSession.setActiveSession(spark)
```
externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/SparkSQLExecutionHelper.scala

Lines changed: 39 additions & 0 deletions (new file)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.sql.SparkSession

import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}

object SparkSQLExecutionHelper {

  private val sparkSessionClz = DynClasses.builder()
    .impl("org.apache.spark.sql.classic.SparkSession") // SPARK-49700 (4.0.0)
    .impl("org.apache.spark.sql.SparkSession")
    .build()

  private val withSQLConfPropagatedMethod =
    DynMethods.builder("withSQLConfPropagated")
      .impl(SQLExecution.getClass, sparkSessionClz, classOf[() => Any])
      .buildChecked(SQLExecution)

  def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
    withSQLConfPropagatedMethod.invokeChecked[T](sparkSession, () => body)
  }
}
```
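For readers unfamiliar with Kyuubi's `DynClasses`/`DynMethods` reflection utilities, the sketch below approximates what the helper above does using plain `java.lang.reflect`: try the Spark 4.0 class name first (SPARK-49700 moved the concrete session type to `org.apache.spark.sql.classic.SparkSession`), fall back to the pre-4.0 name, and resolve the matching `withSQLConfPropagated` overload at runtime. This is an illustrative sketch only, not code from the commit.

```scala
// Illustrative sketch only (not part of this commit): a plain-reflection
// equivalent of the DynClasses/DynMethods lookup in SparkSQLExecutionHelper.
import scala.util.Try

import org.apache.spark.sql.SparkSession

object ReflectiveSQLConfPropagation {

  // Prefer the Spark 4.0 concrete class (SPARK-49700), fall back to the pre-4.0 one.
  private val sessionClass: Class[_] =
    Seq("org.apache.spark.sql.classic.SparkSession", "org.apache.spark.sql.SparkSession")
      .flatMap(name => Try(Class.forName(name)).toOption)
      .head

  // SQLExecution is a Scala object; its singleton instance lives in the MODULE$ field.
  private val sqlExecutionClass = Class.forName("org.apache.spark.sql.execution.SQLExecution$")
  private val sqlExecution = sqlExecutionClass.getField("MODULE$").get(null)

  // Erased signature: withSQLConfPropagated(SparkSession, Function0): Object
  private val method =
    sqlExecutionClass.getMethod("withSQLConfPropagated", sessionClass, classOf[() => Any])

  def withSQLConfPropagated[T](spark: SparkSession)(body: => T): T =
    method.invoke(sqlExecution, spark, () => body).asInstanceOf[T]
}
```

The commit itself relies on `DynClasses.builder().impl(...)` and `DynMethods.builder(...).buildChecked(...)`, which perform the same try-in-order lookup and, with the `buildChecked` variant, fail eagerly when no candidate overload can be resolved.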

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala

Lines changed: 12 additions & 2 deletions

```diff
@@ -36,7 +36,7 @@ import org.apache.spark.sql.types._
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.schema.RowSet
 import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
-import org.apache.kyuubi.util.reflect.DynMethods
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
 
 object SparkDatasetHelper extends Logging {
 
@@ -63,8 +63,18 @@ object SparkDatasetHelper extends Logging {
    toArrowBatchRdd(plan).collect()
  }
 
+  private val datasetClz = DynClasses.builder()
+    .impl("org.apache.spark.sql.classic.Dataset") // SPARK-49700 (4.0.0)
+    .impl("org.apache.spark.sql.Dataset")
+    .build()
+
+  private val toArrowBatchRddMethod =
+    DynMethods.builder("toArrowBatchRdd")
+      .impl(datasetClz)
+      .buildChecked()
+
  def toArrowBatchRdd[T](ds: Dataset[T]): RDD[Array[Byte]] = {
-    ds.toArrowBatchRdd
+    toArrowBatchRddMethod.bind(ds).invoke()
  }
 
  /**
```
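The `toArrowBatchRdd` change follows the same pattern: the concrete `Dataset` implementation moved to `org.apache.spark.sql.classic.Dataset` in Spark 4.0 (SPARK-49700), so a call compiled against the old location would fail at runtime; the commit resolves the method reflectively against whichever class is present. A rough plain-reflection equivalent, illustrative only and not the commit's code:

```scala
// Illustrative sketch only (not the commit's code): resolve Dataset.toArrowBatchRdd
// on whichever Dataset class the running Spark provides.
import scala.util.Try

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

object ArrowBatchRddReflection {

  // SPARK-49700 (4.0.0): the concrete Dataset moved to org.apache.spark.sql.classic.
  private val datasetClass: Class[_] =
    Seq("org.apache.spark.sql.classic.Dataset", "org.apache.spark.sql.Dataset")
      .flatMap(name => Try(Class.forName(name)).toOption)
      .head

  // private[sql] members are public in bytecode, so getMethod can see the no-arg overload.
  private val toArrowBatchRddMethod = datasetClass.getMethod("toArrowBatchRdd")

  def toArrowBatchRdd[T](ds: Dataset[T]): RDD[Array[Byte]] =
    toArrowBatchRddMethod.invoke(ds).asInstanceOf[RDD[Array[Byte]]]
}
```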

externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala

Lines changed: 43 additions & 18 deletions

```diff
@@ -21,13 +21,15 @@ import java.lang.{Boolean => JBoolean}
 import java.sql.Statement
 import java.util.Locale
 
+import scala.util.Try
+
 import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.{CollectLimitExec, LocalTableScanExec, QueryExecution, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.execution.metric.SparkMetricsTestUtils
@@ -163,6 +165,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
        sparkPlan.schema,
        "",
        true,
+        true, // spark.sql.execution.arrow.useLargeVarTypes
        KyuubiSparkContextHelper.dummyTaskContext())
      assert(rows.size == expectSize)
    }
@@ -247,7 +250,11 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
        |) LIMIT 1
        |""".stripMargin)
    val smj = plan.collect { case smj: SortMergeJoinExec => smj }
-    val bhj = adaptivePlan.collect { case bhj: BroadcastHashJoinExec => bhj }
+    val bhj = (adaptivePlan match {
+      // SPARK-51008 (4.0.0) adds ResultQueryStageExec
+      case queryStage: QueryStageExec => queryStage.plan
+      case plan => plan
+    }).collect { case bhj: BroadcastHashJoinExec => bhj }
    assert(smj.size == 1)
    assert(bhj.size == 1)
  }
@@ -505,49 +512,67 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
    }
  }
 
-  // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
-  // (since Spark 3.5)
  private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
-    .hiddenImpl( // for Spark 3.4 or previous
+    .hiddenImpl( // SPARK-51079: Spark 4.0 or later
      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
      classOf[Iterator[Array[Byte]]],
      classOf[StructType],
      classOf[String],
+      classOf[Boolean],
+      classOf[Boolean],
      classOf[TaskContext])
-    .hiddenImpl( // for Spark 3.5 or later
+    .hiddenImpl( // SPARK-43528: Spark 3.5
      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
      classOf[Iterator[Array[Byte]]],
      classOf[StructType],
      classOf[String],
      classOf[Boolean],
      classOf[TaskContext])
-    .build()
+    .hiddenImpl( // for Spark 3.4 or previous
+      "org.apache.spark.sql.execution.arrow.ArrowConverters$",
+      classOf[Iterator[Array[Byte]]],
+      classOf[StructType],
+      classOf[String],
+      classOf[TaskContext])
+    .buildChecked()
+
+  private lazy val arrowConvertersObject = DynFields.builder()
+    .impl("org.apache.spark.sql.execution.arrow.ArrowConverters$", "MODULE$")
+    .buildStaticChecked[Any]()
+    .get()
 
  def fromBatchIterator(
      arrowBatchIter: Iterator[Array[Byte]],
      schema: StructType,
      timeZoneId: String,
      errorOnDuplicatedFieldNames: JBoolean,
-      context: TaskContext): Iterator[InternalRow] = {
-    val className = "org.apache.spark.sql.execution.arrow.ArrowConverters$"
-    val instance = DynFields.builder().impl(className, "MODULE$").build[Object]().get(null)
-    if (SPARK_ENGINE_RUNTIME_VERSION >= "3.5") {
-      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
-        instance,
+      largeVarTypes: JBoolean,
+      context: TaskContext): Iterator[InternalRow] =
+    Try { // SPARK-51079: Spark 4.0 or later
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
        arrowBatchIter,
        schema,
        timeZoneId,
        errorOnDuplicatedFieldNames,
+        largeVarTypes,
        context)
-    } else {
-      fromBatchIteratorMethod.invoke[Iterator[InternalRow]](
-        instance,
+    }.recover { case _: Exception => // SPARK-43528: Spark 3.5
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
        arrowBatchIter,
        schema,
        timeZoneId,
+        errorOnDuplicatedFieldNames,
        context)
-    }
-  }
+    }.recover { case _: Exception => // for Spark 3.4 or previous
+      fromBatchIteratorMethod.invokeChecked[Iterator[InternalRow]](
+        arrowConvertersObject,
+        arrowBatchIter,
+        schema,
+        timeZoneId,
+        context)
+    }.get
 
  class JobCountListener extends SparkListener {
    var numJobs = 0
```
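The test helper now probes the three known `fromBatchIterator` shapes newest-first: each attempt passes a different argument list, and when the call fails the next `recover` falls back one Spark generation. Stripped down, the pattern looks like the hypothetical sketch below; the `spark40Api`/`spark35Api`/`spark34Api` names are placeholders for illustration, not Spark APIs.

```scala
import scala.util.Try

// Hypothetical stand-ins for overloads whose arity grew across Spark releases.
def spark40Api(data: String, errorOnDup: Boolean, largeVarTypes: Boolean): String = s"4.0: $data"
def spark35Api(data: String, errorOnDup: Boolean): String = s"3.5: $data"
def spark34Api(data: String): String = s"3.4: $data"

// Try the newest signature first; each recover falls back one Spark generation.
def callNewestFirst(data: String): String =
  Try(spark40Api(data, errorOnDup = true, largeVarTypes = true))
    .recover { case _: Exception => spark35Api(data, errorOnDup = true) }
    .recover { case _: Exception => spark34Api(data) }
    .get
```

In the suite, each branch goes through `invokeChecked` against whichever `fromBatchIterator` overload was resolved, so a mismatched argument list throws and control moves to the next, older-signature attempt.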
