Source code for networkapi.util.signals_helper

# -*- coding: utf-8 -*-
# copyright: https://github.com/leandrosouza/django-simple-audit
from __future__ import absolute_import
from __future__ import unicode_literals

import copy
import logging
from pprint import pprint

LOG = logging.getLogger(__name__)


[docs]def ValuesQuerySetToDict(vqs): """converts a ValuesQuerySet to Dict""" return [item for item in vqs]
[docs]def get_m2m_fields_for(instance=None): """gets m2mfields for instance""" return instance._meta._many_to_many()
[docs]def get_m2m_values_for(instance=None): values = {} for m2m_field in get_m2m_fields_for(instance=instance): values[m2m_field.verbose_name] = ValuesQuerySetToDict( m2m_field._get_val_from_obj(instance).values()) return copy.deepcopy(values)
[docs]def normalize_dict(d): """removes datetime objects and passwords""" for k in d.keys(): if d.get(k).find('password') != -1: d[k] = 'xxxxx' return d
[docs]def m2m_proccess_diff_states(old, new): """ old and new are dictionaries in the following format old: {u'toppings.1': {u'id': [1, None], 'name': [u'ceboloa', None]}, u'toppings.11': {u'id': [11, None], 'name': [u'abacate', None]}, u'toppings.5': {u'id': [5, None], 'name': [u'cogumelo', None]}, u'toppings.6': {u'id': [6, None], 'name': [u'abobrinha', None]}, u'toppings.8': {u'id': [8, None], 'name': [u'codorna', None]}, u'toppings.9': {u'id': [9, None], 'name': [u'banana', None]}} new: {u'toppings.1': {u'id': [None, 1], 'name': [None, u'ceboloa']}, u'toppings.10': {u'id': [None, 10], 'name': [None, u'abacaxi']}, u'toppings.5': {u'id': [None, 5], 'name': [None, u'cogumelo']}, u'toppings.6': {u'id': [None, 6], 'name': [None, u'abobrinha']}, u'toppings.8': {u'id': [None, 8], 'name': [None, u'codorna']}, u'toppings.9': {u'id': [None, 9], 'name': [None, u'banana']}} """ # print "old..." # pprint(old) # print "^^^" # print "new..." # pprint(new) # print "^^^" diff = copy.deepcopy(old) new_copy = copy.deepcopy(new) for field_id in old.keys(): old_ = old[field_id] if field_id in new_copy: for k in old_.keys(): try: diff[field_id][k][1] = new_copy[field_id][k][1] except: pass del new_copy[field_id] # add remaining items in new_copy to diff for field_id in new_copy.keys(): new_ = new_copy[field_id] diff[field_id] = new_ return diff
[docs]def m2m_clean_unchanged_fields(dict_diff): """ returns a list of dicts with only the changes """ dict_list = [] for key in dict_diff.keys(): new_dict = {} dict_ = dict_diff.get(key) for value in dict_.keys(): compound_key = '%s.%s' % (key, value) if dict_[value][0] == dict_[value][1]: del dict_[value] else: new_dict[compound_key] = dict_[value] del dict_diff[key] if new_dict: dict_list.append(new_dict) return dict_list
[docs]def m2m_dict_diff(old, new): # old is empty? swap = False if not old: # set old to new, then swap elements at the end old = new new = {} swap = True # first create empty diff based in old state field_name = None diff_old = {} diff_new = {} for key in old.keys(): field_name = key ########### # oldstate ########## id_ = 0 for item in old[key]: empty_dict = {} for key_ in item.keys(): if key_ == 'id': id_ = item[key_] # when old is empty, the dicts are swapped if swap: empty_dict[key_] = [None, item[key_]] else: empty_dict[key_] = [item[key_], None] diff_old['%s.%s' % (field_name, id_)] = empty_dict ############ # new state ############ id_ = 0 for item in new.get(key, {}): empty_dict = {} for key_ in item.keys(): if key_ == 'id': id_ = item[key_] empty_dict[key_] = [None, item[key_]] diff_new['%s.%s' % (field_name, id_)] = empty_dict if swap: diff_old, diff_new = diff_new, diff_old diff = m2m_proccess_diff_states(diff_old, diff_new) diff = m2m_clean_unchanged_fields(diff) if diff: LOG.debug('m2m diff cleaned: %s' % pprint(diff)) return diff
[docs]def persist_m2m_audit(): pass