flows: fix exporting and importing for models with multiple unique fields

This commit is contained in:
Jens Langhammer
2020-09-06 01:07:06 +02:00
parent 268de20872
commit dd017e7190
15 changed files with 280 additions and 79 deletions

View File

@@ -11,10 +11,10 @@ from passbook.lib.sentry import SentryIgnoredException
def get_attrs(obj: SerializerModel) -> Dict[str, Any]:
"""Get object's attributes via their serializer, and covert it to a normal dict"""
data = dict(obj.serializer(obj).data)
if "policies" in data:
data.pop("policies")
if "stages" in data:
data.pop("stages")
to_remove = ("policies", "stages", "pk")
for to_remove_name in to_remove:
if to_remove_name in data:
data.pop(to_remove_name)
return data
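For illustration, a minimal sketch of what the reworked get_attrs() now drops before export; the field names and values below are invented, not real serializer output:

serialized = {
    "pk": "b7b4cf7f",
    "slug": "default-authentication-flow",
    "title": "Welcome to passbook!",
    "stages": ["..."],
    "policies": [],
}
exported_attrs = {
    key: value
    for key, value in serialized.items()
    if key not in ("policies", "stages", "pk")
}
# exported_attrs == {"slug": "default-authentication-flow", "title": "Welcome to passbook!"}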
@@ -22,17 +22,26 @@ def get_attrs(obj: SerializerModel) -> Dict[str, Any]:
class FlowBundleEntry:
"""Single entry of a bundle"""
identifier: str
identifiers: Dict[str, Any]
model: str
attrs: Dict[str, Any]
@staticmethod
def from_model(model: SerializerModel) -> "FlowBundleEntry":
def from_model(
model: SerializerModel, *extra_identifier_names: str
) -> "FlowBundleEntry":
"""Convert a SerializerModel instance to a Bundle Entry"""
identifiers = {
"pk": model.pk,
}
all_attrs = get_attrs(model)
for extra_identifier_name in extra_identifier_names:
identifiers[extra_identifier_name] = all_attrs.pop(extra_identifier_name)
return FlowBundleEntry(
identifier=model.pk,
identifiers=identifiers,
model=f"{model._meta.app_label}.{model._meta.model_name}",
attrs=get_attrs(model),
attrs=all_attrs,
)
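A usage sketch of the new from_model() signature; stage stands for any exported Stage instance, and the name value is made up:

# Export a stage by its natural key in addition to its pk.
entry = FlowBundleEntry.from_model(stage, "name")
# entry.identifiers == {"pk": stage.pk, "name": "default-authentication-password"}
# entry.attrs holds the remaining serializer fields, with "name" and "pk" removed,
# so the importer can match an existing stage by name even when pks differ.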

View File

@@ -28,13 +28,13 @@ class FlowExporter:
for stage in stages:
if isinstance(stage, PromptStage):
pass
yield FlowBundleEntry.from_model(stage)
yield FlowBundleEntry.from_model(stage, "name")
def walk_stage_bindings(self) -> Iterator[FlowBundleEntry]:
"""Convert all bindings attached to self.flow into FlowBundleEntry objects"""
bindings = FlowStageBinding.objects.filter(target=self.flow).select_related()
for binding in bindings:
yield FlowBundleEntry.from_model(binding)
yield FlowBundleEntry.from_model(binding, "target", "stage", "order")
def walk_policies(self) -> Iterator[FlowBundleEntry]:
"""Walk over all policies and their respective bindings"""
@@ -64,7 +64,7 @@ class FlowExporter:
def export(self) -> FlowBundle:
"""Create a list of all objects including the flow"""
bundle = FlowBundle()
bundle.entries.append(FlowBundleEntry.from_model(self.flow))
bundle.entries.append(FlowBundleEntry.from_model(self.flow, "slug"))
if self.with_stage_prompts:
bundle.entries.extend(self.walk_stage_prompts())
bundle.entries.extend(self.walk_stages())
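Taken together, the exporter now records a natural identifier per model type: the flow's slug, each stage's name, and target/stage/order for bindings. A rough sketch of the resulting bundle entries, with every value invented:

bundle_entries = [
    {
        "identifiers": {"pk": "b7b4cf7f", "slug": "default-authentication-flow"},
        "model": "passbook_flows.flow",
        "attrs": {"title": "Welcome to passbook!", "designation": "authentication"},
    },
    {
        "identifiers": {"pk": "a51e79c7", "name": "default-authentication-login"},
        "model": "passbook_stages_user_login.userloginstage",
        "attrs": {"session_duration": "seconds=0"},
    },
    {
        "identifiers": {"pk": 4, "target": "b7b4cf7f", "stage": "a51e79c7", "order": 0},
        "model": "passbook_flows.flowstagebinding",
        "attrs": {"re_evaluate_policies": False},
    },
]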

View File

@@ -1,12 +1,14 @@
"""Flow importer"""
from json import loads
from typing import Type
from typing import Any, Dict
from dacite import from_dict
from dacite.exceptions import DaciteError
from django.apps import apps
from django.db import transaction
from django.db.models import Model
from django.db.models.query_utils import Q
from rest_framework.exceptions import ValidationError
from rest_framework.serializers import BaseSerializer, Serializer
from structlog import BoundLogger, get_logger
@@ -17,7 +19,7 @@ from passbook.flows.transfer.common import (
FlowBundleEntry,
)
from passbook.lib.models import SerializerModel
from passbook.policies.models import Policy, PolicyBinding, PolicyBindingModel
from passbook.policies.models import Policy, PolicyBinding
from passbook.stages.prompt.models import Prompt
ALLOWED_MODELS = (Flow, FlowStageBinding, Stage, Policy, PolicyBinding, Prompt)
@@ -28,49 +30,42 @@ class FlowImporter:
__import: FlowBundle
__pk_map: Dict[Any, Model]
logger: BoundLogger
def __init__(self, json_input: str):
self.logger = get_logger()
self.__pk_map = {}
import_dict = loads(json_input)
try:
self.__import = from_dict(FlowBundle, import_dict)
except DaciteError as exc:
raise EntryInvalidError from exc
def validate(self) -> bool:
"""Validate loaded flow export, ensure all models are allowed
and serializers have no errors"""
if self.__import.version != 1:
self.logger.warning("Invalid bundle version")
return False
for entry in self.__import.entries:
try:
self._validate_single(entry)
except EntryInvalidError as exc:
self.logger.warning(exc)
return False
return True
def __update_pks_for_attrs(self, attrs: Dict[str, Any]) -> Dict[str, Any]:
"""Replace any value if it is a known primary key of an other object"""
for key, value in attrs.items():
if isinstance(value, (list, dict)):
continue
if value in self.__pk_map:
attrs[key] = self.__pk_map[value]
self.logger.debug(
"updating reference in entry", key=key, new_value=attrs[key]
)
return attrs
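__update_pks_for_attrs relies on the pk map that _apply_models fills in as objects get saved; a small illustration of the remapping, with invented values:

pk_map = {"old-flow-pk": "new-flow-pk"}        # built while importing earlier entries
attrs = {"target": "old-flow-pk", "order": 0}  # attrs of a binding that is imported later
for key, value in attrs.items():
    if isinstance(value, (list, dict)):        # nested structures are left alone
        continue
    if value in pk_map:
        attrs[key] = pk_map[value]
# attrs == {"target": "new-flow-pk", "order": 0}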
def __get_pk_filed(self, model_class: Type[Model]) -> str:
fields = model_class._meta.get_fields()
pks = []
for field in fields:
# Ignore base PK from pbm as that isn't the same pk we exported
if field.model in [PolicyBindingModel]:
def __query_from_identifier(self, attrs: Dict[str, Any]) -> Q:
"""Generate an or'd query from all identifiers in an entry"""
# Since identifiers can also be pk-references to other objects (see FlowStageBinding)
# we have to ensure those references are also replaced
main_query = Q(pk=attrs["pk"])
sub_query = Q()
for identifier, value in attrs.items():
if identifier == "pk":
continue
# Ignore primary keys with _ptr suffix as those are surrogate and not what we exported
if field.name.endswith("_ptr"):
continue
if hasattr(field, "primary_key"):
if field.primary_key:
pks.append(field.name)
if len(pks) > 1:
self.logger.debug(
"Found more than one fields with primary_key=True, using pk", pks=pks
)
return "pk"
return pks[0]
sub_query &= Q(**{identifier: value})
return main_query | sub_query
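The lookup built here or's the exported pk with the remaining, already remapped, natural identifiers; roughly, for a FlowStageBinding entry with invented values:

from django.db.models import Q

identifiers = {"pk": "old-binding-pk", "target": "new-flow-pk", "stage": "new-stage-pk", "order": 0}
query = Q(pk=identifiers["pk"]) | (Q(target="new-flow-pk") & Q(stage="new-stage-pk") & Q(order=0))
# FlowStageBinding.objects.filter(query) then matches either the exact exported pk
# or an existing binding with the same natural key, whichever is already present.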
def _validate_single(self, entry: FlowBundleEntry) -> BaseSerializer:
"""Validate a single entry"""
@@ -82,43 +77,53 @@ class FlowImporter:
# If we try to validate without referencing a possible instance
# we'll get a duplicate error, hence we load the model here and return
# the full serializer for later usage
existing_models = model.objects.filter(pk=entry.identifier)
serializer_kwargs = {"data": entry.attrs}
# Because a model might have multiple unique columns, we chain all identifiers together
# to create an OR query.
updated_identifiers = self.__update_pks_for_attrs(entry.identifiers)
existing_models = model.objects.filter(
self.__query_from_identifier(updated_identifiers)
)
serializer_kwargs = {}
if existing_models.exists():
model_instance = existing_models.first()
self.logger.debug(
"initialise serializer with instance", instance=existing_models.first()
"initialise serializer with instance",
model=model,
instance=model_instance,
pk=model_instance.pk,
)
serializer_kwargs["instance"] = existing_models.first()
serializer_kwargs["instance"] = model_instance
else:
self.logger.debug("initialise new instance", pk=entry.identifier)
self.logger.debug(
"initialise new instance", model=model, **updated_identifiers
)
full_data = self.__update_pks_for_attrs(entry.attrs)
full_data.update(updated_identifiers)
serializer_kwargs["data"] = full_data
serializer: Serializer = model().serializer(**serializer_kwargs)
is_valid = serializer.is_valid()
if not is_valid:
raise EntryInvalidError(f"Serializer errors {serializer.errors}")
if not existing_models.exists():
# only insert the PK if we're creating a new model, otherwise we get
# an integrity error
model_pk = self.__get_pk_filed(model)
serializer.validated_data[model_pk] = entry.identifier
try:
serializer.is_valid(raise_exception=True)
except ValidationError as exc:
raise EntryInvalidError(f"Serializer errors {serializer.errors}") from exc
return serializer
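The update-or-create decision comes down to whether the serializer is handed an existing instance; a condensed sketch, where Flow, FlowSerializer and the remapped_* names are placeholders rather than the importer's actual variables:

existing = Flow.objects.filter(query).first()              # query from __query_from_identifier
kwargs = {"data": {**remapped_attrs, **remapped_identifiers}}
if existing is not None:
    kwargs["instance"] = existing                          # DRF then updates instead of creating
serializer = FlowSerializer(**kwargs)                      # stands in for model().serializer(...)
serializer.is_valid(raise_exception=True)                  # invalid data surfaces as ValidationError
serializer.save()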
def apply(self) -> bool:
"""Apply (create/update) flow json, in database transaction"""
transaction.set_autocommit(False)
sid = transaction.savepoint()
successful = self._apply_models()
if not successful:
self.logger.debug("Reverting changes due to error")
transaction.rollback()
transaction.set_autocommit(True)
transaction.savepoint_rollback(sid)
return False
self.logger.debug("Committing changes")
transaction.commit()
transaction.set_autocommit(True)
transaction.savepoint_commit(sid)
return True
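apply() now relies on a single savepoint instead of toggling autocommit; a minimal sketch of the Django savepoint pattern, where run_import stands in for self._apply_models:

from django.db import transaction

def run_in_savepoint(run_import) -> bool:
    sid = transaction.savepoint()              # mark a point to roll back to
    if not run_import():
        transaction.savepoint_rollback(sid)    # undo everything since the mark
        return False
    transaction.savepoint_commit(sid)          # release the savepoint, keep the changes
    return True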
def _apply_models(self) -> bool:
"""Apply (create/update) flow json"""
self.__pk_map = {}
for entry in self.__import.entries:
model_app_label, model_name = entry.model.split(".")
model: SerializerModel = apps.get_model(model_app_label, model_name)
@@ -130,5 +135,20 @@ class FlowImporter:
return False
model = serializer.save()
self.__pk_map[entry.identifiers["pk"]] = model.pk
self.logger.debug("updated model", model=model, pk=model.pk)
return True
def validate(self) -> bool:
"""Validate loaded flow export, ensure all models are allowed
and serializers have no errors"""
self.logger.debug("Starting flow import validaton")
if self.__import.version != 1:
self.logger.warning("Invalid bundle version")
return False
sid = transaction.savepoint()
successful = self._apply_models()
if not successful:
self.logger.debug("Flow validation failed")
transaction.savepoint_rollback(sid)
return successful
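With validate() now doing a dry run inside a savepoint that is rolled back afterwards, a caller can check a bundle before importing it; a hypothetical call pattern, where exported_json is a placeholder:

importer = FlowImporter(exported_json)
if importer.validate():     # applies everything inside a savepoint, then rolls it back
    importer.apply()        # applies for real, committing the savepoint on success
else:
    print("bundle failed validation, nothing was written")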