Skip to content

API Reference

_engine.py

Module implementing the rules engine.

Class: RulesEngine

RulesEngine

The Rules Engine is in charge of executing different groups of rules of a given rule set on user input data.

Attributes:

Name Type Description
rules dict[str, dict[str, list[Rule]]]

A dictionary of rules with k: rule set, v: (k: rule group, v: list of rule instances).

Source code in arta/_engine.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
class RulesEngine:
    """The Rules Engine is in charge of executing different groups of rules of a given rule set on user input data.

    Attributes:
        rules:  A dictionary of rules with k: rule set, v: (k: rule group, v: list of rule instances).
    """

    # ==== Class constants ====

    # Rule related config keys
    CONST_RULE_SETS_CONF_KEY: str = "rules"
    CONST_DFLT_RULE_SET_ID: str = "default_rule_set"
    CONST_STD_RULE_CONDITION_CONF_KEY: str = "condition"
    CONST_ACTION_CONF_KEY: str = "action"
    CONST_ACTION_PARAMETERS_CONF_KEY: str = "action_parameters"

    # Condition related config keys
    CONST_STD_CONDITIONS_CONF_KEY: str = "conditions"
    CONST_CONDITION_VALIDATION_FUNCTION_CONF_KEY: str = "validation_function"
    CONST_CONDITION_DESCRIPTION_CONF_KEY: str = "description"
    CONST_CONDITION_VALIDATION_PARAMETERS_CONF_KEY: str = "condition_parameters"
    CONST_USER_CONDITION_STRING: str = "USER_CONDITION"

    # Built-in factory mapping (k: condition conf. key, v: condition class)
    BUILTIN_FACTORY_MAPPING: dict[str, type[BaseCondition]] = {
        "condition": StandardCondition,
        "simple_condition": SimpleCondition,
    }

    def __init__(
        self,
        *,
        rules_dict: dict[str, dict[str, Any]] | None = None,
        config_path: str | None = None,
        config_dict: dict[str, Any] | None = None,
    ) -> None:
        """Initialize the rules.

        3 possibilities: either one of 'rules_dict', or 'config_path', or 'config_dict'.

        Args:
            rules_dict: A dictionary containing the rules' definitions.
            config_path: Path of a directory containing the YAML files.
            config_dict: A dictionary containing the configuration (same as YAML files but already
                         parsed in a dictionary).

        Raises:
            KeyError: Key not found.
            TypeError: Wrong type.
            ValueError: Bad given parameters.
        """
        # Var init.
        factory_mapping_classes: dict[str, type[BaseCondition]] = {}
        std_condition_instances: dict[str, StandardCondition] = {}

        given_params: list[bool] = [config_path is not None, rules_dict is not None, config_dict is not None]

        # Exactly one configuration source must be provided
        if given_params.count(True) != 1:
            raise ValueError(
                "RulesEngine takes one (and only one) parameter: 'rules_dict' or 'config_path' or 'config_dict'."
            )

        # Init. default parsing_error_strategy (probably not needed because already defined elsewhere)
        self._parsing_error_strategy: ParsingErrorStrategy = ParsingErrorStrategy.RAISE

        # Initialize directly with a rules dict
        if rules_dict is not None:
            # Data validation
            RulesDict.parse_obj(rules_dict)

            # Edge cases data validation
            if not isinstance(rules_dict, dict):
                raise TypeError(f"'rules_dict' must be dict type, not '{type(rules_dict)}'")
            elif len(rules_dict) == 0:
                raise KeyError("'rules_dict' couldn't be empty.")

            # Attribute definition
            self.rules: dict[str, dict[str, list[Rule]]] = self._adapt_user_rules_dict(rules_dict)

        # Initialize with a config_path or config_dict
        else:
            if config_path is not None:
                # Load config in attribute
                config_dict = load_config(config_path)

            if config_dict is not None:
                # Data validation
                config: Configuration = Configuration(**config_dict)

            if config.parsing_error_strategy is not None:
                # Set parsing error handling strategy from config
                self._parsing_error_strategy = ParsingErrorStrategy(config.parsing_error_strategy)

            # dict of available action functions (k: function name, v: function object)
            action_modules: list[str] = config.actions_source_modules
            action_functions: dict[str, Callable] = self._get_object_from_source_modules(action_modules)

            # dict of available standard condition functions (k: function name, v: function object)
            condition_modules: list[str] = (
                config.conditions_source_modules if config.conditions_source_modules is not None else []
            )
            std_condition_functions: dict[str, Callable] = self._get_object_from_source_modules(condition_modules)

            # Dictionary of condition instances (k: condition id, v: instance), built from config data
            if len(std_condition_functions) > 0:
                std_condition_instances = self._build_std_conditions(
                    config=config.dict(), condition_functions_dict=std_condition_functions
                )

            # User-defined/custom conditions
            if config.condition_factory_mapping is not None and config.custom_classes_source_modules is not None:
                # dict of custom condition classes (k: class name, v: class object)
                custom_condition_classes: dict[str, type[BaseCondition]] = self._get_object_from_source_modules(
                    config.custom_classes_source_modules
                )

                # Build a factory mapping dictionary (k: conf key, v:class object)
                factory_mapping_classes.update(
                    {
                        conf_key: custom_condition_classes[class_name]
                        for conf_key, class_name in config.condition_factory_mapping.items()
                    }
                )

            # Arta built-in conditions (applied last, so they override custom entries using the same conf. key)
            factory_mapping_classes.update(self.BUILTIN_FACTORY_MAPPING)

            # Attribute definition
            self.rules = self._build_rules(
                std_condition_instances=std_condition_instances,
                action_functions=action_functions,
                config=config.dict(),
                factory_mapping_classes=factory_mapping_classes,
            )

    def apply_rules(
        self,
        input_data: dict[str, Any],
        *,
        rule_set: str | None = None,
        ignored_rules: set[str] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> dict[str, Any]:
        """Apply the rules and return results.

        For each rule group of a given rule set, rules are applied sequentially.
        The loop is broken when a rule is applied (an action is triggered).
        Then, the next rule group is evaluated.
        And so on...

        This means that the order of the rules in the configuration file
        (e.g., rules.yaml) is meaningful.

        The input data is deep-copied before evaluation; the copy receives an 'output' key
        (k: group id, v: action result) that following rules can read.

        Args:
            input_data: Input data to apply rules on.
            rule_set: Apply rules associated with the specified rule set.
            ignored_rules: A set/list of rule ids to be ignored/disabled during evaluation.
            verbose: If True, add a 'verbosity' key holding the rule set id and the details
                of every applied rule. In that mode 'verbosity' is a reserved key and
                takes precedence over a rule group carrying the same name.
            **kwargs: For user extra arguments.

        Returns:
            A dictionary containing the rule groups' results (k: group id, v: action result).

        Raises:
            TypeError: Wrong type (e.g., input_data is not a dictionary).
            KeyError: Key not found (e.g., input_data is an empty dictionary).
            RuleExecutionError: A rule fails during execution.
            ConditionExecutionError: A condition fails during execution.
        """
        # Input_data validation
        if not isinstance(input_data, dict):
            raise TypeError(f"'input_data' must be dict type, not '{type(input_data)}'")
        elif len(input_data) == 0:
            raise KeyError("'input_data' couldn't be empty.")

        # Var init.
        input_data_copy: dict[str, Any] = copy.deepcopy(input_data)
        ignored_ids: set[str] = ignored_rules if ignored_rules is not None else set()

        # Prepare the result key
        input_data_copy["output"] = {}

        # If there is no given rule set param. and there is only one rule set in self.rules
        # and its value is 'default_rule_set', look for this one (rule_set='default_rule_set')
        if rule_set is None and len(self.rules) == 1 and self.rules.get(self.CONST_DFLT_RULE_SET_ID) is not None:
            rule_set = self.CONST_DFLT_RULE_SET_ID

        # Check if given rule set is in self.rules?
        if rule_set not in self.rules:
            raise KeyError(
                f"Rule set '{rule_set}' not found in the rules, available rule sets are : {list(self.rules.keys())}."
            )

        # Group results and verbosity details are kept apart so that a rule group
        # named 'verbosity' can neither corrupt the details nor be dropped.
        group_results: dict[str, Any] = {}
        verbosity: dict[str, Any] = {"rule_set": rule_set, "results": []}

        # Groups' loop
        for group_id, rules_list in self.rules[rule_set].items():
            # Initialize result of the rule group with None
            group_results[group_id] = None

            # Rules' loop (inside a group)
            for rule in rules_list:
                if rule._rule_id in ignored_ids:
                    # Ignore that rule
                    continue

                # Apply rules
                action_result, rule_details = rule.apply(
                    input_data_copy, parsing_error_strategy=self._parsing_error_strategy, **kwargs
                )

                # Check if the rule has been applied (= action activated)
                if "action_result" in rule_details:
                    # Save result and details
                    group_results[group_id] = action_result
                    verbosity["results"].append(rule_details)

                    # Update input data with current result with key 'output' (can be used in next rules)
                    input_data_copy["output"][group_id] = copy.deepcopy(action_result)

                    # We can only have one result per group => break when "action_result" in rule_details
                    break

        # Handling non-verbose mode
        if not verbose:
            return group_results

        # Verbose mode: 'verbosity' comes first, followed by the groups' results
        results_dict: dict[str, Any] = {"verbosity": verbosity}
        results_dict.update({key: val for key, val in group_results.items() if key != "verbosity"})

        return results_dict

    @staticmethod
    def _get_object_from_source_modules(module_list: list[str]) -> dict[str, Any]:
        """(Protected)
        Collect all functions and classes found in the list of modules.

        When the same name is found several times, the last one collected wins
        (classes are collected after functions, modules are processed in order).

        Args:
            module_list: List of source module names.

        Returns:
            Dictionary with objects found in the modules (k: object name, v: object).
        """
        object_dict: dict[str, Any] = {}

        for module_name in module_list:
            # Import module
            mod: ModuleType = importlib.import_module(module_name)

            # Collect functions
            module_functions: dict[str, Any] = dict(getmembers(mod, isfunction))
            object_dict.update(module_functions)

            # Collect classes
            module_classes: dict[str, Any] = dict(getmembers(mod, isclass))
            object_dict.update(module_classes)

        return object_dict

    def _build_rules(
        self,
        std_condition_instances: dict[str, StandardCondition],
        action_functions: dict[str, Callable],
        config: dict[str, Any],
        factory_mapping_classes: dict[str, type[BaseCondition]],
    ) -> dict[str, dict[str, list[Any]]]:
        """(Protected)
        Return a dictionary of Rule instances built from the configuration.

        Args:
            std_condition_instances: Dictionary of condition instances (k: condition id, v: StandardCondition instance)
            action_functions: Dictionary of action functions (k: action name, v: Callable)
            config: Dictionary of the imported configuration from yaml files.
            factory_mapping_classes: A mapping dictionary (k: condition conf. key, v: custom class object)

        Returns:
            A dictionary of rules (k: rule set id, v: (k: rule group id, v: list of Rule instances)).

        Raises:
            KeyError: The action function of a rule is not in 'action_functions'.
        """
        # Var init.
        rules_dict: dict[str, dict[str, list[Any]]] = {}

        # Retrieve rule set ids from config
        rule_set_ids: list[str] = list(config[self.CONST_RULE_SETS_CONF_KEY].keys())

        # Going all way down to the rules (rule set > rule group > rule)
        for set_id in rule_set_ids:
            rules_conf: dict[str, Any] = config[self.CONST_RULE_SETS_CONF_KEY][set_id]
            rules_dict[set_id] = {}
            rule_set_dict: dict[str, list[Any]] = rules_dict[set_id]

            # Looping through groups
            for group_id, group_rules in rules_conf.items():
                # Initialize list of rules in the group
                rule_set_dict[group_id] = []

                # Looping through rules (inside a group)
                for rule_id, rule_dict in group_rules.items():
                    # Get action function
                    action_function_name: str = rule_dict[self.CONST_ACTION_CONF_KEY]

                    if action_function_name not in action_functions:
                        raise KeyError(f"Unknwown action function : {action_function_name}")

                    action: Callable = action_functions[action_function_name]

                    # Look for condition conf. keys inside the rule
                    # (every key that is neither the action nor its parameters)
                    condition_conf_keys: set[str] = set(rule_dict.keys()) - {
                        self.CONST_ACTION_CONF_KEY,
                        self.CONST_ACTION_PARAMETERS_CONF_KEY,
                    }

                    # Store the cond. expressions with the same order as in the configuration file (very important)
                    condition_exprs: dict[str, str | None] = {
                        key: value for key, value in rule_dict.items() if key in condition_conf_keys
                    }

                    # Create the corresponding Rule instance
                    rule: Rule = Rule(
                        set_id=set_id,
                        group_id=group_id,
                        rule_id=rule_id,
                        action=action,
                        action_parameters=rule_dict[self.CONST_ACTION_PARAMETERS_CONF_KEY],
                        condition_exprs=condition_exprs,
                        std_condition_instances=std_condition_instances,
                        condition_factory_mapping=factory_mapping_classes,
                    )
                    rule_set_dict[group_id].append(rule)

        return rules_dict

    def _build_std_conditions(
        self, config: dict[str, Any], condition_functions_dict: dict[str, Callable]
    ) -> dict[str, StandardCondition]:
        """(Protected)
        Return a dictionary of Condition instances built from the configuration file.

        Args:
            config: Dictionary of the imported configuration from yaml files.
            condition_functions_dict: A dictionary where k: validation function name, v: Callable
                (validation function).

        Returns:
            A dictionary of StandardCondition instances (k: condition id, v: StandardCondition instance).

        Raises:
            KeyError: The validation function of a condition is not in 'condition_functions_dict'.
        """
        # Var init.
        conditions_dict: dict[str, StandardCondition] = {}

        # Condition configuration (under conditions' key)
        conditions_conf: dict[str, dict[str, Any]] = config[self.CONST_STD_CONDITIONS_CONF_KEY]

        # Looping through conditions
        for condition_id, condition_params in conditions_conf.items():
            # Get condition validation function
            validation_function_name: str = condition_params[self.CONST_CONDITION_VALIDATION_FUNCTION_CONF_KEY]

            if validation_function_name not in condition_functions_dict:
                raise KeyError(f"Unknwown validation function : {validation_function_name}")

            # Get Callable from function name
            validation_function: Callable = condition_functions_dict[validation_function_name]

            # Create Condition instance
            condition_instance: StandardCondition = StandardCondition(
                condition_id=condition_id,
                description=condition_params[self.CONST_CONDITION_DESCRIPTION_CONF_KEY],
                validation_function=validation_function,
                validation_function_parameters=condition_params[self.CONST_CONDITION_VALIDATION_PARAMETERS_CONF_KEY],
            )
            conditions_dict[condition_id] = condition_instance

        return conditions_dict

    def _adapt_user_rules_dict(self, rules_dict: dict[str, dict[str, Any]]) -> dict[str, dict[str, list[Any]]]:
        """(Protected)
        Return a dictionary of Rule's instances built from user's rules dictionary.

        All the rules are stored under the default rule set id.

        Args:
            rules_dict: User raw rules dictionary.

        Returns:
            A rules dictionary made from the user input rules.

        Raises:
            KeyError: An action function has no parameter named 'kwargs'.
        """
        # Var init.
        rules_dict_formatted: dict[str, list[Any]] = {}

        # Looping through groups
        for group_id, group_rules in rules_dict.items():
            # Initialize list of rules in the group
            rules_dict_formatted[group_id] = []

            # Looping through rules (inside a group)
            for rule_id, rule_dict in group_rules.items():
                # Get action function
                action = rule_dict["action"]

                # Trigger if not **kwargs
                if "kwargs" not in inspect.signature(action).parameters:
                    raise KeyError(f"The action function {action} must have a '**kwargs' parameter.")

                # Create Rule instance
                # (the user's condition function, if any, is registered under a fixed condition id)
                rule = Rule(
                    set_id=self.CONST_DFLT_RULE_SET_ID,
                    group_id=group_id,
                    rule_id=rule_id,
                    action=action,
                    action_parameters=rule_dict.get(self.CONST_ACTION_PARAMETERS_CONF_KEY),
                    condition_exprs={self.CONST_STD_RULE_CONDITION_CONF_KEY: self.CONST_USER_CONDITION_STRING}
                    if self.CONST_STD_RULE_CONDITION_CONF_KEY in rule_dict
                    and rule_dict.get(self.CONST_STD_RULE_CONDITION_CONF_KEY) is not None
                    else {self.CONST_STD_RULE_CONDITION_CONF_KEY: None},
                    std_condition_instances={
                        self.CONST_USER_CONDITION_STRING: StandardCondition(
                            condition_id=self.CONST_USER_CONDITION_STRING,
                            description="Automatic description",
                            validation_function=rule_dict.get(self.CONST_STD_RULE_CONDITION_CONF_KEY),
                            validation_function_parameters=rule_dict.get(
                                self.CONST_CONDITION_VALIDATION_PARAMETERS_CONF_KEY
                            ),
                        )
                    },
                    condition_factory_mapping=self.BUILTIN_FACTORY_MAPPING,
                )
                rules_dict_formatted[group_id].append(rule)

        return {self.CONST_DFLT_RULE_SET_ID: rules_dict_formatted}

    def __str__(self) -> str:
        """Object human string representation (called by str()).

        Members whose name starts with '_' or 'CONST_', as well as functions and methods,
        are left out of the representation.

        Returns:
            A string representation of the instance.
        """
        # Vars init.
        attrs_str: str = ""

        # Get some instance attributes infos
        class_name: str = self.__class__.__name__
        attrs: list[tuple[str, Any]] = [
            attr
            for attr in inspect.getmembers(self)
            if not (
                attr[0].startswith("_")
                or attr[0].startswith("CONST_")
                or isinstance(attr[1], (FunctionType, MethodType))
            )
        ]

        # Build string representation
        for attr, val in attrs:
            attrs_str += f"{attr}={str(val)}, "

        return f"{class_name}({attrs_str})"

__init__(*, rules_dict=None, config_path=None, config_dict=None)

Initialize the rules.

3 possibilities: either one of 'rules_dict', or 'config_path', or 'config_dict'.

Parameters:

Name Type Description Default
rules_dict dict[str, dict[str, Any]] | None

A dictionary containing the rules' definitions.

None
config_path str | None

Path of a directory containing the YAML files.

None
config_dict dict[str, Any] | None

A dictionary containing the configuration (same as YAML files but already parsed in a dictionary).

None

Raises:

Type Description
KeyError

Key not found.

TypeError

Wrong type.

ValueError

Bad given parameters.

Source code in arta/_engine.py
def __init__(
    self,
    *,
    rules_dict: dict[str, dict[str, Any]] | None = None,
    config_path: str | None = None,
    config_dict: dict[str, Any] | None = None,
) -> None:
    """Initialize the rules.

    3 possibilities: either one of 'rules_dict', or 'config_path', or 'config_dict'.

    Args:
        rules_dict: A dictionary containing the rules' definitions.
        config_path: Path of a directory containing the YAML files.
        config_dict: A dictionary containing the configuration (same as YAML files but already
                     parsed in a dictionary).

    Raises:
        KeyError: Key not found.
        TypeError: Wrong type.
        ValueError: Bad given parameters.
    """
    # Var init.
    factory_mapping_classes: dict[str, type[BaseCondition]] = {}
    std_condition_instances: dict[str, StandardCondition] = {}

    given_params: list[bool] = [config_path is not None, rules_dict is not None, config_dict is not None]

    if given_params.count(True) != 1:
        raise ValueError(
            "RulesEngine takes one (and only one) parameter: 'rules_dict' or 'config_path' or 'config_dict'."
        )

    # Init. default parsing_error_strategy (probably not needed because already defined elsewhere)
    self._parsing_error_strategy: ParsingErrorStrategy = ParsingErrorStrategy.RAISE

    # Initialize directly with a rules dict
    if rules_dict is not None:
        # Data validation
        RulesDict.parse_obj(rules_dict)

        # Edge cases data validation
        if not isinstance(rules_dict, dict):
            raise TypeError(f"'rules_dict' must be dict type, not '{type(rules_dict)}'")
        elif len(rules_dict) == 0:
            raise KeyError("'rules_dict' couldn't be empty.")

        # Attribute definition
        self.rules: dict[str, dict[str, list[Rule]]] = self._adapt_user_rules_dict(rules_dict)

    # Initialize with a config_path or config_dict
    else:
        if config_path is not None:
            # Load config in attribute
            config_dict = load_config(config_path)

        if config_dict is not None:
            # Data validation
            config: Configuration = Configuration(**config_dict)

        if config.parsing_error_strategy is not None:
            # Set parsing error handling strategy from config
            self._parsing_error_strategy = ParsingErrorStrategy(config.parsing_error_strategy)

        # dict of available action functions (k: function name, v: function object)
        action_modules: list[str] = config.actions_source_modules
        action_functions: dict[str, Callable] = self._get_object_from_source_modules(action_modules)

        # dict of available standard condition functions (k: function name, v: function object)
        condition_modules: list[str] = (
            config.conditions_source_modules if config.conditions_source_modules is not None else []
        )
        std_condition_functions: dict[str, Callable] = self._get_object_from_source_modules(condition_modules)

        # Dictionary of condition instances (k: condition id, v: instance), built from config data
        if len(std_condition_functions) > 0:
            std_condition_instances = self._build_std_conditions(
                config=config.dict(), condition_functions_dict=std_condition_functions
            )

        # User-defined/custom conditions
        if config.condition_factory_mapping is not None and config.custom_classes_source_modules is not None:
            # dict of custom condition classes (k: classe name, v: class object)
            custom_condition_classes: dict[str, type[BaseCondition]] = self._get_object_from_source_modules(
                config.custom_classes_source_modules
            )

            # Build a factory mapping dictionary (k: conf key, v:class object)
            factory_mapping_classes.update(
                {
                    conf_key: custom_condition_classes[class_name]
                    for conf_key, class_name in config.condition_factory_mapping.items()
                }
            )

        # Arta built-in conditions
        factory_mapping_classes.update(self.BUILTIN_FACTORY_MAPPING)

        # Attribute definition
        self.rules = self._build_rules(
            std_condition_instances=std_condition_instances,
            action_functions=action_functions,
            config=config.dict(),
            factory_mapping_classes=factory_mapping_classes,
        )

__str__()

Object human string representation (called by str()).

Returns:

Type Description
str

A string representation of the instance.

Source code in arta/_engine.py
def __str__(self) -> str:
    """Return a human-readable representation of the instance (called by str()).

    Members whose name starts with '_' or 'CONST_', as well as functions and methods,
    are left out.

    Returns:
        A string representation of the instance.
    """

    def _is_displayed(name: str, value: Any) -> bool:
        # Keep public, non-constant, non-callable members only
        if name.startswith(("_", "CONST_")):
            return False
        return not isinstance(value, (FunctionType, MethodType))

    # Every displayed member is rendered as 'name=value, ' (trailing separator included)
    rendered_members: str = "".join(
        f"{name}={str(value)}, " for name, value in inspect.getmembers(self) if _is_displayed(name, value)
    )

    return f"{self.__class__.__name__}({rendered_members})"

apply_rules(input_data, *, rule_set=None, ignored_rules=None, verbose=False, **kwargs)

Apply the rules and return results.

For each rule group of a given rule set, rules are applied sequentially. The loop is broken when a rule is applied (an action is triggered). Then, the next rule group is evaluated. And so on...

This means that the order of the rules in the configuration file (e.g., rules.yaml) is meaningful.

Parameters:

Name Type Description Default
input_data dict[str, Any]

Input data to apply rules on.

required
rule_set str | None

Apply rules associated with the specified rule set.

None
ignored_rules set[str] | None

A set/list of rule ids to be ignored/disabled during evaluation.

None
verbose bool

If True, add extra ids (group_id, rule_id) for result explicability.

False
**kwargs Any

For user extra arguments.

{}

Returns:

Type Description
dict[str, Any]

A dictionary containing the rule groups' results (k: group id, v: action result).

Raises:

Type Description
TypeError

Wrong type (e.g., input_data is not a dictionary).

KeyError

Key not found (e.g., input_data is an empty dictionary).

RuleExecutionError

A rule fails during execution.

ConditionExecutionError

A condition fails during execution.

Source code in arta/_engine.py
def apply_rules(
    self,
    input_data: dict[str, Any],
    *,
    rule_set: str | None = None,
    ignored_rules: set[str] | None = None,
    verbose: bool = False,
    **kwargs: Any,
) -> dict[str, Any]:
    """Apply the rules and return results.

    For each rule group of a given rule set, rules are applied sequentially,
    The loop is broken when a rule is applied (an action is triggered).
    Then, the next rule group is evaluated.
    And so on...

    This means that the order of the rules in the configuration file
    (e.g., rules.yaml) is meaningful.

    Args:
        input_data: Input data to apply rules on.
        rule_set: Apply rules associated with the specified rule set.
        ignored_rules: A set/list of rule's ids to be ignored/disabled during evaluation.
        verbose: If True, add extra ids (group_id, rule_id) for result explicability.
        **kwargs: For user extra arguments.

    Returns:
        A dictionary containing the rule groups' results (k: group id, v: action result).

    Raises:
        TypeError: Wrong type (e.g., input_data is not a dictionary).
        KeyError: Key not found (e.g., input_data is an empty dictionary).
        RuleExecutionError: A rule fails during execution.
        ConditionExecutionError: A condition fails during execution.
    """
    # Input_data validation
    if not isinstance(input_data, dict):
        raise TypeError(f"'input_data' must be dict type, not '{type(input_data)}'")
    elif len(input_data) == 0:
        raise KeyError("'input_data' couldn't be empty.")

    # Var init.
    input_data_copy: dict[str, Any] = copy.deepcopy(input_data)
    ignored_ids: set[str] = ignored_rules if ignored_rules is not None else set()

    # Prepare the result key
    input_data_copy["output"] = {}

    # If there is no given rule set param. and there is only one rule set in self.rules
    # and its value is 'default_rule_set', look for this one (rule_set='default_rule_set')
    if rule_set is None and len(self.rules) == 1 and self.rules.get(self.CONST_DFLT_RULE_SET_ID) is not None:
        rule_set = self.CONST_DFLT_RULE_SET_ID

    # Check if given rule set is in self.rules?
    if rule_set not in self.rules:
        raise KeyError(
            f"Rule set '{rule_set}' not found in the rules, available rule sets are : {list(self.rules.keys())}."
        )

    # Var init.
    results_dict: dict[str, Any] = {"verbosity": {"rule_set": rule_set, "results": []}}

    # Groups' loop
    for group_id, rules_list in self.rules[rule_set].items():
        # Initialize result of the rule group with None
        results_dict[group_id] = None

        # Rules' loop (inside a group)
        for rule in rules_list:
            if rule._rule_id in ignored_ids:
                # Ignore that rule
                continue

            # Apply rules
            action_result, rule_details = rule.apply(
                input_data_copy, parsing_error_strategy=self._parsing_error_strategy, **kwargs
            )

            # Check if the rule has been applied (= action activated)
            if "action_result" in rule_details:
                # Save result and details
                results_dict[group_id] = action_result
                results_dict["verbosity"]["results"].append(rule_details)

                # Update input data with current result with key 'output' (can be used in next rules)
                input_data_copy["output"][group_id] = copy.deepcopy(results_dict[group_id])

                # We can only have one result per group => break when "action_result" in rule_details
                break

    # Handling non-verbose mode
    if not verbose:
        results_dict.pop("verbosity")

    return results_dict

condition.py

Condition implementation.

Classes: BaseCondition, StandardCondition, SimpleCondition

BaseCondition

Bases: ABC

Base class of a Condition object (Strategy Pattern).

Is an abstract class and can't be instantiated.

Attributes:

Name Type Description
condition_id

Id of a condition.

description

Description of a condition.

validation_function

Validation function of a condition.

validation_function_parameters

Arguments of the validation function.

Source code in arta/condition.py
class BaseCondition(ABC):
    """Abstract parent of every condition object (Strategy Pattern).

    Subclasses have to implement `verify`; this class itself can't be instantiated.

    Attributes:
        condition_id: Id of a condition.
        description: Description of a condition.
        validation_function: Validation function of a condition.
        validation_function_parameters: Arguments of the validation function.
    """

    # Class constants
    CONDITION_DATA_LABEL: str = "Undefined condition data (not needed)"
    CONDITION_ID_PATTERN: str = r"\b[A-Z_0-9]+\b"

    def __init__(
        self,
        condition_id: str,
        description: str,
        validation_function: Callable | None = None,
        validation_function_parameters: dict[str, Any] | None = None,
    ) -> None:
        """
        Store the constructor arguments on the instance.

        Args:
            condition_id: Id of a condition.
            description: Description of a condition.
            validation_function: Validation function of a condition.
            validation_function_parameters: Arguments of the validation function.
        """
        # Identity of the condition
        self._condition_id, self._description = condition_id, description

        # Callable used for the validation and the arguments configured for it
        self._validation_function, self._validation_function_parameters = (
            validation_function,
            validation_function_parameters,
        )

    @abstractmethod
    def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
        """(Abstract)
        Tell whether the condition is verified.

        Args:
            input_data: Input data to apply rules on.
            parsing_error_strategy: Error handling strategy for parameter parsing.
            **kwargs: For user extra arguments.

        Returns:
            True if the condition is verified, otherwise False.

        Raises:
            NotImplementedError: Always, when this base implementation is called.
        """
        raise NotImplementedError()

    def get_sanitized_id(self) -> str:
        """Build the regex pattern matching this condition id as a whole word.

        E.g., 'CONDITION_2'       --> '\\bCONDITION_2\\b'

        Returns:
            A sanitized regex pattern string.
        """
        return r"\b{}\b".format(self._condition_id)

    @classmethod
    def extract_condition_ids_from_expression(cls, condition_expr: str | None = None) -> set[str]:
        """Collect the condition ids found in a boolean expression (e.g., UPPERCASE words).

        E.g., CONDITION_1 and not CONDITION_2

        Warning: implementation is based on the current class constant CONDITION_ID_PATTERN.

        Args:
            condition_expr: A boolean expression (string).

        Returns:
            A set of extracted condition ids (empty when no expression is given).
        """
        # Nothing to scan
        if condition_expr is None:
            return set()

        found_ids: list[str] = re.findall(cls.CONDITION_ID_PATTERN, condition_expr)
        return set(found_ids)

__init__(condition_id, description, validation_function=None, validation_function_parameters=None)

Initialize attributes.

Parameters:

Name Type Description Default
condition_id str

Id of a condition.

required
description str

Description of a condition.

required
validation_function Callable | None

Validation function of a condition.

None
validation_function_parameters dict[str, Any] | None

Arguments of the validation function.

None
Source code in arta/condition.py
def __init__(
    self,
    condition_id: str,
    description: str,
    validation_function: Callable | None = None,
    validation_function_parameters: dict[str, Any] | None = None,
) -> None:
    """
    Store the constructor arguments on the instance.

    Args:
        condition_id: Id of a condition.
        description: Description of a condition.
        validation_function: Validation function of a condition.
        validation_function_parameters: Arguments of the validation function.
    """
    # Identity of the condition
    self._condition_id, self._description = condition_id, description

    # Callable used for the validation and the arguments configured for it
    self._validation_function, self._validation_function_parameters = (
        validation_function,
        validation_function_parameters,
    )

extract_condition_ids_from_expression(condition_expr=None) classmethod

Get the condition ids from a string (e.g., UPPERCASE words).

E.g., CONDITION_1 and not CONDITION_2

Warning: implementation is based on the current class constant CONDITION_ID_PATTERN.

Parameters:

Name Type Description Default
condition_expr str | None

A boolean expression (string).

None

Returns:

Type Description
set[str]

A set of extracted condition ids.

Source code in arta/condition.py
@classmethod
def extract_condition_ids_from_expression(cls, condition_expr: str | None = None) -> set[str]:
    """Collect the condition ids found in a boolean expression (e.g., UPPERCASE words).

    E.g., CONDITION_1 and not CONDITION_2

    Warning: implementation is based on the current class constant CONDITION_ID_PATTERN.

    Args:
        condition_expr: A boolean expression (string).

    Returns:
        A set of extracted condition ids (empty when no expression is given).
    """
    # Nothing to scan
    if condition_expr is None:
        return set()

    found_ids: list[str] = re.findall(cls.CONDITION_ID_PATTERN, condition_expr)
    return set(found_ids)

get_sanitized_id()

Return the sanitized (regex) condition id.

E.g., 'CONDITION_2' --> '\bCONDITION_2\b'

Returns:

Type Description
str

A sanitized regex pattern string.

Source code in arta/condition.py
def get_sanitized_id(self) -> str:
    """Build the regex pattern matching this condition id as a whole word.

    E.g., 'CONDITION_2'       --> '\\bCONDITION_2\\b'

    Returns:
        A sanitized regex pattern string.
    """
    # Surround the id with word boundaries
    return r"\b{}\b".format(self._condition_id)

verify(input_data, parsing_error_strategy, **kwargs) abstractmethod

(Abstract) Return True if the condition is verified.

Parameters:

Name Type Description Default
input_data dict[str, Any]

Input data to apply rules on.

required
parsing_error_strategy ParsingErrorStrategy

Error handling strategy for parameter parsing.

required
**kwargs Any

For user extra arguments.

{}

Returns:

Type Description
bool

True if the condition is verified, otherwise False.

Source code in arta/condition.py
@abstractmethod
def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
    """(Abstract)
    Tell whether the condition is verified.

    Args:
        input_data: Input data to apply rules on.
        parsing_error_strategy: Error handling strategy for parameter parsing.
        **kwargs: For user extra arguments.

    Returns:
        True if the condition is verified, otherwise False.

    Raises:
        NotImplementedError: Always, when this base implementation is called.
    """
    raise NotImplementedError()

SimpleCondition

Bases: BaseCondition

Class implementing a built-in simple condition.

Attributes:

Name Type Description
condition_id

Id of a condition.

description

Description of a condition.

validation_function

Validation function of a condition.

validation_function_parameters

Arguments of the validation function.

Source code in arta/condition.py
class SimpleCondition(BaseCondition):
    """Class implementing a built-in simple condition.

    Attributes:
        condition_id: Id of a condition.
        description: Description of a condition.
        validation_function: Validation function of a condition.
        validation_function_parameters: Arguments of the validation function.
    """

    # Class constants
    CONDITION_DATA_LABEL: str = "Simple condition data (not needed)"
    CONDITION_ID_PATTERN: str = r"(?:input\.|output\.)(?:[a-zA-Z0-9!=<>\"NTF\.\*\+\-_/]*)(?:[a-zA-Z\s\-_]*\"|)"

    def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
        """Return True if the condition is verified.

        Example of a unitary simple condition to be verified: 'input.age>=100'

        Every data path found in the condition id (e.g., 'input.age') is resolved with
        `parse_dynamic_parameter` and bound to a generated variable name (data_0, data_1, ...);
        the rewritten expression is then evaluated.

        Args:
            input_data: Request or input data to apply rules on.
            parsing_error_strategy: Error handling strategy for parameter parsing.
            **kwargs: For user extra arguments.

        Returns:
            The result of the evaluated expression. False when no data path is found
            (and the strategy is not RAISE) or when the evaluation raises a TypeError.

        Raises:
            ConditionExecutionError: No data path was found in the condition and the
                parsing error strategy is RAISE.
        """
        unitary_expr: str = self._condition_id

        data_path_patt: str = r"(?:input\.|output\.)(?:[a-zA-Z_\.]*)"

        if re.search(data_path_patt, unitary_expr) is None:
            if parsing_error_strategy == ParsingErrorStrategy.RAISE:
                # Raise an error because of no match for a data path
                raise ConditionExecutionError(f"Error when verifying simple condition: '{unitary_expr}'")

            # Other case: ignore, default value => return False
            return False

        # Explicit namespace handed to eval(). Writing into locals() is documented as unsupported
        # and is no longer visible to a following eval() on Python 3.13+ (PEP 667).
        eval_scope: dict[str, Any] = {}

        def _bind_path(match: re.Match[str]) -> str:
            """Resolve one data path, store its value and return the variable name replacing it."""
            var_name = f"data_{len(eval_scope)}"
            eval_scope[var_name] = parse_dynamic_parameter(
                parameter=match.group(0), input_data=input_data, parsing_error_strategy=parsing_error_strategy
            )
            return var_name

        # Single-pass substitution: each match is replaced where it was found, so a path that is
        # a prefix of another one (e.g., 'input.a' / 'input.ab') cannot corrupt the expression.
        eval_expr: str = re.sub(data_path_patt, _bind_path, unitary_expr)

        # NOTE(security): eval() executes the condition text as Python code; the rule definitions
        # must come from a trusted source.
        try:
            return eval(eval_expr, globals(), eval_scope)  # noqa
        except TypeError:
            # Ignore evaluation (e.g., comparison between incompatible types) --> False
            return False

    def get_sanitized_id(self) -> str:
        """Return the sanitized (regex) condition id.

        E.g., 'input.power=="fly"' --> 'input\\.power=="fly"'

        Returns:
            A sanitized regex pattern string.
        """
        return re.escape(self._condition_id)

get_sanitized_id()

Return the sanitized (regex) condition id.

E.g., 'input.power=="fly"' --> 'input\.power=="fly"'

Returns:

Type Description
str

A sanitized regex pattern string.

Source code in arta/condition.py
def get_sanitized_id(self) -> str:
    """Escape the condition id so it can be used literally inside a regex.

    E.g., 'input.power=="fly"' --> 'input\\.power=="fly"'

    Returns:
        A sanitized regex pattern string.
    """
    raw_id = self._condition_id
    return re.escape(raw_id)

verify(input_data, parsing_error_strategy, **kwargs)

Return True if the condition is verified.

Example of a unitary simple condition to be verified: 'input.age>=100'

Parameters:

Name Type Description Default
input_data dict[str, Any]

Request or input data to apply rules on.

required
parsing_error_strategy ParsingErrorStrategy

Error handling strategy for parameter parsing.

required
**kwargs Any

For user extra arguments.

{}

Returns:

Type Description
bool

True if the condition is verified, otherwise False.

Raises:

Type Description
ConditionExecutionError

No data path was found in the condition and the parsing error strategy is RAISE.

Source code in arta/condition.py
def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
    """Return True if the condition is verified.

    Example of a unitary simple condition to be verified: 'input.age>=100'

    Every data path found in the condition id (e.g., 'input.age') is resolved with
    `parse_dynamic_parameter` and bound to a generated variable name (data_0, data_1, ...);
    the rewritten expression is then evaluated.

    Args:
        input_data: Request or input data to apply rules on.
        parsing_error_strategy: Error handling strategy for parameter parsing.
        **kwargs: For user extra arguments.

    Returns:
        The result of the evaluated expression. False when no data path is found
        (and the strategy is not RAISE) or when the evaluation raises a TypeError.

    Raises:
        ConditionExecutionError: No data path was found in the condition and the
            parsing error strategy is RAISE.
    """
    unitary_expr: str = self._condition_id

    data_path_patt: str = r"(?:input\.|output\.)(?:[a-zA-Z_\.]*)"

    if re.search(data_path_patt, unitary_expr) is None:
        if parsing_error_strategy == ParsingErrorStrategy.RAISE:
            # Raise an error because of no match for a data path
            raise ConditionExecutionError(f"Error when verifying simple condition: '{unitary_expr}'")

        # Other case: ignore, default value => return False
        return False

    # Explicit namespace handed to eval(). Writing into locals() is documented as unsupported
    # and is no longer visible to a following eval() on Python 3.13+ (PEP 667).
    eval_scope: dict[str, Any] = {}

    def _bind_path(match: re.Match[str]) -> str:
        """Resolve one data path, store its value and return the variable name replacing it."""
        var_name = f"data_{len(eval_scope)}"
        eval_scope[var_name] = parse_dynamic_parameter(
            parameter=match.group(0), input_data=input_data, parsing_error_strategy=parsing_error_strategy
        )
        return var_name

    # Single-pass substitution: each match is replaced where it was found, so a path that is
    # a prefix of another one (e.g., 'input.a' / 'input.ab') cannot corrupt the expression.
    eval_expr: str = re.sub(data_path_patt, _bind_path, unitary_expr)

    # NOTE(security): eval() executes the condition text as Python code; the rule definitions
    # must come from a trusted source.
    try:
        return eval(eval_expr, globals(), eval_scope)  # noqa
    except TypeError:
        # Ignore evaluation (e.g., comparison between incompatible types) --> False
        return False

StandardCondition

Bases: BaseCondition

Class implementing a built-in condition, named standard condition.

Attributes:

Name Type Description
condition_id

Id of a condition.

description

Description of a condition.

validation_function

Validation function of a condition.

validation_function_parameters

Arguments of the validation function.

Source code in arta/condition.py
class StandardCondition(BaseCondition):
    """Built-in condition backed by a user validation function (standard condition).

    Attributes:
        condition_id: Id of a condition.
        description: Description of a condition.
        validation_function: Validation function of a condition.
        validation_function_parameters: Arguments of the validation function.
    """

    # Class constants
    CONDITION_DATA_LABEL: str = "Standard condition (will be overwritten)"

    def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
        """Run the validation function and return its result.

        Example of a unitary standard condition: CONDITION_1

        Each configured parameter value goes through `parse_dynamic_parameter` before
        being passed as a keyword argument to the validation function.

        Args:
            input_data: Request or input data to apply rules on.
            parsing_error_strategy: Error handling strategy for parameter parsing.
            **kwargs: For user extra arguments.

        Returns:
            True if the condition is verified, otherwise False.

        Raises:
            AttributeError: Check the validation function or its parameters.
        """
        validation_fn = self._validation_function
        if validation_fn is None:
            raise AttributeError("Validation function should not be None")

        raw_parameters = self._validation_function_parameters
        if raw_parameters is None:
            raise AttributeError("Validation function parameters should not be None")

        # Parse dynamic parameters
        resolved_parameters: dict[str, Any] = {
            name: parse_dynamic_parameter(
                parameter=raw_value, input_data=input_data, parsing_error_strategy=parsing_error_strategy
            )
            for name, raw_value in raw_parameters.items()
        }

        # Run validation_function
        return validation_fn(**resolved_parameters)

verify(input_data, parsing_error_strategy, **kwargs)

Return True if the condition is verified.

Example of a unitary standard condition: CONDITION_1

Parameters:

Name Type Description Default
input_data dict[str, Any]

Request or input data to apply rules on.

required
parsing_error_strategy ParsingErrorStrategy

Error handling strategy for parameter parsing.

required
**kwargs Any

For user extra arguments.

{}

Returns:

Type Description
bool

True if the condition is verified, otherwise False.

Raises:

Type Description
AttributeError

Check the validation function or its parameters.

Source code in arta/condition.py
def verify(self, input_data: dict[str, Any], parsing_error_strategy: ParsingErrorStrategy, **kwargs: Any) -> bool:
    """Run the validation function and return its result.

    Example of a unitary standard condition: CONDITION_1

    Each configured parameter value goes through `parse_dynamic_parameter` before
    being passed as a keyword argument to the validation function.

    Args:
        input_data: Request or input data to apply rules on.
        parsing_error_strategy: Error handling strategy for parameter parsing.
        **kwargs: For user extra arguments.

    Returns:
        True if the condition is verified, otherwise False.

    Raises:
        AttributeError: Check the validation function or its parameters.
    """
    validation_fn = self._validation_function
    if validation_fn is None:
        raise AttributeError("Validation function should not be None")

    raw_parameters = self._validation_function_parameters
    if raw_parameters is None:
        raise AttributeError("Validation function parameters should not be None")

    # Parse dynamic parameters
    resolved_parameters: dict[str, Any] = {
        name: parse_dynamic_parameter(
            parameter=raw_value, input_data=input_data, parsing_error_strategy=parsing_error_strategy
        )
        for name, raw_value in raw_parameters.items()
    }

    # Run validation_function
    return validation_fn(**resolved_parameters)