[#1750] feat(remote merge): Support Spark. #2405
zhengchenyu wants to merge 5 commits into apache:master
Conversation
zuston left a comment
From my perspective, this feature currently can't be used in Spark SQL. Maybe RDD jobs could use it.
    import java.util
    import java.util.Map

    object RMSparkSQLTest {
Aha! How about renaming it to RemoteMergeSparkSQLTest? RM looks like Yarn ResourceManager.
For me, when I see RM, the first thing that comes to mind is Yarn ResourceManager. But I don't have any other good names.
Or I could rename RM to RMS or RS. In fact, merge sort is implemented on the server side, but there is no combine, so "remote sort" would also be fine.
If so, the previous code and documentation would need to be modified. Maybe a new PR is needed.
This test is based on draft PR apache/spark#50248.
cc @LuciferYang
This will break the code implementation of Spark. It would be better to insert a new logical plan that represents the distribution and partitioning after shuffling. Then you only need to implement some optimization rules.
Are you talking about changes to Spark? My initial idea was also to see whether I could add a new rule. Maybe for the map side I could add new rules. But for the reduce side, whether to add a new SortExec is determined by checking whether distribution and partitioning match, which is not easy to do by adding a new rule.
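For readers following this thread, here is a rough Scala sketch (not part of this PR; the predicate is purely hypothetical) of what a physical-plan rule that removes a redundant SortExec above a shuffle could look like. The hard part flagged above is that such a predicate has no reliable way to know, from the plan alone, that the remote merge already provides the required ordering.

    import org.apache.spark.sql.catalyst.expressions.SortOrder
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{SortExec, SparkPlan}
    import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

    // Hypothetical rule, for discussion only: drop a per-partition SortExec that sits
    // directly on top of a shuffle whose output the remote merge already returns in
    // the required key order.
    object RemoveSortAfterRemoteMerge extends Rule[SparkPlan] {

      override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
        // global = false means a per-partition sort, the kind EnsureRequirements adds.
        case SortExec(order, false, shuffle: ShuffleExchangeExec, _)
            if remoteMergeSatisfies(shuffle, order) =>
          shuffle
      }

      // Placeholder predicate: it would have to prove that the remote merge for this
      // shuffle produces exactly `order` within every partition. The plan itself does
      // not expose that information, which is the difficulty mentioned above.
      private def remoteMergeSatisfies(
          shuffle: ShuffleExchangeExec,
          order: Seq[SortOrder]): Boolean = false
    }

Injecting such a rule through SparkSessionExtensions would avoid touching Spark itself, but whether the needed information is reachable from there is exactly the open question in this thread.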
also cc @summaryzb
Does this one depend on SPARK-51398 being merged first?
Yes, Meta Cosco did similar things before; you can take a look. cc @c21 Excuse me, sorry to bother you. Is it possible to implement this feature without changing Spark's code, only by adding some rules? Could you give us some suggestions?
            : dependency.valueClassName())
        .setComparatorClass(
            dependency
                .keyOrdering()
If the keyOrdering is empty, could the remote merge be disabled? @zhengchenyu
The current implementation cannot enable remote merge based on whether keyOrdering is empty; it can only be enabled through configuration.
For Spark RDD jobs (not Spark SQL), there may be a situation where keyOrdering is empty but the combine class is not; in that case records are hash-partitioned and then combined in memory.
This can be further improved, especially for Spark SQL.
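To make the two RDD situations above concrete, here is a small Scala sketch (example only, not code from this PR): reduceByKey produces a ShuffleDependency with an aggregator but no keyOrdering, while sortByKey produces one with keyOrdering defined, which is the case remote merge targets.

    import org.apache.spark.{SparkConf, SparkContext}

    object KeyOrderingCases {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("key-ordering-cases").setMaster("local[2]"))
        val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

        // reduceByKey: the shuffle dependency carries an aggregator but keyOrdering is
        // empty, so records are hash-partitioned and combined in memory on the reducer.
        val combined = pairs.reduceByKey(_ + _)

        // sortByKey: the shuffle dependency defines keyOrdering, so the reducer needs
        // sorted input, which is what a server-side (remote) merge can provide.
        val sorted = pairs.sortByKey()

        println(combined.collect().toSeq)
        println(sorted.collect().toSeq)
        sc.stop()
      }
    }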
                    return encode(o);
                  }
                })
            .get())
If dependency.keyOrdering().isDefined() is false, an exception will be thrown here.
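A minimal sketch of the guard being suggested, written in Scala for illustration (the client code in the diff is Java, and configureComparator is a hypothetical hook): unwrap the Option explicitly instead of calling .get() on a possibly undefined keyOrdering.

    import org.apache.spark.ShuffleDependency

    object KeyOrderingGuard {
      // Configure the remote comparator only when a key ordering is actually defined;
      // otherwise skip remote merge instead of letting .get() throw.
      def maybeConfigureComparator[K, V, C](
          dep: ShuffleDependency[K, V, C],
          configureComparator: Ordering[K] => Unit): Unit =
        dep.keyOrdering match {
          case Some(ordering) => configureComparator(ordering)
          case None => () // no key ordering: fall back to the non-merged path
        }
    }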
What changes were proposed in this pull request?
Support the Spark framework for remote merge.
Why are the changes needed?
#1750
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Unit tests, integration tests, and a real job in a cluster.