Source code for py_entitymatching.tuner.tuner_down_sample

import logging

import math
import multiprocessing
import time

import pandas as pd
import numpy as np
from dask import delayed
from dask.diagnostics import ProgressBar

import py_entitymatching.catalog.catalog_manager as cm
from py_entitymatching.dask.dask_down_sample import sample_right_table, \
    _get_str_cols_list, process_tokenize_concat_strings, remove_punctuations, \
    _get_stop_words, build_inverted_index, probe
from py_entitymatching.utils.validation_helper import validate_object_type



logger = logging.getLogger(__name__)

# ----------------------------

def tuner_down_sample(ltable, rtable, size, y_param, seed, rem_stop_words,
                      rem_puncs, n_bins=50, sample_proportion=0.1, repeat=1):
    """
    WARNING: THIS COMMAND IS EXPERIMENTAL AND NOT TESTED. USE AT YOUR OWN RISK.

    Tunes the parameters for the down sampling command implemented using Dask.

    Given the input tables and the parameters for the Dask-based down sampling
    command, this command returns the configuration, including whether the
    input tables need to be swapped, the number of left table chunks, and the
    number of right table chunks. It uses a "staged tuning" approach to select
    the configuration setting. The key idea of this approach is to select the
    configuration one parameter at a time.

    Conceptually, this command performs the following steps. First, it samples
    the left table and the down sampled right table using stratified sampling.
    Next, it uses the sampled tables to decide whether the tables need to be
    swapped (by running the down sample command in both orders and comparing
    the runtimes). Next, it finds the number of right table partitions using
    the sampled tables (by trying a fixed set of partition counts and comparing
    the runtimes); the number of partitions is selected to be the value just
    before the runtime starts increasing. Then it finds the number of left
    table partitions in the same way, with the number of right table partitions
    fixed to the value found in the previous step. Finally, it returns the
    configuration setting back to the user as a triplet (x, y, z), where x
    indicates whether the tables need to be swapped, y indicates the number of
    left table partitions (if the tables need to be swapped, then this
    indicates the number of left table partitions after swapping), and z
    indicates the number of down sampled right table partitions.

    Args:
        ltable (DataFrame): The left input table, i.e., table A.
        rtable (DataFrame): The right input table, i.e., table B.
        size (int): The size that table B should be down sampled to.
        y_param (int): The parameter to control the down sample size of table A.
            Specifically, the down sampled size of table A should be close to
            size * y_param.
        seed (int): The seed for the pseudo random number generator to select
            the tuples from A and B (defaults to None).
        rem_stop_words (boolean): A flag to indicate whether a default set of
            stop words must be removed.
        rem_puncs (boolean): A flag to indicate whether the punctuations must
            be removed from the strings.
        n_bins (int): The number of bins to be used for stratified sampling.
        sample_proportion (float): The proportion used to sample the tables.
            This value is expected to be greater than 0 and less than 1.
        repeat (int): The number of times to execute the down sample command
            while selecting the values for the parameters.

    Returns:
        A tuple containing 3 values. For example, if the tuple is represented
        as (x, y, z), then x indicates whether the tables need to be swapped,
        y indicates the number of left table partitions (if the tables need to
        be swapped, then this indicates the number of left table partitions
        after swapping), and z indicates the number of down sampled right
        table partitions.

    Examples:
        >>> from py_entitymatching.tuner.tuner_down_sample import tuner_down_sample
        >>> (swap_or_not, n_ltable_chunks, n_sample_rtable_chunks) = tuner_down_sample(ltable, rtable, size, y_param, seed, rem_stop_words, rem_puncs)

    """
    logger.info('WARNING THIS COMMAND IS EXPERIMENTAL AND NOT TESTED. '
                'USE AT YOUR OWN RISK.')

    # Sample the tables
    # # Down sample the right table
    rtable_sampled = sample_right_table(rtable, size, seed)

    # # Sample the tables for selecting the configuration
    sampled_tables_orig_order = get_sampled_tables(ltable, rtable_sampled,
                                                   rem_puncs, rem_stop_words,
                                                   n_bins, sample_proportion,
                                                   seed)

    # # Repeat the above two steps, but swap the left and right input tables
    ltable_sampled = sample_right_table(ltable, size, seed)
    sampled_tables_swap_order = get_sampled_tables(rtable, ltable_sampled,
                                                   rem_puncs, rem_stop_words,
                                                   n_bins, sample_proportion,
                                                   seed)

    # Decide if the tables need to be swapped
    swap_config = should_swap(sampled_tables_orig_order,
                              sampled_tables_swap_order, y_param, seed,
                              rem_puncs, rem_stop_words, repeat=repeat)

    # Use the appropriate sampled tables for further processing
    s_ltable, s_rtable = sampled_tables_orig_order
    if swap_config:
        s_ltable, s_rtable = sampled_tables_swap_order
    # print('Swapping: {0} done'.format(swap_config))

    # Use the sampled tables to find the number of right table partitions
    n_rtable_chunks = find_rtable_chunks(s_ltable, s_rtable, y_param, seed,
                                         rem_puncs, rem_stop_words)

    # Use the sampled tables to find the number of left table partitions
    n_ltable_chunks = find_ltable_chunks(s_ltable, s_rtable, y_param, seed,
                                         rem_puncs, rem_stop_words,
                                         n_rtable_chunks)

    # Return the configuration back to the user
    return (swap_config, n_ltable_chunks, n_rtable_chunks)
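
# --- Illustrative usage sketch (not part of the original module) ---
# A minimal, hedged example of how the (swap, n_ltable_chunks,
# n_sample_rtable_chunks) triple returned by tuner_down_sample might be
# interpreted before running the Dask-based down sample command. The helper
# name `_apply_tuned_config` is a hypothetical illustration, not library API.
def _apply_tuned_config(ltable, rtable, config):
    swap, n_ltable_chunks, n_sample_rtable_chunks = config
    if swap:
        # The tuner found the swapped table order faster, so exchange the
        # tables; the chunk counts then refer to the tables after swapping.
        ltable, rtable = rtable, ltable
    return ltable, rtable, n_ltable_chunks, n_sample_rtable_chunks
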
# ------ utility functions for tuner ---- #

def get_sampled_tables(ltable, rtable, should_rem_puncs, should_rem_stop_words,
                       n_bins, sample_proportion, seed):
    """
    Function to sample the tables.
    """
    # Sample the left table. Specifically, sample it by stratifying on the
    # lengths of the concatenated string attributes.
    s_ltable, processed_tokenized_result = sample_ltable(
        ltable, should_rem_puncs, should_rem_stop_words, n_bins,
        sample_proportion, seed)
    inverted_index = build_inverted_index(processed_tokenized_result)

    # Sample the down sampled right table. Specifically, sample it by
    # stratifying on the number of tuples that each tuple in the down sampled
    # right table must probe.
    s_rtable = sample_rtable(rtable, should_rem_puncs, should_rem_stop_words,
                             n_bins, sample_proportion, inverted_index, seed)

    # return the sampled ltable and rtable
    return s_ltable, s_rtable


def execute(ltable, rtable, y_param, seed, should_rem_puncs,
            should_rem_stop_words, n_ltable_chunks, n_rtable_chunks, repeat):
    """
    Function to execute the down sample command and return the average run time.
    """
    times = []
    for i in range(repeat):
        t1 = time.time()
        _, _ = _down_sample(ltable, rtable, y_param, show_progress=False,
                            seed=seed, rem_stop_words=should_rem_stop_words,
                            rem_puncs=should_rem_puncs,
                            n_ltable_chunks=n_ltable_chunks,
                            n_rtable_chunks=n_rtable_chunks)
        t2 = time.time()
        times.append(t2 - t1)
    return np.mean(times)


def should_swap(orig_order, swap_order, y_param, seed, should_rem_puncs,
                should_rem_stop_words, repeat=1, epsilon=1):
    """
    Function to decide if we need to swap the input tables or not.
    """
    # Execute the command using the sampled input tables in the original order
    t_orig_order = execute(orig_order[0], orig_order[1], y_param, seed,
                           should_rem_puncs, should_rem_stop_words, -1, -1,
                           repeat)
    # Execute the command using the sampled input tables in the swapped order
    t_swap_order = execute(swap_order[0], swap_order[1], y_param, seed,
                           should_rem_puncs, should_rem_stop_words, -1, -1,
                           repeat)

    swap_config = False
    # epsilon acts as a tolerance: the swapped order is preferred unless it is
    # slower than the original order by more than epsilon
    if t_swap_order < t_orig_order + epsilon:
        swap_config = True
    return swap_config


def get_chunk_range():
    """
    Get the range of partitions to try.
    """
    n_chunks = multiprocessing.cpu_count()
    if n_chunks > 128:
        raise NotImplementedError('Currently we consider the num. procs in '
                                  'machine to be < 128')
    chunk_range = [n_chunks]
    while n_chunks < 128:
        n_chunks *= 2
        chunk_range += [n_chunks]
    return chunk_range


def find_rtable_chunks(ltable, rtable, y_param, seed, should_rem_puncs,
                       should_rem_stop_words, repeat=1, epsilon=0):
    """
    Function to find the number of down sampled right table chunks.
    """
    # Get the chunk range to try
    chunk_range = get_chunk_range()
    hist_times = []
    hist_chunks = []

    # Select the number of partitions by executing the command over the values
    # in the chunk range
    t = execute(ltable, rtable, y_param, seed, should_rem_puncs,
                should_rem_stop_words, n_rtable_chunks=chunk_range[0],
                n_ltable_chunks=-1, repeat=repeat)
    hist_times += [t]
    hist_chunks += [chunk_range[0]]

    for n_chunks in chunk_range[1:]:
        t = execute(ltable, rtable, y_param, seed, should_rem_puncs,
                    should_rem_stop_words, n_rtable_chunks=n_chunks,
                    n_ltable_chunks=-1, repeat=repeat)
        # print('{0} chunk: {1} time: {2}'.format('Rtable', n_chunks, t))
        # stop at the chunk count just before the runtime starts increasing
        if t > hist_times[-1] + epsilon:
            return hist_chunks[-1]
        else:
            hist_times += [t]
            hist_chunks += [n_chunks]
    return hist_chunks[-1]


def find_ltable_chunks(ltable, rtable, y_param, seed, should_rem_puncs,
                       should_rem_stop_words, n_rtable_chunks, repeat=1,
                       epsilon=1):
    """
    Function to find the number of left table chunks.
    """
    # Get the chunk range to try
    chunk_range = get_chunk_range()
    hist_times = []
    hist_chunks = []

    # Select the number of partitions by executing the command over the values
    # in the chunk range
    t = execute(ltable, rtable, y_param, seed, should_rem_puncs,
                should_rem_stop_words, n_ltable_chunks=chunk_range[0],
                n_rtable_chunks=n_rtable_chunks, repeat=repeat)
    hist_times += [t]
    hist_chunks += [chunk_range[0]]

    for n_chunks in chunk_range[1:]:
        t = execute(ltable, rtable, y_param, seed, should_rem_puncs,
                    should_rem_stop_words, n_ltable_chunks=n_chunks,
                    n_rtable_chunks=n_rtable_chunks, repeat=repeat)
        # stop at the chunk count just before the runtime starts increasing
        if t > hist_times[-1] + epsilon:
            return hist_chunks[-1]
        else:
            hist_times += [t]
            hist_chunks += [n_chunks]
    return hist_chunks[-1]
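
# --- Illustrative sketch (assumption, not part of the original module) ---
# find_rtable_chunks and find_ltable_chunks above pick the chunk count just
# before the measured runtime starts to increase by more than epsilon. The
# helper below shows that selection rule on a precomputed list of
# (n_chunks, runtime) pairs; the name `_pick_chunks_from_timings` is
# hypothetical.
def _pick_chunks_from_timings(timings, epsilon=1):
    # timings: [(n_chunks, runtime), ...] ordered by increasing n_chunks
    best_chunks, prev_time = timings[0]
    for n_chunks, t in timings[1:]:
        if t > prev_time + epsilon:
            # runtime started to increase; keep the previous chunk count
            return best_chunks
        best_chunks, prev_time = n_chunks, t
    return best_chunks

# For example, _pick_chunks_from_timings([(4, 10.0), (8, 7.5), (16, 9.5)])
# returns 8, mirroring how the functions above stop at the first slowdown.
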
def sample_ltable(table, should_rem_puncs, should_rem_stop_words, n_bins,
                  sample_proportion, seed):
    """
    Function to sample the left table.
    """
    tbl_str_cols = _get_str_cols_list(table)
    proj_table = table[table.columns[tbl_str_cols]]

    # preprocess and tokenize the concatenated strings
    processed_concat_result, processed_tokenized_result = \
        _process_tokenize_concat_strings(proj_table, 0, should_rem_puncs,
                                         should_rem_stop_words)

    # define temporary columns to store the concatenated column, the string
    # lengths, etc.
    concat_col = '__concat_column__'
    strlen_col = '__strlen_column__'
    key_col = '__key_colum__'

    df = pd.DataFrame.from_dict(processed_concat_result, orient="index",
                                columns=[concat_col])
    df.insert(0, key_col, df.index.values)
    df.reset_index(inplace=True, drop=True)

    # get the total number of tuples to be sampled
    num_samples = int(math.floor(sample_proportion * len(proj_table)))

    # get the string lengths and then group by the string lengths
    df[strlen_col] = df[concat_col].str.len()
    len_groups = df.groupby(strlen_col)
    group_ids_len = {}
    for group_id, group in len_groups:
        group_ids_len[group_id] = list(group[key_col])

    str_lens = list(df[strlen_col].values)
    str_lens += [max(str_lens) + 1]

    # bin the string lengths
    freq, edges = np.histogram(str_lens, bins=n_bins)

    # compute the bin to which each string length maps
    bins = [[] for _ in range(n_bins)]
    keys = sorted(group_ids_len.keys())
    positions = np.digitize(keys, edges)

    # compute the number of entries in each bin
    for i in range(len(keys)):
        k, p = keys[i], positions[i]
        bins[p - 1].extend(group_ids_len[k])
    len_of_bins = [len(bins[i]) for i in range(len(bins))]

    # compute the relative weight of each bin and the number of tuples we need
    # to sample from each bin
    weights = [len_of_bins[i] / float(sum(len_of_bins)) for i in range(len(bins))]
    num_tups_from_each_bin = [int(math.ceil(weights[i] * num_samples))
                              for i in range(len(weights))]

    # sample from each bin
    sampled = []
    for i in range(len(bins)):
        num_tuples = num_tups_from_each_bin[i]
        if len_of_bins[i]:
            np.random.seed(seed)
            tmp_samples = np.random.choice(bins[i], num_tuples, replace=False)
            if len(tmp_samples):
                sampled.extend(tmp_samples)

    # retain the same order of tuples as in the input table
    table_copy = table.copy()
    table_copy[key_col] = range(len(table))
    table_copy = table_copy.set_index(key_col)
    table_copy['_pos'] = list(range(len(table)))
    s_table = table_copy.loc[sampled]
    s_table = s_table.sort_values('_pos')

    # reset the index and drop the pos column
    s_table.reset_index(drop=False, inplace=True)
    s_table.drop(['_pos'], axis=1, inplace=True)

    return s_table, processed_tokenized_result


# --------------- sample rtable ----------------- #

def sample_rtable(table, should_rem_puncs, should_rem_stop_words, n_bins,
                  sample_proportion, inverted_index, seed):
    """
    Function to sample the down sampled right table.
    """
    tbl_str_cols = _get_str_cols_list(table)
    proj_table = table[table.columns[tbl_str_cols]]
    processed_tokenized_vals = process_tokenize_concat_strings(
        proj_table, 0, should_rem_puncs, should_rem_stop_words)

    key = '__key_colum__'
    proj_table.insert(0, key, range(len(proj_table)))
    proj_table.reset_index(inplace=True, drop=True)

    # compute, for each tuple, the number of left table tuples it must probe
    # using the inverted index built over the sampled left table
    token_cnt = {}
    token_map = {}
    for row_id in processed_tokenized_vals.keys():
        tuple_id = proj_table.iloc[row_id][key]
        tokens = processed_tokenized_vals[row_id]
        cnt = 0
        for token in tokens:
            if token not in token_map:
                token_map[token] = len(inverted_index[token])
            cnt += token_map[token]
        token_cnt[tuple_id] = cnt

    # cnt_df = pd.DataFrame(token_cnt.items(), columns=[key, 'count'])
    cnt_df = pd.DataFrame.from_dict(token_cnt, orient='index', columns=['count'])
    cnt_df.insert(0, key, cnt_df.index.values)
    cnt_df.reset_index(drop=False, inplace=True)

    count_groups = cnt_df.groupby('count')
    cnt_ids = {}
    for group_id, group in count_groups:
        cnt_ids[group_id] = list(group[key].values)

    counts = list(cnt_df['count'].values)
    counts += [max(counts) + 1]

    # bin the probe counts
    freq, edges = np.histogram(counts, bins=n_bins)

    # get the total number of tuples to be sampled
    num_samples = int(math.floor(sample_proportion * len(table)))

    # compute the bin to which each probe count maps
    bins = [[] for _ in range(n_bins)]
    keys = sorted(cnt_ids.keys())
    positions = np.digitize(keys, edges)

    # compute the number of entries in each bin
    for i in range(len(keys)):
        k, p = keys[i], positions[i]
        bins[p - 1].extend(cnt_ids[k])
    len_of_bins = [len(bins[i]) for i in range(len(bins))]

    # compute the relative weight of each bin and the number of tuples we need
    # to sample from each bin
    weights = [len_of_bins[i] / float(sum(len_of_bins)) for i in range(len(bins))]
    num_tups_from_each_bin = [int(math.ceil(weights[i] * num_samples))
                              for i in range(len(weights))]

    # sample from each bin
    sampled = []
    for i in range(len(bins)):
        num_tuples = num_tups_from_each_bin[i]
        if len_of_bins[i]:
            np.random.seed(seed)
            tmp_samples = np.random.choice(bins[i], num_tuples, replace=False)
            if len(tmp_samples):
                sampled.extend(tmp_samples)

    # retain the same order of tuples as in the input table
    key_col = '__key_col__'
    table_copy = table.copy()
    table_copy[key_col] = range(len(table))
    table_copy = table_copy.set_index(key_col)
    table_copy['_pos'] = list(range(len(table)))
    s_table = table_copy.loc[sampled]
    s_table = s_table.sort_values('_pos')

    # reset the index and drop the pos column
    s_table.reset_index(drop=False, inplace=True)
    s_table.drop(['_pos'], axis=1, inplace=True)

    return s_table
""" result_concat_vals = {} result_tokenized_vals = {} row_id = start_row_id stop_words = _get_stop_words() for row in table.itertuples(index=False): str_val = ' '.join(col_val.lower().strip() for col_val in row[:] if not pd.isnull(col_val)) if should_rem_puncs: str_val = remove_punctuations(str_val) # tokenize them str_val = set(str_val.split()) if should_rem_stop_words: str_val = str_val.difference(stop_words) result_tokenized_vals[row_id] = str_val result_concat_vals[row_id] = ' '.join(str_val) row_id += 1 return result_concat_vals, result_tokenized_vals def _down_sample(ltable, rtable, y_param, show_progress=True, verbose=False, seed=None, rem_puncs=True, rem_stop_words=True, n_ltable_chunks=-1, n_rtable_chunks=-1): """ Down sampling command implementation. We have reproduced the down sample command because the input to the down sample command is the down sampled right table. """ if not isinstance(ltable, pd.DataFrame): logger.error('Input table A (ltable) is not of type pandas DataFrame') raise AssertionError( 'Input table A (ltable) is not of type pandas DataFrame') if not isinstance(rtable, pd.DataFrame): logger.error('Input table B (rtable) is not of type pandas DataFrame') raise AssertionError( 'Input table B (rtable) is not of type pandas DataFrame') if len(ltable) == 0 or len(rtable) == 0: logger.error('Size of the input table is 0') raise AssertionError('Size of the input table is 0') if y_param == 0: logger.error( 'y cannot be zero (3rd and 4th parameter of downsample)') raise AssertionError( 'y_param cannot be zero (3rd and 4th parameter of downsample)') if seed is not None and not isinstance(seed, int): logger.error('Seed is not of type integer') raise AssertionError('Seed is not of type integer') validate_object_type(verbose, bool, 'Parameter verbose') validate_object_type(show_progress, bool, 'Parameter show_progress') validate_object_type(rem_stop_words, bool, 'Parameter rem_stop_words') validate_object_type(rem_puncs, bool, 'Parameter rem_puncs') validate_object_type(n_ltable_chunks, int, 'Parameter n_ltable_chunks') validate_object_type(n_rtable_chunks, int, 'Parameter n_rtable_chunks') # rtable_sampled = sample_right_table(rtable, size) rtable_sampled = rtable ltbl_str_cols = _get_str_cols_list(ltable) proj_ltable = ltable[ltable.columns[ltbl_str_cols]] if n_ltable_chunks == -1: n_ltable_chunks = multiprocessing.cpu_count() ltable_chunks = np.array_split(proj_ltable, n_ltable_chunks) preprocessed_tokenized_tbl = [] start_row_id = 0 for i in range(len(ltable_chunks)): result = delayed(process_tokenize_concat_strings)(ltable_chunks[i], start_row_id, rem_puncs, rem_stop_words) preprocessed_tokenized_tbl.append(result) start_row_id += len(ltable_chunks[i]) preprocessed_tokenized_tbl = delayed(wrap)(preprocessed_tokenized_tbl) if show_progress: with ProgressBar(): logger.info('Preprocessing/tokenizing ltable') preprocessed_tokenized_tbl_vals = preprocessed_tokenized_tbl.compute( scheduler="processes", num_workers=multiprocessing.cpu_count()) else: preprocessed_tokenized_tbl_vals = preprocessed_tokenized_tbl.compute( scheduler="processes", num_workers=multiprocessing.cpu_count()) ltable_processed_dict = {} for i in range(len(preprocessed_tokenized_tbl_vals)): ltable_processed_dict.update(preprocessed_tokenized_tbl_vals[i]) inverted_index = build_inverted_index(ltable_processed_dict) rtbl_str_cols = _get_str_cols_list(rtable_sampled) proj_rtable_sampled = rtable_sampled[rtable_sampled.columns[rtbl_str_cols]] if n_rtable_chunks == -1: n_rtable_chunks = 
multiprocessing.cpu_count() rtable_chunks = np.array_split(proj_rtable_sampled, n_rtable_chunks) probe_result = [] for i in range(len(rtable_chunks)): result = delayed(probe)(rtable_chunks[i], y_param, len(proj_ltable), inverted_index, rem_puncs, rem_stop_words, seed) probe_result.append(result) probe_result = delayed(wrap)(probe_result) if show_progress: with ProgressBar(): logger.info('Probing using rtable') probe_result = probe_result.compute(scheduler="processes", num_workers=multiprocessing.cpu_count()) else: probe_result = probe_result.compute(scheduler="processes", num_workers=multiprocessing.cpu_count()) probe_result = map(list, probe_result) l_tbl_indices = set(sum(probe_result, [])) l_tbl_indices = list(l_tbl_indices) ltable_sampled = ltable.iloc[l_tbl_indices] # update catalog if cm.is_dfinfo_present(ltable): cm.copy_properties(ltable, ltable_sampled) if cm.is_dfinfo_present(rtable): cm.copy_properties(rtable, rtable_sampled) return ltable_sampled, rtable_sampled def wrap(object): return object
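
# --- Illustrative sketch (assumption, not part of the original module) ---
# _down_sample builds one delayed task per table chunk and then wraps the list
# of tasks in a single delayed object via `wrap`, so one .compute() call
# executes all chunk tasks together. The toy function below shows the same
# delayed/wrap pattern on a trivial workload; the name `_example_delayed_wrap`
# is hypothetical and the threads scheduler is used only so the nested helper
# does not need to be picklable.
def _example_delayed_wrap():
    def square(x):
        return x * x
    tasks = [delayed(square)(i) for i in range(4)]
    gathered = delayed(wrap)(tasks)
    # returns [0, 1, 4, 9]
    return gathered.compute(scheduler="threads")
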