|
| 1 | +import collections.abc |
| 2 | +import contextlib |
| 3 | +import copy |
| 4 | +import datetime |
| 5 | +import types |
| 6 | +import typing |
| 7 | +import uuid |
| 8 | + |
| 9 | +from django.apps.registry import apps as django_apps |
| 10 | +from django.core.exceptions import ObjectDoesNotExist |
| 11 | +from django.db import models |
| 12 | +from django.db.models.fields.files import FieldFile |
| 13 | +from django.forms import model_to_dict |
| 14 | + |
| 15 | + |
| 16 | +def arbitrary_value_to_basic_type(value: typing.Any) -> str | int | float | bool | None: |
| 17 | + """Convert an arbitrary value to a basic type that can be JSON serialized.""" |
| 18 | + if isinstance(value, (int, float, bool)): |
| 19 | + return value |
| 20 | + elif isinstance(value, str): |
| 21 | + return value |
| 22 | + elif isinstance(value, (datetime.datetime, datetime.date, datetime.time)): |
| 23 | + return value.isoformat() |
| 24 | + elif isinstance(value, uuid.UUID): |
| 25 | + return str(value) |
| 26 | + elif isinstance(value, FieldFile): |
| 27 | + return value.name |
| 28 | + elif value is None: |
| 29 | + return None |
| 30 | + else: |
| 31 | + raise TypeError(f"Unsupported type for JSON serialization: {type(value)}") |
| 32 | + |
| 33 | + |
| 34 | +def model_to_identifier(instance: models.Model) -> str: |
| 35 | + return f"mdl:{instance._meta.app_label}:{instance._meta.model_name}:{instance.pk}" |
| 36 | + |
| 37 | + |
| 38 | +def identifier_to_model(identifier: str) -> models.Model | None: |
| 39 | + if not identifier.startswith("mdl:"): |
| 40 | + raise ValueError(f"Invalid model identifier: {identifier}") |
| 41 | + |
| 42 | + with contextlib.suppress(ValueError, LookupError, ObjectDoesNotExist): |
| 43 | + _, app_label, model_name, pk = identifier.split(":", 3) |
| 44 | + model_class: type[models.Model] = django_apps.get_model(app_label, model_name) |
| 45 | + return model_class.objects.get(pk=pk) |
| 46 | + |
| 47 | + return None |
| 48 | + |
| 49 | + |
| 50 | +def _model_to_jsonable_dict( # noqa: C901 |
| 51 | + instance: models.Model, |
| 52 | + converted_models: dict[str, dict[str, typing.Any]], |
| 53 | + exclude: set[str] | None = None, |
| 54 | + nested: bool = False, |
| 55 | +) -> dict: |
| 56 | + all_fields = { |
| 57 | + z for z in {f.name for f in instance._meta.fields} | set(instance._meta.fields_map.keys()) if z != "+" |
| 58 | + } |
| 59 | + exclude = {"created_at", "created_by", "updated_at", "updated_by", "deleted_at", "deleted_by"} | (exclude or set()) |
| 60 | + |
| 61 | + model_dict = model_to_dict(instance, exclude=exclude) |
| 62 | + jsonable_model_dict: dict[str, str | int | float | bool | None] = { |
| 63 | + "id": str(instance.pk) if isinstance(instance.pk, uuid.UUID) else instance.pk, |
| 64 | + "pk": str(instance.pk) if isinstance(instance.pk, uuid.UUID) else instance.pk, |
| 65 | + "_meta": types.SimpleNamespace( |
| 66 | + app_label=instance._meta.app_label, |
| 67 | + model_name=instance._meta.model_name, |
| 68 | + verbose_name=instance._meta.verbose_name, |
| 69 | + verbose_name_plural=instance._meta.verbose_name_plural, |
| 70 | + ), |
| 71 | + } |
| 72 | + |
| 73 | + for field, value in model_dict.items(): |
| 74 | + if isinstance(value, (datetime.datetime, datetime.date, datetime.time)): |
| 75 | + jsonable_model_dict[field] = value.isoformat() |
| 76 | + elif isinstance(value, (uuid.UUID, int, str)): |
| 77 | + # Is this field just a UUID | int | str, or is it a ForeignKey/OneToOneField? |
| 78 | + # django.forms.model_to_dict will return a UUID for Proxy model fields, |
| 79 | + # so we need to check if getattr(instance, field) is a model.Model instance. |
| 80 | + model_attr = getattr(instance, field, None) |
| 81 | + if isinstance(model_attr, models.Model): |
| 82 | + key = model_to_identifier(model_attr) |
| 83 | + |
| 84 | + jsonable_model_dict[f"{field}_id"] = ( |
| 85 | + str(model_attr.pk) if isinstance(model_attr.pk, uuid.UUID) else model_attr.pk |
| 86 | + ) |
| 87 | + jsonable_model_dict[field] = key |
| 88 | + if key not in converted_models: |
| 89 | + converted_models[key] = _model_to_jsonable_dict(model_attr, converted_models, None, True) |
| 90 | + else: |
| 91 | + # If it's just a UUID | int field, we can store it directly. |
| 92 | + jsonable_model_dict[field] = str(value) if isinstance(value, uuid.UUID) else value |
| 93 | + elif isinstance(value, models.Model): |
| 94 | + # simple ForeignKey or OneToOneField case. |
| 95 | + key = model_to_identifier(value) |
| 96 | + jsonable_model_dict[f"{field}_id"] = str(value.pk) if isinstance(value.pk, uuid.UUID) else value.pk |
| 97 | + jsonable_model_dict[field] = key |
| 98 | + if key not in converted_models: |
| 99 | + converted_models[key] = _model_to_jsonable_dict(value, converted_models, None, True) |
| 100 | + elif isinstance(value, (float, bool, str)) or value is None: |
| 101 | + jsonable_model_dict[field] = value |
| 102 | + elif isinstance(value, FieldFile): |
| 103 | + jsonable_model_dict[field] = value.name |
| 104 | + elif isinstance(value, collections.abc.Iterable): |
| 105 | + # Is this field a ManyToOneRel | ManyToManyField, or is it a json or array field? |
| 106 | + model_attr = getattr(instance, field, None) |
| 107 | + if isinstance(model_attr, models.manager.BaseManager): |
| 108 | + value = typing.cast(collections.abc.Iterable[models.Model], value) |
| 109 | + jsonable_value = [] |
| 110 | + for v in value: |
| 111 | + key = model_to_identifier(v) |
| 112 | + jsonable_value.append(key) |
| 113 | + if key not in converted_models: |
| 114 | + converted_models[key] = _model_to_jsonable_dict(v, converted_models, None, True) |
| 115 | + jsonable_model_dict[field] = jsonable_value |
| 116 | + else: |
| 117 | + jsonable_model_dict[field] = [arbitrary_value_to_basic_type(v) for v in value] |
| 118 | + else: |
| 119 | + raise TypeError(f"Unsupported field type: {type(value)} for field '{field}'") |
| 120 | + |
| 121 | + if not nested: |
| 122 | + for leftover_field in all_fields - exclude - jsonable_model_dict.keys(): |
| 123 | + # Possibly a many-to-many or many-to-one relation that was not included in the model_to_dict output. |
| 124 | + model_attr = getattr(instance, leftover_field, None) |
| 125 | + if not isinstance(model_attr, models.manager.BaseManager): |
| 126 | + continue |
| 127 | + if not (model_attr_objs := model_attr.all()): |
| 128 | + continue |
| 129 | + |
| 130 | + model_identifier_list = [] |
| 131 | + for model in model_attr_objs: |
| 132 | + key = model_to_identifier(model) |
| 133 | + model_identifier_list.append(key) |
| 134 | + if key not in converted_models: |
| 135 | + converted_models[key] = _model_to_jsonable_dict(model, converted_models, None, True) |
| 136 | + jsonable_model_dict[leftover_field] = model_identifier_list |
| 137 | + |
| 138 | + return jsonable_model_dict |
| 139 | + |
| 140 | + |
| 141 | +def model_to_jsonable_dict(instance: models.Model): |
| 142 | + key = model_to_identifier(instance) |
| 143 | + converted_models: dict[str, dict[str, typing.Any]] = {} |
| 144 | + converted_models[key] = _model_to_jsonable_dict(instance, converted_models) |
| 145 | + return {"key": key, "model_data": converted_models} |
| 146 | + |
| 147 | + |
| 148 | +def get_diff_data_from_jsonized_models( |
| 149 | + models_asis: dict[str, dict[str, typing.Any]], |
| 150 | + models_tobe: dict[str, dict[str, typing.Any]], |
| 151 | +) -> dict[str, dict[str, typing.Any]]: |
| 152 | + diff_models_data: dict[str, dict[str, typing.Any]] = {} |
| 153 | + |
| 154 | + for model_identifier in set(models_asis.keys()).union(models_tobe.keys()): |
| 155 | + model_asis = models_asis.get(model_identifier, {}) |
| 156 | + model_tobe = models_tobe.get(model_identifier, {}) |
| 157 | + if not (model_asis and model_tobe): |
| 158 | + continue |
| 159 | + |
| 160 | + diff_models_data[model_identifier] = {} |
| 161 | + # Compare the contents of the models |
| 162 | + for field_name in set(model_asis.keys()).union(model_tobe.keys()): |
| 163 | + if field_name in model_asis and field_name not in model_tobe: |
| 164 | + continue |
| 165 | + if field_name not in model_asis and field_name in model_tobe: |
| 166 | + diff_models_data[model_identifier][field_name] = model_tobe[field_name] |
| 167 | + continue |
| 168 | + |
| 169 | + value_a = model_asis[field_name] |
| 170 | + value_b = model_tobe[field_name] |
| 171 | + if value_a == value_b: |
| 172 | + continue |
| 173 | + if type(value_a) != type(value_b): # noqa: E721 |
| 174 | + raise TypeError( |
| 175 | + f"Type mismatch for field '{field_name}' in model '{model_identifier}': " |
| 176 | + f"{type(value_a)} != {type(value_b)}" |
| 177 | + ) |
| 178 | + diff_models_data[model_identifier][field_name] = value_b |
| 179 | + |
| 180 | + return {k: v for k, v in diff_models_data.items() if v} |
| 181 | + |
| 182 | + |
| 183 | +def apply_diff_to_jsonized_models( |
| 184 | + models_asis: dict[str, dict[str, typing.Any]], |
| 185 | + models_diff: dict[str, dict[str, typing.Any]], |
| 186 | +) -> dict[str, dict[str, typing.Any]]: |
| 187 | + updated_models = copy.deepcopy(models_asis) |
| 188 | + |
| 189 | + for model_identifier, diff_data in models_diff.items(): |
| 190 | + if model_identifier not in updated_models: |
| 191 | + updated_models[model_identifier] = diff_data |
| 192 | + continue |
| 193 | + |
| 194 | + if not isinstance(updated_models[model_identifier], dict): |
| 195 | + raise TypeError( |
| 196 | + f"Expected a dict for model '{model_identifier}', got {type(updated_models[model_identifier])}" |
| 197 | + ) |
| 198 | + |
| 199 | + for field_name, new_value in diff_data.items(): |
| 200 | + updated_models[model_identifier][field_name] = new_value |
| 201 | + |
| 202 | + return updated_models |
| 203 | + |
| 204 | + |
| 205 | +def json_to_simplenamespace(model_data: dict[str, dict[str, typing.Any]], key: str) -> types.SimpleNamespace: |
| 206 | + # Resolve models first |
| 207 | + resolved_models: dict[str, types.SimpleNamespace] = {} |
| 208 | + for model_identifier, model_datum in model_data.items(): |
| 209 | + resolved_models[model_identifier] = types.SimpleNamespace(**model_datum) |
| 210 | + # link identifiers in resolved models to their SimpleNamespace instances |
| 211 | + for resolved_model in resolved_models.values(): |
| 212 | + for attr_name, attr_value in resolved_model.__dict__.items(): |
| 213 | + if isinstance(attr_value, str) and attr_value.startswith("mdl:"): |
| 214 | + setattr(resolved_model, attr_name, resolved_models[attr_value]) |
| 215 | + elif isinstance(attr_value, list) and all( |
| 216 | + isinstance(item, str) and item.startswith("mdl:") for item in attr_value |
| 217 | + ): |
| 218 | + resolved_many_rel_models = [resolved_models[item] for item in attr_value] |
| 219 | + setattr(resolved_model, attr_name, resolved_many_rel_models) |
| 220 | + |
| 221 | + return resolved_models[key] |
| 222 | + |
| 223 | + |
| 224 | +def apply_diff_to_model(models_data: dict[str, dict[str, typing.Any]]) -> list[models.Model]: |
| 225 | + result_instances: list[models.Model] = [] |
| 226 | + |
| 227 | + for model_identifier, model_data in models_data.items(): |
| 228 | + if not (model_instance := identifier_to_model(model_identifier)): |
| 229 | + raise ValueError(f"Model class not found for identifier: {model_identifier}") |
| 230 | + |
| 231 | + # Apply the data to the model instance |
| 232 | + for field_name, value in model_data.items(): |
| 233 | + if isinstance(value, str) and value.startswith("mdl:"): |
| 234 | + # If the value is a model identifier, resolve it to a model instance |
| 235 | + if not (related_model_instance := identifier_to_model(value)): |
| 236 | + raise ValueError(f"Related model not found for identifier: {value}") |
| 237 | + setattr(model_instance, field_name, related_model_instance) |
| 238 | + elif isinstance(value, list) and all(isinstance(item, str) and item.startswith("mdl:") for item in value): |
| 239 | + # If the value is a list of model identifiers, resolve them to model instances |
| 240 | + related_model_instances = [identifier_to_model(item) for item in value] |
| 241 | + if None in related_model_instances: |
| 242 | + raise ValueError(f"One or more related models not found for identifiers: {value}") |
| 243 | + |
| 244 | + old_related_models = {item.pk: item for item in getattr(model_instance, field_name, [])} |
| 245 | + new_related_models = {item.pk: item for item in related_model_instances} |
| 246 | + |
| 247 | + field = getattr(model_instance, field_name) |
| 248 | + for del_pk in old_related_models.keys() - new_related_models.keys(): |
| 249 | + field.remove(old_related_models[del_pk]) |
| 250 | + for add_pk in new_related_models.keys() - old_related_models.keys(): |
| 251 | + field.add(new_related_models[add_pk]) |
| 252 | + else: |
| 253 | + setattr(model_instance, field_name, value) |
| 254 | + |
| 255 | + model_instance.save() |
| 256 | + result_instances.append(model_instance) |
| 257 | + |
| 258 | + return result_instances |
0 commit comments