unnecessary serialize/deserialize for cases where no conversion was performed#329
unnecessary serialize/deserialize for cases where no conversion was performed#329chiragaggarwal wants to merge 4 commits into
Conversation
|
Can one of the admins verify this patch? |
|
Jenkins, test this please. |
|
Merged build triggered. |
|
Merged build started. |
|
Merged build finished. All automated tests passed. |
|
All automated tests passed. |
There was a problem hiding this comment.
Maybe it's better to move the branch out of the iter.map.
There was a problem hiding this comment.
for the case, where partTblObjectInspectorConverter.isInstanceOf[IdentityConverter] is true,
the deserialized value needs to be calculated anyways, which is done as part of iter.map, so would it make sense to pull it out of this iter.map, and push it in another one.
There was a problem hiding this comment.
Sorry, I mean how about like this?
rowWithPartArr.update(1, partValues)
if (!partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
iter.map {...// do as it previously
}
} else {
iter.map {
rowWithPartArr.update(0, partSerDe.deserialize(value))
rowWithPartArr.asInstanceOf[Object]
}
}…erformed Incorporated the review feedback.
|
Incorporated the review feedback. |
There was a problem hiding this comment.
The partValues doesn't change per partition, we can move out of the iter.map, the same for the later one.
|
@chiragaggarwal thank you for the revising, I've tested that in my cluster, it does improve the performance for partitioned based table scan. I've also updated my code example which had a critical typo. |
|
@chenghao-intel I am little confused. Does your last comment imply that the code after revision is fine, or do you think it needs to be changed further, or do you think that the commit before the revision was better? Could you please clarify so that I can act accordingly. |
|
@chiragaggarwal sorry for the confusing. // this is done per partition, and no necessary put it in the iterations (in iter.map).
rowWithPartArr.update(1, partValues)
if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
iter.map {
rowWithPartArr.update(0, partSerDe.deserialize(value))
rowWithPartArr.asInstanceOf[Object]
}
} else {
iter.map {...// do as it orginally
}
}Let me know if you are still confused. |
|
I've tested the latest code in my cluster, it improves the performance up to 30% for a partition tables based query. |
|
@chenghao-intel do you mind submit a PR against Spark SQL to fix the same problem in Spark SQL? (assuming it also exists) |
If partition schema does not match table schema, the row (formed by deserializing through partition serde) is converted to match the table schema. If conversion was performed, convertedRow will be a standard Object, but if conversion wasn't necessary, it will still be lazy.
We can't have both (standard and lazy objects) across partitions, so we serialize and deserialize again to make it lazy.
This extra serialize/deserialize is being performed irrespective of the fact that whether conversion was done or not.
There are two effects of this serialization / deserialization:
Extra serialization / deserilization cost for cases, where no conversion happened.
If a table is created using ThriftDeserializer, the non-availability of serialize function in it makes it unusable in this context.
The fix done is that in case conversion was not done (when partition and table serde match), then this serialization and deserialization step should be skipped, since it is not required as the object would still be lazy. This shall also allow users to be able to use ThriftDeserializer in such a case.