Last active
August 29, 2015 14:15
-
-
Save fgalan/1922df50f35ab3529b91 to your computer and use it in GitHub Desktop.
merge_default_sp_dups.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python | |
# -*- coding: latin-1 -*- | |
# Copyright 2015 Telefonica Investigacion y Desarrollo, S.A.U | |
# | |
# This file is part of Orion Context Broker. | |
# | |
# Orion Context Broker is free software: you can redistribute it and/or | |
# modify it under the terms of the GNU Affero General Public License as | |
# published by the Free Software Foundation, either version 3 of the | |
# License, or (at your option) any later version. | |
# | |
# Orion Context Broker is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero | |
# General Public License for more details. | |
# | |
# You should have received a copy of the GNU Affero General Public License | |
# along with Orion Context Broker. If not, see http://www.gnu.org/licenses/. | |
# | |
# For those usages not covered by this license please contact with | |
# iot_support at tid dot es | |
from __future__ import division # need for seconds calculation (could be removed with Python 2.7) | |
from pymongo import MongoClient | |
import sys | |
from datetime import timedelta, datetime | |
if len(sys.argv) != 2: | |
print "missing db name" | |
sys.exit() | |
DB = sys.argv[1] | |
COL = 'entities' | |
need_fix = False | |
client = MongoClient('localhost', 27017) | |
db = client[DB] | |
def warn(n, s): | |
print '%5d: WARNING: %s' % (n, s) | |
global need_fix | |
need_fix = True | |
def duplicated_attr_name(n, list): | |
names = {} | |
for attr in list: | |
if (names.has_key(attr['name'])): | |
warn(n, 'attribute %s is duplicated' % attr['name']) | |
return True | |
else: | |
names[attr['name']] = 1 | |
return False | |
def has_attr(attr, doc_attrs): | |
for a in doc_attrs: | |
if a['name'] == attr['name']: | |
return True | |
return False | |
def is_attr_in_doc_older(attr, doc, dup): | |
# This method assumes that attribute is not duplicated in doc or dup (the method is called | |
# after passing that check) | |
mod_date_doc = 0 | |
mod_date_dup = 0 | |
for a in doc['attrs']: | |
if (a['name'] == attr): | |
mod_date_doc = a['modDate'] | |
for a in dup['attrs']: | |
if (a['name'] == attr): | |
mod_date_dup = a['modDate'] | |
if (mod_date_doc < mod_date_dup): | |
return True | |
else: | |
return False | |
def time_string(t_0): | |
delta = datetime.now() - t_0 | |
# Python 2.7 could use delta.total_seconds(), but we need this formula for backward compatibility with Python 2.6 | |
#t = delta.total_seconds() | |
t = (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6 | |
return str(timedelta(seconds=t)) | |
def merge_sp(): | |
n = 0 | |
n_dup = 0 | |
skipped = 0 | |
print '----- start processing' | |
t_0 = datetime.now() | |
for doc in db[COL].find({'_id.servicePath': '/'}): | |
n += 1 | |
id = doc['_id']['id'] | |
if doc['_id'].has_key('type'): | |
type = doc['_id']['type'] | |
else: | |
type = '' | |
if type != '': | |
dup = db[COL].find_one({'_id.id': id, '_id.type': type, '_id.servicePath': None}) | |
else: | |
dup = db[COL].find_one({'_id.id': id, '_id.type': None, '_id.servicePath': None}) | |
if dup != None: | |
n_dup += 1 | |
# Sanity check: doc should be newer than dup | |
delta = doc['modDate'] - dup['modDate'] | |
if delta < 0: | |
warn(n_dup, 'dup for <%s, %s> is newer than not dup. Skipping.' % (n_dup, id, type)) | |
skipped += 1 | |
continue | |
print '%5d: candidate to merge for <%s, %s>, time offset: %s' % (n_dup, id, type, str(timedelta(seconds=delta))) | |
doc_attrs = doc['attrs'] | |
dup_attrs = dup['attrs'] | |
# More sanity checks. In release 0.17.0 attribute type was removed from attribute identification, but | |
# some old entity document in this way may exist | |
if (duplicated_attr_name(n_dup, doc_attrs)): | |
warn(n_dup, 'duplicated attribute in doc. Skipping.') | |
skipped += 1 | |
continue | |
if (duplicated_attr_name(n_dup, dup_attrs)): | |
warn(n_dup, 'duplicated attribute in dup. Skipping.') | |
skipped += 1 | |
continue | |
# Parse each attribute in dup. If the attribute is not found in the document, then it is added | |
attrs_to_push = [] | |
for attr in dup['attrs']: | |
if not has_attr(attr, doc_attrs): | |
print '%5d: attribute to append <%s>' % (n_dup, attr['name']) | |
attrs_to_push.append(attr) | |
else: | |
# Sanity check: modData in the attribute in doc should be newer than modData in dup | |
if is_attr_in_doc_older(attr, doc, dup): | |
warn(n_dup, 'attribute %s is older in doc than in dup. Skipping' % attr) | |
skipped += 1 | |
continue | |
if (len(attrs_to_push) > 0): | |
try: | |
# Pushing attributes | |
print '%5d: pushing %d attributes' % (n_dup, len(attrs_to_push)) | |
if type != '': | |
db[COL].update({'_id.id': id, '_id.type': type, '_id.servicePath': '/'}, {'$push': {'attrs': {'$each': attrs_to_push }}}) | |
else: | |
db[COL].update({'_id.id': id, '_id.type': None, '_id.servicePath': '/'}, {'$push': {'attrs': {'$each': attrs_to_push }}}) | |
except: | |
print '%5d: ERROR pushing attributes at DB: %s' % (n_dup, str(sys.exc_info()[:2])) | |
global need_fix | |
need_fix = True | |
continue | |
else: | |
# No attributes to push | |
print '%5d: no attributes to push, just remove the old duplicate' % n_dup | |
try: | |
# Removing dup | |
print '%5d: removing duplicate' % n_dup | |
if type != '': | |
db[COL].remove({'_id.id': id, '_id.type': type, '_id.servicePath': None}) | |
else: | |
db[COL].remove({'_id.id': id, '_id.type': None, '_id.servicePath': None}) | |
except: | |
print '%5d: ERROR removing dup at DB: %s' % (n_dup, sys.exc_info()[:2]) | |
global need_fix | |
need_fix = True | |
continue | |
if (n % 1000 == 0): | |
print '----- processed %s (time: %s)' % (n, time_string(t_0)) | |
print '----- processed in total: %d (total time: %s)' % (n, time_string(t_0)) | |
print '----- dup processed: %d' % n_dup | |
print '----- skipped: %d' % skipped | |
# Warn user | |
print "WARNING!!!! This script modifies your '%s' database. It is STRONGLY RECOMMENDED that you" % DB | |
print "do a backup of your database before using it as described in https://forge.fiware.org/plugins/mediawiki/wiki/fiware/index.php/Publish/Subscribe_Broker_-_Orion_Context_Broker_-_Installation_and_Administration_Guide#Backup. Use this script at your own risk." | |
print "If you are sure you want to continue type 'yes' and press Enter" | |
confirm = raw_input() | |
if (confirm != 'yes'): | |
sys.exit() | |
merge_sp() | |
if need_fix: | |
print "------------------------------------------------------" | |
print "WARNING: some problem was found during the process. Please, check the documentation at https://forge.fiware.org/plugins/mediawiki/wiki/fiware/index.php/Publish/Subscribe_Broker_-_Orion_Context_Broker_-_Installation_and_Administration_Guide#Upgrading_to_0.19.0_and_beyond_from_any_pre-0.19.0_version for solving it" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment