Skip to content

Commit

Permalink
Added additional APIs to better support JNI on Spark (#2032)
Browse files Browse the repository at this point in the history
* added API changes required for JNI performance optimizations (e.g. predict is 3-4x faster)

* removed commented variables

* removed commented header

* renamed method to make it obvious it is created for Spark

* fixed comment alignment

* replaced GetPrimitiveArrayCritical with GetIntArrayElements for training. fixed dead-lock on databricks
  • Loading branch information
eisber authored and guolinke committed Mar 18, 2019
1 parent 95246cd commit beeb6e0
Show file tree
Hide file tree
Showing 3 changed files with 395 additions and 3 deletions.
98 changes: 95 additions & 3 deletions include/LightGBM/c_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,25 @@ LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSR(const void* indptr,
const DatasetHandle reference,
DatasetHandle* out);


/*!
* \brief create a dataset from CSR format through callbacks
* \param get_row_funptr pointer to std::function<void(int idx, std::vector<std::pair<int, double>>& ret). CAlled for every row and expected to clear and fill ret
* \param num_rows number of rows
* \param num_col number of columns
* \param parameters additional parameters
* \param reference used to align bin mapper with other dataset, nullptr means don't used
* \param out created dataset
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
int num_rows,
int64_t num_col,
const char* parameters,
const DatasetHandle reference,
DatasetHandle* out);


/*!
* \brief create a dataset from CSC format
* \param col_ptr pointer to col headers
Expand Down Expand Up @@ -622,7 +641,7 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterCalcNumPredict(BoosterHandle handle,
/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param indptr pointer to row headers
Expand Down Expand Up @@ -658,10 +677,51 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSR(BoosterHandle handle,
int64_t* out_len,
double* out_result);

/*!
* \brief make prediction for an new data set. This method re-uses the internal predictor structure
* from previous calls and is optimized for single row invocation.
* Note: should pre-allocate memory for out_result,
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param indptr pointer to row headers
* \param indptr_type type of indptr, can be C_API_DTYPE_INT32 or C_API_DTYPE_INT64
* \param indices findex
* \param data fvalue
* \param data_type type of data pointer, can be C_API_DTYPE_FLOAT32 or C_API_DTYPE_FLOAT64
* \param nindptr number of rows in the matrix + 1
* \param nelem number of nonzero elements in the matrix
* \param num_col number of columns; when it's set to 0, then guess from data
* \param predict_type
* C_API_PREDICT_NORMAL: normal prediction, with transform (if needed)
* C_API_PREDICT_RAW_SCORE: raw score
* C_API_PREDICT_LEAF_INDEX: leaf index
* \param num_iteration number of iteration for prediction, <= 0 means no limit
* \param parameter Other parameters for the parameters, e.g. early stopping for prediction.
* \param out_len len of output result
* \param out_result used to set a pointer to array, should allocate memory before call this function
* \return 0 when succeed, -1 when failure happens
*/
LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t num_col,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result);


/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param col_ptr pointer to col headers
Expand Down Expand Up @@ -700,7 +760,7 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForCSC(BoosterHandle handle,
/*!
* \brief make prediction for an new data set
* Note: should pre-allocate memory for out_result,
* for noraml and raw score: its length is equal to num_class * num_data
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param data pointer to the data space
Expand Down Expand Up @@ -730,6 +790,38 @@ LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMat(BoosterHandle handle,
int64_t* out_len,
double* out_result);

/*!
* \brief make prediction for an new data set. This method re-uses the internal predictor structure
* from previous calls and is optimized for single row invocation.
* Note: should pre-allocate memory for out_result,
* for normal and raw score: its length is equal to num_class * num_data
* for leaf index, its length is equal to num_class * num_data * num_iteration
* \param handle handle
* \param data pointer to the data space
* \param data_type type of data pointer, can be C_API_DTYPE_FLOAT32 or C_API_DTYPE_FLOAT64
* \param nrow number of rows
* \param ncol number columns
* \param is_row_major 1 for row major, 0 for column major
* \param predict_type
* C_API_PREDICT_NORMAL: normal prediction, with transform (if needed)
* C_API_PREDICT_RAW_SCORE: raw score
* C_API_PREDICT_LEAF_INDEX: leaf index
* \param num_iteration number of iteration for prediction, <= 0 means no limit
* \param parameter Other parameters for the parameters, e.g. early stopping for prediction.
* \param out_len len of output result
* \param out_result used to set a pointer to array, should allocate memory before call this function
* \return 0 when succeed, -1 when failure happens
*/LIGHTGBM_C_EXPORT int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
const void* data,
int data_type,
int ncol,
int is_row_major,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result);

