Last active
September 16, 2020 17:42
-
-
Save omegaml/243b385e15d66ff5419926ffc82c7b0c to your computer and use it in GitHub Desktop.
MDataFrame merge quickfix
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This fixes MDataFrame.merge() where the join results in duplicate object keys.

When to use:
    If your mdf.merge() operation raises a duplicate key error.

Usage:
    !pip install -q getgist
    !rm -f *omx_qfmdfmerge.py && getgist -y omegaml omx_qfmdfmerge.py
    import omx_qfmdfmerge

Results:
    mdf.merge() with duplicate keys will return a new MDataFrame with merged documents.
""" | |
def patch(merge):
    """Install ``merge`` as the ``merge`` method of omegaml's ``MDataFrame``.

    The method being replaced is saved on the class as ``_merge`` the first
    time this runs; later calls leave that saved method alone, so it is never
    overwritten by an already-installed replacement.

    Args:
        merge: function taking ``(self, *args, **kwargs)`` to install as
            ``MDataFrame.merge``.
    """
    # Imported lazily so omegaml is only required when the patch is applied.
    from omegaml import mdataframe as mdf

    frame_cls = mdf.MDataFrame
    original_saved = hasattr(frame_cls, '_merge')
    if not original_saved:
        frame_cls._merge = frame_cls.merge
    # NOTE(review): the source's indentation was lost; this assignment is
    # assumed to be unconditional (outside the ``if``) -- confirm.
    frame_cls.merge = merge
def merge(self, *args, **kwargs):
    """Merge via the saved original ``_merge``, excluding ``_id`` from the output.

    Requests the aggregation pipeline from the original implementation
    (available as ``self._merge``), sets ``_id`` to ``0`` in the pipeline's
    ``$project`` stage, and then runs the pipeline on ``self.collection``.

    Args:
        *args: passed through unchanged to ``self._merge``.
        **kwargs: passed through unchanged to ``self._merge``, except
            ``inspect`` (default ``False``), which is consumed here. When
            true, the modified pipeline is returned and nothing is executed.

    Returns:
        The modified pipeline when ``inspect`` is true. Otherwise
        ``self._clone(...)`` of the collection named by the pipeline's final
        ``$out`` stage, indexed by the columns of the ``$project`` stage as
        they were before ``_id`` was added.
    """
    return_pipeline = kwargs.pop('inspect', False)
    # Look the saved method up on the instance: ``super(self.__class__, self)``
    # skips the instance's own class and therefore cannot find ``_merge`` when
    # it is stored on that very class.
    pipeline = self._merge(*args, inspect=True, **kwargs)
    target_name = pipeline[-1]['$out']
    # NOTE(review): assumes the ``$project`` stage is the third pipeline
    # stage, as produced by the wrapped ``merge`` -- confirm on upgrades.
    projection = pipeline[2]['$project']
    # Capture the column list before adding ``_id`` so it is not selected below.
    expected_columns = list(projection.keys())
    projection['_id'] = 0
    if return_pipeline:
        return pipeline
    # The returned cursor is not needed; the merged documents are read back
    # from the ``$out`` target collection.
    self.collection.aggregate(pipeline, allowDiskUse=True)
    merged = self._clone(self.collection.database[target_name])
    return merged[expected_columns]
# Apply the patch as an import side effect, so that `import omx_qfmdfmerge`
# alone is enough to activate the fix.
patch(merge)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment