Source code for py_entitymatching.blocker.black_box_blocker

from collections import OrderedDict
import logging
import time
import sys

import pandas as pd
import numpy as np
import pyprind
from joblib import Parallel, delayed
import cloudpickle as cp
import pickle

from py_entitymatching.blocker.blocker import Blocker
import py_entitymatching.catalog.catalog_manager as cm
from py_entitymatching.utils.catalog_helper import log_info, get_name_for_key, add_key_column

logger = logging.getLogger(__name__)

class BlackBoxBlocker(Blocker):
    """
    Blocks based on a black box function specified by the user.
    """

    def __init__(self, *args, **kwargs):
        # Name this class (not the parent) in super(): the lookup starts
        # *after* the named class in the MRO, so naming Blocker here would
        # skip any initializer that Blocker itself defines.
        super(BlackBoxBlocker, self).__init__(*args, **kwargs)
        self.black_box_function = None

    def set_black_box_function(self, function):
        """Sets black box function to be used for blocking.

        Args:
            function (function): the black box function to be used for
                blocking.
        """
        self.black_box_function = function

    def block_tables(self, ltable, rtable,
                     l_output_attrs=None, r_output_attrs=None,
                     l_output_prefix='ltable_', r_output_prefix='rtable_',
                     verbose=False, show_progress=True, n_jobs=1):
        """
        Blocks two tables based on a black box blocking function specified
        by the user.

        Finds tuple pairs from left and right tables that survive the black
        box function. A tuple pair survives the black box blocking function
        if the function returns False for that pair, otherwise the tuple pair
        is dropped.

        Args:
            ltable (DataFrame): The left input table.
            rtable (DataFrame): The right input table.
            l_output_attrs (list): A list of attribute names from the left
                table to be included in the output candidate set (defaults
                to None).
            r_output_attrs (list): A list of attribute names from the right
                table to be included in the output candidate set (defaults
                to None).
            l_output_prefix (string): The prefix to be used for the attribute
                names coming from the left table in the output candidate set
                (defaults to 'ltable_').
            r_output_prefix (string): The prefix to be used for the attribute
                names coming from the right table in the output candidate set
                (defaults to 'rtable_').
            verbose (boolean): A flag to indicate whether the debug
                information should be logged (defaults to False).
            show_progress (boolean): A flag to indicate whether progress
                should be displayed to the user (defaults to True).
            n_jobs (int): The number of parallel jobs to be used for
                computation (defaults to 1). If -1 all CPUs are used. If 0
                or 1, no parallel computation is used at all, which is useful
                for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are
                used (where n_cpus is the total number of CPUs in the
                machine). Thus, for n_jobs = -2, all CPUs but one are used.
                If (n_cpus + 1 + n_jobs) is less than 1, then no parallel
                computation is used (i.e., equivalent to the default).

        Returns:
            A candidate set of tuple pairs that survived blocking
            (DataFrame).

        Raises:
            AssertionError: If `ltable` is not of type pandas DataFrame.
            AssertionError: If `rtable` is not of type pandas DataFrame.
            AssertionError: If `l_output_attrs` is not of type of list.
            AssertionError: If `r_output_attrs` is not of type of list.
            AssertionError: If values in `l_output_attrs` is not of type
                string.
            AssertionError: If values in `r_output_attrs` is not of type
                string.
            AssertionError: If `l_output_prefix` is not of type string.
            AssertionError: If `r_output_prefix` is not of type string.
            AssertionError: If `verbose` is not of type boolean.
            AssertionError: If `show_progress` is not of type boolean.
            AssertionError: If `n_jobs` is not of type int.
            AssertionError: If `l_output_attrs` are not in the ltable.
            AssertionError: If `r_output_attrs` are not in the rtable.
            AssertionError: If the black box function has not been set.

        Examples:
            >>> def match_last_name(ltuple, rtuple):
            ...     # assume that there is a 'name' attribute in the input
            ...     # tables and each value in it has two words
            ...     l_last_name = ltuple['name'].split()[1]
            ...     r_last_name = rtuple['name'].split()[1]
            ...     if l_last_name != r_last_name:
            ...         return True
            ...     else:
            ...         return False
            >>> import py_entitymatching as em
            >>> bb = em.BlackBoxBlocker()
            >>> bb.set_black_box_function(match_last_name)
            >>> C = bb.block_tables(A, B, l_output_attrs=['name'], r_output_attrs=['name'])
        """

        # validate data types of standard input parameters
        self.validate_types_params_tables(ltable, rtable,
                                          l_output_attrs, r_output_attrs,
                                          l_output_prefix, r_output_prefix,
                                          verbose, n_jobs)

        # validate data type of show_progress
        self.validate_show_progress(show_progress)

        # validate black box function
        assert self.black_box_function is not None, \
            'Black box function is not set'

        # validate output attributes
        self.validate_output_attrs(ltable, rtable,
                                   l_output_attrs, r_output_attrs)

        # get and validate metadata
        log_info(logger, 'Required metadata: ltable key, rtable key', verbose)

        # # get metadata
        l_key, r_key = cm.get_keys_for_ltable_rtable(ltable, rtable,
                                                     logger, verbose)

        # # validate metadata
        cm._validate_metadata_for_table(ltable, l_key, 'ltable',
                                        logger, verbose)
        cm._validate_metadata_for_table(rtable, r_key, 'rtable',
                                        logger, verbose)

        # do blocking

        # # set index for convenience
        l_df = ltable.set_index(l_key, drop=False)
        r_df = rtable.set_index(r_key, drop=False)

        # # remove l_key from l_output_attrs and r_key from r_output_attrs;
        # # the key columns are always emitted by the split worker itself
        l_output_attrs_1 = []
        if l_output_attrs:
            l_output_attrs_1 = [x for x in l_output_attrs if x != l_key]
        r_output_attrs_1 = []
        if r_output_attrs:
            r_output_attrs_1 = [x for x in r_output_attrs if x != r_key]

        # # determine the number of processes to launch in parallel
        n_procs = self.get_num_procs(n_jobs, len(l_df) * len(r_df))

        # # pickle the black-box function before passing it as an arg to
        # # _block_tables_split to be executed by each child process
        black_box_function_pkl = cp.dumps(self.black_box_function)

        if n_procs <= 1:
            # single process
            candset = _block_tables_split(l_df, r_df, l_key, r_key,
                                          l_output_attrs_1, r_output_attrs_1,
                                          l_output_prefix, r_output_prefix,
                                          black_box_function_pkl,
                                          show_progress)
        else:
            # multiprocessing
            m, n = self.get_split_params(n_procs, len(l_df), len(r_df))
            l_splits = np.array_split(l_df, m)
            r_splits = np.array_split(r_df, n)
            last_i = len(l_splits) - 1
            last_j = len(r_splits) - 1
            # only the job handling the last (i, j) split shows a progress bar
            c_splits = Parallel(n_jobs=m * n)(
                delayed(_block_tables_split)(
                    l_splits[i], r_splits[j], l_key, r_key,
                    l_output_attrs_1, r_output_attrs_1,
                    l_output_prefix, r_output_prefix,
                    black_box_function_pkl,
                    show_progress and i == last_i and j == last_j)
                for i in range(len(l_splits))
                for j in range(len(r_splits)))
            candset = pd.concat(c_splits, ignore_index=True)

        # # determine the attributes to retain in the output candidate set
        retain_cols = self.get_attrs_to_retain(l_key, r_key,
                                               l_output_attrs, r_output_attrs,
                                               l_output_prefix,
                                               r_output_prefix)
        if len(candset) > 0:
            candset = candset[retain_cols]
        else:
            candset = pd.DataFrame(columns=retain_cols)

        # update catalog
        key = get_name_for_key(candset.columns)
        candset = add_key_column(candset, key)
        cm.set_candset_properties(candset, key,
                                  l_output_prefix + l_key,
                                  r_output_prefix + r_key,
                                  ltable, rtable)

        # return candidate set
        return candset

    def block_candset(self, candset, verbose=True, show_progress=True,
                      n_jobs=1):
        """
        Blocks an input candidate set of tuple pairs based on a black box
        blocking function specified by the user.

        Finds tuple pairs from an input candidate set of tuple pairs that
        survive the black box function. A tuple pair survives the black box
        blocking function if the function returns False for that pair,
        otherwise the tuple pair is dropped.

        Args:
            candset (DataFrame): The input candidate set of tuple pairs.
            verbose (boolean): A flag to indicate whether logging should be
                done (defaults to True).
            show_progress (boolean): A flag to indicate whether progress
                should be displayed to the user (defaults to True).
            n_jobs (int): The number of parallel jobs to be used for
                computation (defaults to 1). If -1 all CPUs are used. If 0
                or 1, no parallel computation is used at all, which is useful
                for debugging. For n_jobs below -1, (n_cpus + 1 + n_jobs) are
                used (where n_cpus is the total number of CPUs in the
                machine). Thus, for n_jobs = -2, all CPUs but one are used.
                If (n_cpus + 1 + n_jobs) is less than 1, then no parallel
                computation is used (i.e., equivalent to the default).

        Returns:
            A candidate set of tuple pairs that survived blocking
            (DataFrame).

        Raises:
            AssertionError: If `candset` is not of type pandas DataFrame.
            AssertionError: If `verbose` is not of type boolean.
            AssertionError: If `n_jobs` is not of type int.
            AssertionError: If `show_progress` is not of type boolean.
            AssertionError: If the black box function has not been set.

        Examples:
            >>> def match_last_name(ltuple, rtuple):
            ...     # assume that there is a 'name' attribute in the input
            ...     # tables and each value in it has two words
            ...     l_last_name = ltuple['name'].split()[1]
            ...     r_last_name = rtuple['name'].split()[1]
            ...     if l_last_name != r_last_name:
            ...         return True
            ...     else:
            ...         return False
            >>> import py_entitymatching as em
            >>> bb = em.BlackBoxBlocker()
            >>> bb.set_black_box_function(match_last_name)
            >>> D = bb.block_candset(C)  # C is an output from block_tables
        """

        # validate data types of standard input parameters
        self.validate_types_params_candset(candset, verbose, show_progress,
                                           n_jobs)

        # validate black box function
        assert self.black_box_function is not None, \
            'Black box function is not set'

        # get and validate metadata
        log_info(logger, 'Required metadata: cand.set key, fk ltable, fk rtable, '
                         'ltable, rtable, ltable key, rtable key', verbose)

        # # get metadata
        key, fk_ltable, fk_rtable, ltable, rtable, l_key, r_key = \
            cm.get_metadata_for_candset(candset, logger, verbose)

        # # validate metadata
        cm._validate_metadata_for_candset(candset, key, fk_ltable, fk_rtable,
                                          ltable, rtable, l_key, r_key,
                                          logger, verbose)

        # do blocking

        # # set index for convenience
        l_df = ltable.set_index(l_key, drop=False)
        r_df = rtable.set_index(r_key, drop=False)

        # # project candset to keep only the ID attributes
        c_df = candset[[key, fk_ltable, fk_rtable]]

        # # determine the number of processes to launch in parallel
        n_procs = self.get_num_procs(n_jobs, len(c_df))

        # # pickle the black-box function before passing it as an arg to
        # # _block_candset_split to be executed by each child process
        black_box_function_pkl = cp.dumps(self.black_box_function)

        if n_procs <= 1:
            # single process
            valid = _block_candset_split(c_df, l_df, r_df, l_key, r_key,
                                         fk_ltable, fk_rtable,
                                         black_box_function_pkl,
                                         show_progress)
        else:
            # multiprocessing
            c_splits = np.array_split(c_df, n_procs)
            last_i = len(c_splits) - 1
            # only the job handling the last split shows a progress bar
            valid_splits = Parallel(n_jobs=n_procs)(
                delayed(_block_candset_split)(
                    c_splits[i], l_df, r_df, l_key, r_key,
                    fk_ltable, fk_rtable,
                    black_box_function_pkl,
                    show_progress and i == last_i)
                for i in range(len(c_splits)))
            # flatten in one pass; sum(valid_splits, []) re-copies the
            # accumulated list for every split
            valid = [flag for split in valid_splits for flag in split]

        # construct output table
        if len(c_df) > 0:
            c_df = candset[valid]
        else:
            c_df = pd.DataFrame(columns=candset.columns)

        # update catalog
        cm.set_candset_properties(c_df, key, fk_ltable, fk_rtable,
                                  ltable, rtable)

        # return candidate set
        return c_df

    def block_tuples(self, ltuple, rtuple):
        """
        Blocks a tuple pair based on a black box blocking function specified
        by the user.

        Takes a tuple pair as input, applies the black box blocking function
        to it, and returns True (if the intention is to drop the pair) or
        False (if the intention is to keep the tuple pair).

        Args:
            ltuple (Series): input left tuple.
            rtuple (Series): input right tuple.

        Returns:
            A status indicating if the tuple pair should be dropped or kept,
            based on the black box blocking function (boolean).

        Raises:
            AssertionError: If the black box function has not been set.

        Examples:
            >>> def match_last_name(ltuple, rtuple):
            ...     # assume that there is a 'name' attribute in the input
            ...     # tables and each value in it has two words
            ...     l_last_name = ltuple['name'].split()[1]
            ...     r_last_name = rtuple['name'].split()[1]
            ...     if l_last_name != r_last_name:
            ...         return True
            ...     else:
            ...         return False
            >>> import py_entitymatching as em
            >>> bb = em.BlackBoxBlocker()
            >>> bb.set_black_box_function(match_last_name)
            >>> status = bb.block_tuples(A.loc[0], B.loc[0])  # A, B are input tables.
        """
        # validate black box function
        assert self.black_box_function is not None, \
            'Black box function is not set'
        return self.black_box_function(ltuple, rtuple)
def _prefixed_output_attrs(row, output_attrs, output_prefix):
    """Returns the `output_attrs` entries of `row` with prefixed labels.

    Args:
        row (Series): a single tuple of a table.
        output_attrs (list): labels to select from `row`.
        output_prefix (string): prefix prepended to every selected label.

    Returns:
        A new Series holding the selected values, indexed by the prefixed
        labels.
    """
    out = row[output_attrs]
    out.index = output_prefix + out.index
    return out


def _block_tables_split(l_df, r_df, l_key, r_key,
                        l_output_attrs, r_output_attrs,
                        l_output_prefix, r_output_prefix,
                        black_box_function_pkl, show_progress):
    """Applies the black box function to every pair in `l_df` x `r_df`.

    Args:
        l_df (DataFrame): left table (or a split of it); its index values
            are used to look rows up by the value of column `l_key`.
        r_df (DataFrame): right table (or a split of it); its index values
            are used to look rows up by the value of column `r_key`.
        l_key (string): name of the key column of `l_df`.
        r_key (string): name of the key column of `r_df`.
        l_output_attrs (list): left attributes to copy into the output.
        r_output_attrs (list): right attributes to copy into the output.
        l_output_prefix (string): prefix for the left output column names.
        r_output_prefix (string): prefix for the right output column names.
        black_box_function_pkl (bytes): pickled black box function.
        show_progress (boolean): whether to display a progress bar.

    Returns:
        A DataFrame with one row per tuple pair for which the black box
        function did not return True. Each row holds the prefixed key
        columns followed by the prefixed left and right output attributes.
    """
    # initialize progress bar
    if show_progress:
        bar = pyprind.ProgBar(len(l_df) * len(r_df))

    # create look up dictionaries for faster processing
    l_dict = dict(l_df.iterrows())
    r_dict = dict(r_df.iterrows())

    # get the position of the ID attribute in the tables
    l_id_pos = list(l_df.columns).index(l_key)
    r_id_pos = list(r_df.columns).index(r_key)

    # create candset column names for the ID attributes of the tables
    ltable_id = l_output_prefix + l_key
    rtable_id = r_output_prefix + r_key

    # list to keep the tuple pairs that survive blocking
    valid = []

    # unpickle the black box function
    # NOTE: the payload is produced in-process by the caller from the
    # user's own function; pickle.loads must never be fed untrusted bytes.
    black_box_function = pickle.loads(black_box_function_pkl)

    # prefixed output attributes depend on one tuple only, so they are
    # built at most once per tuple instead of once per surviving pair
    r_out_cache = {}

    # iterate through the two tables
    for l_t in l_df.itertuples(index=False):
        # # get ltuple from the look up dictionary
        ltuple = l_dict[l_t[l_id_pos]]
        l_out = None

        for r_t in r_df.itertuples(index=False):
            # # update the progress bar
            if show_progress:
                bar.update()

            # # get rtuple from the look up dictionary
            r_id = r_t[r_id_pos]
            rtuple = r_dict[r_id]

            # # apply the black box function to the tuple pair
            res = black_box_function(ltuple, rtuple)

            # only a result equal to True drops the pair; this is kept
            # as a comparison (not `not res`) to preserve that contract
            if res != True:  # noqa: E712
                # # this tuple pair survives blocking
                if l_out is None:
                    l_out = _prefixed_output_attrs(ltuple, l_output_attrs,
                                                   l_output_prefix)
                if r_id not in r_out_cache:
                    r_out_cache[r_id] = _prefixed_output_attrs(
                        rtuple, r_output_attrs, r_output_prefix)

                # # an ordered dictionary to keep a surviving tuple pair
                d = OrderedDict()

                # # add ltable and rtable ids to an ordered dictionary
                d[ltable_id] = ltuple[l_key]
                d[rtable_id] = rtuple[r_key]

                # # add l/r output attributes to the ordered dictionary
                d.update(l_out)
                d.update(r_out_cache[r_id])

                # # add the ordered dict to the list
                valid.append(d)

    # construct candidate set
    candset = pd.DataFrame(valid)
    return candset


def _block_candset_split(c_df, l_df, r_df, l_key, r_key,
                         fk_ltable, fk_rtable,
                         black_box_function_pkl, show_progress):
    """Applies the black box function to every pair listed in `c_df`.

    Args:
        c_df (DataFrame): candidate set (or a split of it) containing the
            columns `fk_ltable` and `fk_rtable`.
        l_df (DataFrame): left table; rows are fetched with
            `l_df.loc[<fk_ltable value>]`.
        r_df (DataFrame): right table; rows are fetched with
            `r_df.loc[<fk_rtable value>]`.
        l_key (string): name of the key column of the left table (not used
            by this function).
        r_key (string): name of the key column of the right table (not used
            by this function).
        fk_ltable (string): candset column holding the left tuple ids.
        fk_rtable (string): candset column holding the right tuple ids.
        black_box_function_pkl (bytes): pickled black box function.
        show_progress (boolean): whether to display a progress bar.

    Returns:
        A list of booleans, one per row of `c_df` in order; True marks a
        pair for which the black box function did not return True.
    """
    # initialize the progress bar
    if show_progress:
        bar = pyprind.ProgBar(len(c_df))

    # create lookup dictionaries for faster processing
    l_dict = {}
    r_dict = {}

    # list to keep track of valid ids
    valid = []

    # find positions of the ID attributes of the two tables in the candset
    l_id_pos = list(c_df.columns).index(fk_ltable)
    r_id_pos = list(c_df.columns).index(fk_rtable)

    # unpickle the black box function
    # NOTE: the payload is produced in-process by the caller from the
    # user's own function; pickle.loads must never be fed untrusted bytes.
    black_box_function = pickle.loads(black_box_function_pkl)

    # iterate candidate set
    for row in c_df.itertuples(index=False):
        # # update progress bar
        if show_progress:
            bar.update()

        # # get ltuple, try dictionary first, then dataframe
        row_lkey = row[l_id_pos]
        if row_lkey not in l_dict:
            l_dict[row_lkey] = l_df.loc[row_lkey]
        ltuple = l_dict[row_lkey]

        # # get rtuple, try dictionary first, then dataframe
        row_rkey = row[r_id_pos]
        if row_rkey not in r_dict:
            r_dict[row_rkey] = r_df.loc[row_rkey]
        rtuple = r_dict[row_rkey]

        # # apply the black box function to the tuple pair
        res = black_box_function(ltuple, rtuple)

        # a pair is valid (kept) unless the function's result equals True
        valid.append(bool(res != True))  # noqa: E712

    return valid