Source code for blitzml._core

# coding: utf-8

import os
import ctypes
import numpy as np 
from scipy import sparse as sp
import pickle
import glob

from ._lib_file_extension import get_lib_file_extension

def load_library():
  current_dir = os.path.abspath(os.path.dirname(__file__))
  glob_string = "**libblitzml.{}*".format(get_lib_file_extension())
  glob_pathname = os.path.join(current_dir, glob_string)
  for path in glob.glob(glob_pathname):
    try:
      return np.ctypeslib.load_library(path, "/")    
    except OSError:
      pass
  raise Exception("Could not load C library")
  
lib = load_library()

index_t = ctypes.c_int
value_t = ctypes.c_double
float_t = ctypes.c_float
double_t = ctypes.c_double
int_t = ctypes.c_int
unsigned_t = ctypes.c_uint
bool_t = ctypes.c_bool
long_t = ctypes.c_long
size_t = ctypes.c_size_t

pointer_t = ctypes.POINTER(ctypes.c_void_p)
value_t_p = ctypes.POINTER(value_t)
index_t_p = ctypes.POINTER(index_t)
float_t_p = ctypes.POINTER(float_t)
double_t_p = ctypes.POINTER(double_t)
int_t_p = ctypes.POINTER(int_t)
bool_t_p = ctypes.POINTER(bool_t)
size_t_p = ctypes.POINTER(size_t)
char_p = ctypes.c_char_p

lib.BlitzML_solve_problem.argtypes = [pointer_t, pointer_t, pointer_t, value_t_p, char_p, char_p]
lib.BlitzML_new_parameters.restype = pointer_t
lib.BlitzML_new_parameters.argtypes = [value_t_p, size_t]
lib.BlitzML_first_parameter.restype = value_t
lib.BlitzML_first_parameter.argtypes = [pointer_t]
lib.BlitzML_delete_parameters.restype = None
lib.BlitzML_delete_parameters.argtypes = [pointer_t]
lib.BlitzML_delete_solver.restype = None
lib.BlitzML_delete_solver.argtypes = [pointer_t]
lib.BlitzML_set_tolerance.restype = None
lib.BlitzML_set_tolerance.argtypes = [pointer_t, value_t]
lib.BlitzML_tolerance.restype = value_t
lib.BlitzML_tolerance.argtypes = []
lib.BlitzML_set_max_time.restype = None
lib.BlitzML_set_max_time.argtypes = [pointer_t, value_t]
lib.BlitzML_max_time.restype = value_t
lib.BlitzML_max_time.argtypes = []
lib.BlitzML_set_min_time.restype = None
lib.BlitzML_set_min_time.argtypes = [pointer_t, value_t]
lib.BlitzML_min_time.restype = value_t
lib.BlitzML_min_time.argtypes = []
lib.BlitzML_set_verbose.restype = None
lib.BlitzML_set_verbose.argtypes = [pointer_t, bool_t]
lib.BlitzML_verbose.restype = bool_t
lib.BlitzML_verbose.argtypes = []
lib.BlitzML_set_use_screening.restype = None
lib.BlitzML_set_use_screening.argtypes = [pointer_t, bool_t]
lib.BlitzML_use_screening.restype = bool_t
lib.BlitzML_use_screening.argtypes = []
lib.BlitzML_set_use_working_sets.restype = None
lib.BlitzML_set_use_working_sets.argtypes = [pointer_t, bool_t]
lib.BlitzML_use_working_sets.restype = bool_t
lib.BlitzML_use_working_sets.argtypes = []
lib.BlitzML_set_log_vectors.restype = None
lib.BlitzML_set_log_vectors.argtypes = [pointer_t, bool_t]
lib.BlitzML_set_suppress_warnings.restype = None
lib.BlitzML_set_suppress_warnings.argtypes = [pointer_t, bool_t]
lib.BlitzML_log_vectors.restype = bool_t
lib.BlitzML_log_vectors.argtypes = []
lib.BlitzML_set_max_iterations.restype = None
lib.BlitzML_set_max_iterations.argtypes = [pointer_t, unsigned_t]
lib.BlitzML_max_iterations.restype = unsigned_t
lib.BlitzML_max_iterations.argtypes = []

class CPointerWrapper(object):
  def __init__(self, c_pointer, create_func, delete_func):
    self.c_pointer = c_pointer
    self._create_func = create_func
    self._delete_c_obj = delete_func

  def __del__(self):
    self._delete_c_obj(self.c_pointer)

def wrap_new_call(func, delete_func):
  def new_func(*args, **kwargs):
    obj = func(*args, **kwargs)
    obj_wrapper = CPointerWrapper(obj, func, delete_func)
    return obj_wrapper
  return new_func

lib.BlitzML_new_parameters_wrap = wrap_new_call(lib.BlitzML_new_parameters,
                                                lib.BlitzML_delete_parameters)

new_solver_calls = [lib.BlitzML_new_sparse_linear_solver,
                    lib.BlitzML_new_lasso_solver,
                    lib.BlitzML_new_sparse_logreg_solver]
for call in new_solver_calls:
  call.argtypes = []
  call.restype = pointer_t
  wrapper_call_name = "{}_wrapper".format(call.__name__)
  setattr(lib, wrapper_call_name, wrap_new_call(call, lib.BlitzML_delete_solver))
  

_module_vars = {"warnings_suppressed": False}

def add_line_breaks(message, length=0):
  max_chars = 79
  result = []
  for w in message.split():
    length += len(w) + 1
    if length > max_chars:
      result.append("\n")
      length = len(w) + 1
    result.append(w + " ")
  return "".join(result).strip()

def print_if_not_suppressed(message):
  if not _module_vars["warnings_suppressed"]:
    message = add_line_breaks(message)
    print(message)
    
def warn(message):
  print_if_not_suppressed("Warning: {}".format(message))

def value_error(message):
  raise ValueError(add_line_breaks(message, length=12))

[docs]def suppress_warnings(): """Stop BlitzML from printing warning messages to sys.stdout. By default, warning messages are unsuppressed. """ _module_vars["warnings_suppressed"] = True
[docs]def unsuppress_warnings(): """Allow BlitzML to print warning messages to sys.stdout.""" _module_vars["warnings_suppressed"] = False
data_copy_warning_cutoff = 1e7 def data_as(obj, ctypes_type): if obj.dtype != np.dtype(ctypes_type): nnz = np.prod(obj.shape) if nnz > data_copy_warning_cutoff: msg = ("Copying numpy.ndarray of size {:d} from type {} to {}. " "Refer to documentation for tips on avoiding copying.") msg = msg.format(nnz, obj.dtype, ctypes_type.__name__) warn(message) obj = obj.astype(ctypes_type) return (obj, obj.ctypes.data_as(ctypes_type)) def decode_c_char_array(c_char_array): return c_char_array.value.decode() class Problem(object): def _set_solver_options(self, **kwargs): for k, v in kwargs.items(): setattr(self, "_{}".format(k), v) suppress_warnings_ = _module_vars["warnings_suppressed"] lib.BlitzML_set_suppress_warnings(self._solver_c_wrap.c_pointer, bool_t(suppress_warnings_)) def _set_parameters(self, parameters): num_params_c = size_t(len(parameters)) self._parameters, params_array_c = data_as(parameters, value_t_p) _parameters_c_wrap = lib.BlitzML_new_parameters_wrap(params_array_c, num_params_c) return _parameters_c_wrap def _set_result(self, result): self._result, result_c = data_as(result, value_t_p) return result_c def _setup_solution_status(self): self._solution_status = ctypes.create_string_buffer(b"", 64) return self._solution_status def _set_log_directory(self, log_directory): if log_directory is None: log_directory = "" check_log_directory(log_directory) self._log_directory = os.path.join(log_directory, "") return char_p(self._log_directory.encode("utf8")) def _format_solution_status(self): self._solution_status = decode_c_char_array(self._solution_status) @property def _stopping_tolerance(self): """Stopping tolerance for solve. Optimization terminates if duality_gap / objective_value < stopping_tolerance . Default is 1e-3. """ return lib.BlitzML_tolerance(self._solver_c_wrap.c_pointer) @_stopping_tolerance.setter def _stopping_tolerance(self, value): lib.BlitzML_set_tolerance(self._solver_c_wrap.c_pointer, value_t(value)) @property def _max_time(self): """Time limit in seconds for solve call. If stopping tolerance is not reached, optimization terminates after this number of seconds. Default is 1 year. """ return lib.BlitzML_max_time(self._solver_c_wrap.c_pointer) @_max_time.setter def _max_time(self, value): lib.BlitzML_set_max_time(self._solver_c_wrap.c_pointer, value_t(value)) @property def _max_iterations(self): """Iterations limit for solve call. If stopping tolerance is not reached after this number of iterations, optimization terminates. Default is 100000. """ return lib.BlitzML_max_iterations(self._solver_c_wrap.c_pointer) @_max_iterations.setter def _max_iterations(self, value): lib.BlitzML_set_max_iterations(self._solver_c_wrap.c_pointer, unsigned_t(value)) @property def _min_time(self): """Minimum time in seconds for solve call. Optimization continues until this amount of time passes, even after reaching stopping tolerance. Default is zero. """ return lib.BlitzML_min_time(self._solver_c_wrap.c_pointer) @_min_time.setter def _min_time(self, value): lib.BlitzML_set_min_time(self._solver_c_wrap.c_pointer, value_t(value)) @property def _verbose(self): """If True, algorithm prints convergence info (such as objective value) to stdout after each iteration during solve call. Default is False. """ return lib.BlitzML_verbose(self._solver_c_wrap.c_pointer) @_verbose.setter def _verbose(self, value): lib.BlitzML_set_verbose(self._solver_c_wrap.c_pointer, bool_t(value)) @property def _use_working_sets(self): """If True, BlitzML solves the problem using working sets. Default is True. Setting to False likely results in slower optimization. """ return lib.BlitzML_use_working_sets(self._solver_c_wrap.c_pointer) @_use_working_sets.setter def _use_working_sets(self, value): lib.BlitzML_set_use_working_sets(self._solver_c_wrap.c_pointer, bool_t(value)) @property def _use_screening(self): """If True, BlitzML solves the problem using safe screening. Default is True. Setting to False may result in slower optimization. """ return lib.BlitzML_use_screening(self._solver_c_wrap.c_pointer) @_use_screening.setter def _use_screening(self, value): lib.BlitzML_set_use_screening(self._solver_c_wrap.c_pointer, bool_t(value)) @property def _log_vectors(self): """If True, BlitzML solves the problem using working sets. Default is True. Setting to False likely results in slower optimization. """ return lib.BlitzML_log_vectors(self._solver_c_wrap.c_pointer) @_log_vectors.setter def _log_vectors(self, value): lib.BlitzML_set_log_vectors(self._solver_c_wrap.c_pointer, bool_t(value)) class BlitzMLSolution(object): def __init__(self, weights, bias, dual_solution, status, duality_gap, objective_value): self._weights = weights self._bias = bias self._dual_solution = dual_solution self._objective_value = objective_value self._duality_gap = duality_gap self._solution_status = status @property def weights(self): """Array of model's weight values.""" return self._weights @weights.setter def weights(self, value): self._weights = weights @property def bias(self): """Value of model's bias term.""" return self._bias @bias.setter def bias(self, value): self._bias = bias @property def dual_solution(self): """Dual solution to optimization problem.""" return self._dual_solution @dual_solution.setter def dual_solution(self, value): self._dual_solution = dual_solution @property def objective_value(self): """Objective value of solution.""" return self._objective_value @objective_value.setter def objective_value(self, value): self._objective_value = objective_value @property def duality_gap(self): """Duality gap between primal and dual solutions (which upper bounds the suboptimality of these solutions). """ return self._duality_gap @duality_gap.setter def duality_gap(self, value): self._duality_gap = duality_gap @property def solution_status(self): """Status of Blitz algorithm upon returning model.""" return self._solution_status @solution_status.setter def solution_status(self, value): self._solution_status = solution_status def compute_loss(self, A, b): """Compute the sum .. math:: \sum_i L(a_i^T w, b_i) , where L is the problem's loss function. Parameters ---------- A : numpy.ndarray or scipy.sparse matrix n x d design matrix for evaluating loss. b : numpy.ndarray Corresponding labels array of length n. Returns ------- loss : float """ Aw = self._compute_A_times_weights(A) return self._compute_loss(Aw, b) def save(self, filepath): """Save model to disk. Parameters ---------- filepath : string Location to save solution. """ with open(filepath, "wb") as outfile: pickle.dump(self, outfile) def _compute_A_times_weights(self, A): if sp.issparse(A): result = A * np.mat(self.weights).T + self.bias return np.array(result).flatten() else: return np.dot(A, self.weights) + self.bias class RegressionSolution(BlitzMLSolution): def predict(self, A): """Predict label values from feature vectors. Parameters ---------- A : numpy.ndarray or scipy.sparse matrix n x d design matrix to make predictions for. Returns ------- predictions : numpy.ndarray of length n. """ return self._compute_A_times_weights(A) class ClassificationSolution(BlitzMLSolution): def predict(self, A): """Predict label values from feature vectors. Parameters ---------- A : numpy.ndarray or scipy.sparse matrix n x d design matrix to make predictions for. Returns ------- predictions : numpy.ndarray of length n containing values +/-1.0. """ preds = self._compute_A_times_weights(A) preds[preds < 0] = -1. preds[preds >= 0] = 1. return preds
[docs]def load_solution(filepath): """Load BlitzMLSolution from disk. Parameters ---------- filepath : string Path to saved solution. Returns ------- solution : BlitzMLSolution. """ with open(filepath) as f: solution = pickle.load(f) return solution
def check_log_directory(dir_path): if dir_path == "": return if not os.path.exists(dir_path): warn("Provided log directory does not exist") elif os.listdir(dir_path): warn("Starting optimization with non-empty log directory") def check_classification_labels(b): min_b = np.min(b) max_b = np.max(b) if min_b < -1.0: msg = ("Labels vector conatins values less than -1.0, which is " "unusual. Typically labels are +/-1 for classification.") warn(msg) if max_b > 1.0: msg = ("Labels vector conatins values greater than 1.0, which is " "unusual. Typically labels are +/-1 for classification.") warn(msg) if min_b == 0.0 and max_b == 0.0: msg = "Labels vector contains only zero values." warn(msg) return if min_b >= 0.: msg = ("Labels vector contains no values less than zero, which is " "unusual. Use -1 to label negative training instances.") warn(msg) if max_b <= 0.: msg = ("Labels vector contains no values greater than zero, which is " "unusual. Use 1 to label positive training instances.") warn(msg) def check_regression_labels(b): min_b = np.min(b) max_b = np.max(b) if min_b == max_b: msg = "Labels vector contains only one unique value ({}).".format(min_b) warn(msg) def check_logreg_labels(b): min_b = np.min(b) max_b = np.max(b) invalid_labels = False if min_b < -1.0 or max_b > 1.0: invalid_labels = True else: unique_b = np.unique(b) if len(unique_b) != 2 or set(unique_b).difference({-1, 1}): invalid_labels = True if invalid_labels: msg = ("Labels vector contains values other than -1 and 1. Labels will be " "treated as 1 if greater than zero and -1 otherwise.") warn(msg) if min_b > 0.: msg = ("Labels vector contains no negative labels. Label negative training " "instances with -1.") warn(msg) elif max_b <= 0.: msg = ("Labels vector contains no positive labels. Label positive training " "instances with 1.") warn(msg)