Source code for pyMentalModels.infer

#!/usr/bin/python3
# -*- coding: iso-8859-15 -*-

import numpy as np
import logging

# from itertools import product
from functools import reduce

# from pyMentalModels.numpy_reasoner import _merge_models
from pyMentalModels.mental_model import mental_model
from pyMentalModels.constants import EXPL_NEG, POS_VAL, IMPL_NEG

from typing import List
from enum import Enum


[docs]class InferenceTask(Enum): FOLLOWS = "what_follows" NECESSARY = "necessary" POSSIBLE = "possible" PROBABILITY = "probability" VERIFY = "verify" ONLY_MODELS = "only_models"
[docs]def normalize_combined_model_values(model): # after adding values can either be 2, -2 , -3 or -4 for the indexes that are active in both models # for the other indices values are 0, -1, -2 or 1 # for the active indices map 4, -2, -3 and -4 to 1, -1, -2 # XXX with exxxplicit mode, implicit neg == Expl neg model[model == POS_VAL + POS_VAL] = POS_VAL model[model == POS_VAL + IMPL_NEG] = POS_VAL model[model == IMPL_NEG + IMPL_NEG] = IMPL_NEG model[model == EXPL_NEG + IMPL_NEG] = EXPL_NEG model[model == EXPL_NEG + EXPL_NEG] = EXPL_NEG return model
[docs]def combine_mental_models(model1, model2, atom_index_mapping, exp_atoms): logging.debug("Combining mental models: ") logging.debug(str(model1)) logging.debug(str(model2)) if not model1.size or not model2.size: logging.debug("Either model1 or model2 is the empty model") return np.array([]) """ pre-process every submodel so that the combinations are allowed only compare relevant indices, i.e. (A & B) | (B & C) 1 1 0 0 1 1 """ # Gather indices of atoms that are active in the models merged_model_active_indices = {i for i, val in enumerate(model1.any(axis=0)) if val} model_active_indices = {i for i, val in enumerate(model2.any(axis=0)) if val} # single out the overlapping atoms in both models (i.e. (A B) & (B C) -> B) atom_indices_to_check = list(merged_model_active_indices & model_active_indices) logging.debug("Atoms in both models: {}".format(list(map(lambda x: exp_atoms[x], atom_indices_to_check)))) if atom_indices_to_check: # if there are overlapping indices for both models sub_models_merged_model = [] def same_val(val1, val2): return (val1 in (EXPL_NEG, IMPL_NEG) and val2 in (IMPL_NEG, EXPL_NEG)) \ or (val1 in (IMPL_NEG, POS_VAL) and val2 in (IMPL_NEG, POS_VAL)) for submodel in model1: allowed_models = [] for sub_model_to_check in model2: # print(all(same_val(*vals) for vals in zip(submodel[atom_indices_to_check], sub_model_to_check[atom_indices_to_check]))) if all(same_val(*vals) for vals in zip(submodel[atom_indices_to_check], sub_model_to_check[atom_indices_to_check])): allowed_models.append(sub_model_to_check) logging.debug("Allowed models are: {}".format(allowed_models)) if not allowed_models: continue allowed_models = np.stack(allowed_models) reshaped_submodel = np.repeat(np.atleast_2d(submodel), len(allowed_models), axis=0) logging.debug("Reshaped submodel:".format(reshaped_submodel)) submodel_added_with_allowed_models = reshaped_submodel + allowed_models logging.info("SUBMODEL: {}".format(submodel_added_with_allowed_models)) # after adding values can either be 2, -2 , -3 or -4 for the indexes that are active in both models # for the other indices values are 0, -1, -2 or 1 # for the active indices map 4, -2, -3 and -4 to 1, -1, -2 submodel_added_with_allowed_models = normalize_combined_model_values(submodel_added_with_allowed_models) logging.debug("added submodel with allowed model".format(submodel_added_with_allowed_models)) sub_models_merged_model.append(submodel_added_with_allowed_models) logging.debug("List of valid submodels until now:".format(sub_models_merged_model)) # finished iterating through all submodels # has collected all valid combinations of both the models # if there are still no combinations of any submodel with the other model # return the empty array if sub_models_merged_model: merged_models = np.vstack(sub_models_merged_model) print("MERGED: ", merged_models) else: print("The models contradict each other") return np.array([[]]) else: reshaped_model1 = np.repeat(model1, len(model2), axis=0) reshaped_model2 = np.tile(model2, (len(model1), 1)) merged_models = reshaped_model1 + reshaped_model2 logging.info("The combination of both models yields: ") logging.info(str(merged_models)) return merged_models
[docs]def resize_model(model, atom_index_mapping_all, all_atoms_in_all_models): """ Resize models so that they all share the same form to be able to compare them Parameters ---------- model: MentalModel NamedTuple atom_index_mapping_all: Dict maps each Atom to its column index in the mental_model.model np.ndarray all_atoms_in_all_models Returns ------- resized model """ resized_mental_model = np.zeros((len(model.model), len(all_atoms_in_all_models))) resized_mental_model[:, list(map(lambda atom: atom_index_mapping_all[atom], model.atoms_model))] = model.model logging.debug("Resized model: {}".format(resized_mental_model)) return resized_mental_model
[docs]def infer(models: List, task: InferenceTask): """ Parameters ---------- models: List of mental_model NamedTuples with attributes: Attributes: expression: Logical expression that has been processed (Sympy.object) model: The resulting mental model representation (np.ndarry) atoms_model: list of atoms in the expression (list) atom_index_mapping: mapping of atoms to their column in `model` (Dict) task: InferenceTask One of the InferenceTask.values: 1. what_follows?: Set task: Infer what follows from all premises 2. necessary?: Set task: Given all but last premises, infer if last premise necessarily follows 3. possible?: Set task: Given all but last premises, infer if last premise possibly follows 4. probability?: Set task: Given all but last premises, infer the probability of last premis 5. verify?: Set task: Given evidence last premise, verify all but last 6. only_models Builds models of all the premises Returns ------- Returns a conclusion or inference of sort. """ print() print("Inference task is: {}".format(task)) if task == InferenceTask.ONLY_MODELS or len(models) <= 1: return models # first preprocess all mental models to share the same column space all_atoms_in_all_models = sorted(set().union(*(set(model.atoms_model) for model in models)), key=str) # type: List atom_index_mapping_all = {atom: i for i, atom in enumerate(all_atoms_in_all_models)} # XXX CHOOSE: IF contradiction abort early or just remove empty model from model list resized_mental_models = [ resize_model(model, atom_index_mapping_all, all_atoms_in_all_models) for model in models if model.model.size # getting rid of contradictions ] for i, model in enumerate(resized_mental_models): logging.debug("The {}th model is: {}".format(i + 1, model)) print("The {}th model is:".format(i + 1)) print(model) if task == InferenceTask.FOLLOWS: print() print("Combine mental models...") possible_models = reduce( lambda model1, model2: combine_mental_models( model1, model2, atom_index_mapping_all, all_atoms_in_all_models ), resized_mental_models ) return mental_model(tuple(model.expr for model in models), possible_models, all_atoms_in_all_models, atom_index_mapping_all) # all the other modes separate the last expression from the first n-1 *all_but_last_models, last_model = resized_mental_models all_but_last_str, last_str = ", ".join(str(model.expr) for model in models[:-1]), models[-1].expr # All the following will return the Model for n-1, last_model and an inference_string # possible models given the first n-1 premises print() print("Combine all but last mental models...") possible_models = reduce( lambda model1, model2: combine_mental_models( model1, model2, atom_index_mapping_all, all_atoms_in_all_models ), all_but_last_models ) if task == InferenceTask.NECESSARY: if not last_model.shape[0] == 1: raise ValueError("The last premise should be a simple one that yields a single possible model") necessary_atoms = { i for i, val in enumerate( np.all(possible_models == possible_models[0, :], axis=0) ) if val } last_model_active_indices = { i for i, val in enumerate( last_model.all(axis=0) ) if val } if necessary_atoms.issuperset(last_model_active_indices): infer_string = "The last premise '{}' necessarily follows from the previous expressions '{}'".format(last_str, all_but_last_str) else: infer_string = "The last premise '{}' does not necessarily follow from the previous expressons '{}'".format(last_str, all_but_last_str) return ( mental_model(tuple(model.expr for model in models[:-1]), possible_models, all_atoms_in_all_models, atom_index_mapping_all), mental_model(tuple(model.expr for model in models[-1:]), last_model, all_atoms_in_all_models, atom_index_mapping_all), infer_string) if task == InferenceTask.POSSIBLE: if not last_model.shape[0] == 1: raise ValueError("The last premise should be a simple one that yields a single possible model") print(possible_models.shape, last_model.shape) # Get the active indices for the last model to compare agains the different # possible_models in the all_but_last model last_model_active_indices = list({i for i, val in enumerate(last_model.all(axis=0)) if val}) values_at_active_indices = last_model[0, last_model_active_indices] for possible_model in possible_models: if np.equal( # This will compare the following to arrays elementwise values_at_active_indices, possible_model[last_model_active_indices] ).all(): # And return if all the values where the same infer_string = "The last premise '{}' possibly follows from the previous expressions '{}'".format(last_str, all_but_last_str) return ( mental_model(tuple(model.expr for model in models[:-1]), possible_models, all_atoms_in_all_models, atom_index_mapping_all), mental_model(tuple(model.expr for model in models[-1:]), last_model, all_atoms_in_all_models, atom_index_mapping_all), infer_string) return ( mental_model(tuple(model.expr for model in models[:-1]), possible_models, all_atoms_in_all_models, atom_index_mapping_all), mental_model(tuple(models[-1].expr), last_model, all_atoms_in_all_models, atom_index_mapping_all), "The last premise '{}' does not possibly follow from the previous expressons '{}'".format(last_str, all_but_last_str)) if task == InferenceTask.PROBABILITY: raise NotImplementedError("Future work...") # XXX Requires probabilities based on facts I do not have implemented if task == InferenceTask.VERIFY: raise NotImplementedError("") print() print("Combine mental models...") possible_models = reduce(lambda model1, model2: combine_mental_models(model1, model2, atom_index_mapping_all, all_atoms_in_all_models), resized_mental_models) return mental_model(tuple(model.expr for model in models), possible_models, all_atoms_in_all_models, atom_index_mapping_all)