/*!
* \brief save model into file
* \param handle handle
Expand Down
164 changes: 164 additions & 0 deletions src/c_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,41 @@ class Booster {
boosting_->RollbackOneIter();
}

void PredictSingleRow(int num_iteration, int predict_type,
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
const Config& config,
double* out_result, int64_t* out_len) {
std::lock_guard<std::mutex> lock(mutex_);

if (single_row_predictor_.get() == nullptr) {
bool is_predict_leaf = false;
bool is_raw_score = false;
bool predict_contrib = false;
if (predict_type == C_API_PREDICT_LEAF_INDEX) {
is_predict_leaf = true;
} else if (predict_type == C_API_PREDICT_RAW_SCORE) {
is_raw_score = true;
} else if (predict_type == C_API_PREDICT_CONTRIB) {
predict_contrib = true;
} else {
is_raw_score = false;
}

// TODO: config could be optimized away... (maybe using lambda callback?)
single_row_predictor_.reset(new Predictor(boosting_.get(), num_iteration, is_raw_score, is_predict_leaf, predict_contrib,
config.pred_early_stop, config.pred_early_stop_freq, config.pred_early_stop_margin));
single_row_num_pred_in_one_row_ = boosting_->NumPredictOneRow(num_iteration, is_predict_leaf, predict_contrib);
single_row_predict_function_ = single_row_predictor_->GetPredictFunction();
}

auto one_row = get_row_fun(0);
auto pred_wrt_ptr = out_result;
single_row_predict_function_(one_row, pred_wrt_ptr);

*out_len = single_row_num_pred_in_one_row_;
}


void Predict(int num_iteration, int predict_type, int nrow,
std::function<std::vector<std::pair<int, double>>(int row_idx)> get_row_fun,
const Config& config,
Expand Down Expand Up @@ -326,6 +361,10 @@ class Booster {
private:
const Dataset* train_data_;
std::unique_ptr<Boosting> boosting_;
std::unique_ptr<Predictor> single_row_predictor_;
PredictFunction single_row_predict_function_;
int64_t single_row_num_pred_in_one_row_;

/*! \brief All configs */
Config config_;
/*! \brief Metric for training data */
Expand Down Expand Up @@ -665,6 +704,77 @@ int LGBM_DatasetCreateFromCSR(const void* indptr,
API_END();
}

int LGBM_DatasetCreateFromCSRFunc(void* get_row_funptr,
int num_rows,
int64_t num_col,
const char* parameters,
const DatasetHandle reference,
DatasetHandle* out) {

API_BEGIN();

auto get_row_fun = *static_cast<std::function<void(int idx, std::vector<std::pair<int, double>>&)>*>(get_row_funptr);

auto param = Config::Str2Map(parameters);
Config config;
config.Set(param);
if (config.num_threads > 0) {
omp_set_num_threads(config.num_threads);
}
std::unique_ptr<Dataset> ret;
int32_t nrow = num_rows;
if (reference == nullptr) {
// sample data first
Random rand(config.data_random_seed);
int sample_cnt = static_cast<int>(nrow < config.bin_construct_sample_cnt ? nrow : config.bin_construct_sample_cnt);
auto sample_indices = rand.Sample(nrow, sample_cnt);
sample_cnt = static_cast<int>(sample_indices.size());
std::vector<std::vector<double>> sample_values(num_col);
std::vector<std::vector<int>> sample_idx(num_col);
// local buffer to re-use memory
std::vector<std::pair<int, double>> buffer;
for (size_t i = 0; i < sample_indices.size(); ++i) {
auto idx = sample_indices[i];
get_row_fun(static_cast<int>(idx), buffer);
for (std::pair<int, double>& inner_data : buffer) {
CHECK(inner_data.first < num_col);
if (std::fabs(inner_data.second) > kZeroThreshold || std::isnan(inner_data.second)) {
sample_values[inner_data.first].emplace_back(inner_data.second);
sample_idx[inner_data.first].emplace_back(static_cast<int>(i));
}
}
}
DatasetLoader loader(config, nullptr, 1, nullptr);
ret.reset(loader.CostructFromSampleData(Common::Vector2Ptr<double>(sample_values).data(),
Common::Vector2Ptr<int>(sample_idx).data(),
static_cast<int>(sample_values.size()),
Common::VectorSize<double>(sample_values).data(),
sample_cnt, nrow));
} else {
ret.reset(new Dataset(nrow));
ret->CreateValid(
reinterpret_cast<const Dataset*>(reference));
}

OMP_INIT_EX();
std::vector<std::pair<int, double>> threadBuffer;
#pragma omp parallel for schedule(static) private(threadBuffer)
for (int i = 0; i < num_rows; ++i) {
OMP_LOOP_EX_BEGIN();
{
const int tid = omp_get_thread_num();
get_row_fun(i, threadBuffer);

ret->PushOneRow(tid, i, threadBuffer);
}
OMP_LOOP_EX_END();
}
OMP_THROW_EX();
ret->FinishLoad();
*out = ret.release();
API_END();
}

int LGBM_DatasetCreateFromCSC(const void* col_ptr,
int col_ptr_type,
const int32_t* indices,
Expand Down Expand Up @@ -1175,6 +1285,35 @@ int LGBM_BoosterPredictForCSR(BoosterHandle handle,
API_END();
}

int LGBM_BoosterPredictForCSRSingleRow(BoosterHandle handle,
const void* indptr,
int indptr_type,
const int32_t* indices,
const void* data,
int data_type,
int64_t nindptr,
int64_t nelem,
int64_t,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result) {
API_BEGIN();
auto param = Config::Str2Map(parameter);
Config config;
config.Set(param);
if (config.num_threads > 0) {
omp_set_num_threads(config.num_threads);
}
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowFunctionFromCSR(indptr, indptr_type, indices, data, data_type, nindptr, nelem);
ref_booster->PredictSingleRow(num_iteration, predict_type, get_row_fun,
config, out_result, out_len);
API_END();
}


int LGBM_BoosterPredictForCSC(BoosterHandle handle,
const void* col_ptr,
int col_ptr_type,
Expand Down Expand Up @@ -1252,6 +1391,31 @@ int LGBM_BoosterPredictForMat(BoosterHandle handle,
API_END();
}

int LGBM_BoosterPredictForMatSingleRow(BoosterHandle handle,
const void* data,
int data_type,
int32_t ncol,
int is_row_major,
int predict_type,
int num_iteration,
const char* parameter,
int64_t* out_len,
double* out_result) {
API_BEGIN();
auto param = Config::Str2Map(parameter);
Config config;
config.Set(param);
if (config.num_threads > 0) {
omp_set_num_threads(config.num_threads);
}
Booster* ref_booster = reinterpret_cast<Booster*>(handle);
auto get_row_fun = RowPairFunctionFromDenseMatric(data, 1, ncol, data_type, is_row_major);
ref_booster->PredictSingleRow(num_iteration, predict_type, get_row_fun,
config, out_result, out_len);
API_END();
}


int LGBM_BoosterSaveModel(BoosterHandle handle,
int start_iteration,
int num_iteration,
Expand Down
Loading

0 comments on commit beeb6e0

Please sign in to comment.