修复了BUG,环形子句传递完工

This commit is contained in:
YuhangQ 2023-04-07 11:47:15 +00:00
parent 8406414f06
commit 2080eb5e37
6 changed files with 140 additions and 126 deletions

2
run.sh
View File

@ -4,4 +4,4 @@
#valgrind
make -j 16 && mpirun -np 4 --allow-run-as-root valgrind ./light -i data/class_1_easy_10_0.cnf --share=1 --threads=32 --times=1000
make -j 16 && mpirun -np 4 --allow-run-as-root ./light -i data/class_1_easy_10_0.cnf --share=1 --threads=32 --times=1000

View File

@ -81,6 +81,11 @@ void worker_main(light* S, int num_procs, int rank) {
}
}
int flag;
if(MPI_Test(solved_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) {
MPI_Cancel(solved_request);
}
} else if(res == 20) {
int flag;
int is_sat = 0;
@ -89,6 +94,14 @@ void worker_main(light* S, int num_procs, int rank) {
// when unknown do nothing.
}
int flag;
if(MPI_Test(model_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) {
MPI_Cancel(model_request);
}
if(MPI_Test(&S->terminal_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) {
MPI_Cancel(&S->terminal_request);
}
//delete(S);
MPI_Barrier(MPI_COMM_WORLD);

View File

@ -76,7 +76,7 @@ public:
void diversity_workers();
void parse_input();
int run();
void share();
//void share();
int solve();
void terminate_workers();
void print_model();

View File

@ -140,7 +140,7 @@ void light::parse_input() {
}
int light::solve() {
// printf("c -----------------solve start----------------------\n");
//printf("c -----------------solve start----------------------\n");
pthread_t *ptr = new pthread_t[OPT(threads)];
for (int i = 0; i < OPT(threads); i++) {
@ -151,9 +151,26 @@ int light::solve() {
auto clk_sol_st = std::chrono::high_resolution_clock::now();
int pre_time = std::chrono::duration_cast<std::chrono::seconds>(clk_sol_st - clk_st).count();
int sol_thd = 0, intv_time = OPT(reset_time);
// 初始化共享子句类
sharer* s;
if(OPT(share)) {
s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS));
for (int j = 0; j < OPT(threads); j++) {
s->producers.push(workers[j]);
s->consumers.push(workers[j]);
workers[j]->in_sharer = s;
}
s->clause_sharing_init();
}
while (!terminated) {
usleep(100000);
if(OPT(share)) s->do_clause_sharing();
int flag;
// when getting terminate signal
if(MPI_Test(&terminal_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 1) {
@ -175,15 +192,6 @@ int light::solve() {
pthread_join(ptr[i], NULL);
}
// 等待 share 进程结束
if(OPT(share)) {
//pthread_create(&ptr[i], NULL, share_worker,);
//pthread_join(&sharers[0], NULL);
}
// printf("ending join\n");
if (result == 10)
workers[winner_id]->model.copyTo(model);
@ -204,12 +212,8 @@ int light::run() {
diversity_workers();
parse_input();
if (OPT(share)) share();
int res = solve();
if(OPT(share)) {
pthread_join(sharer_ptrs[0], NULL);
}
int res = solve();
return res;
}

View File

@ -6,11 +6,17 @@
#include "../distributed/comm_tag.h"
#include <boost/thread/thread.hpp>
const int BUF_SIZE = 1024 * 1024;
int nums = 0;
double share_time = 0;
int num_procs, rank;
const int BUF_SIZE = 100 * 1024 * 1024;
std::vector<std::pair<MPI_Request*, int*>> send_data_struct;
MPI_Request receive_request;
int buf[BUF_SIZE];
void share_clauses_to_next_node(const vec<clause_store *> &cls) {
// 清理 send_data_struct把发送完毕的发送数据结构清理掉
@ -113,93 +119,79 @@ bool receive_clauses_from_last_node(vec<clause_store*> &clauses) {
return received;
}
void * share_worker(void *arg) {
int nums = 0;
sharer * sq = (sharer *)arg;
auto clk_st = std::chrono::high_resolution_clock::now();
double share_time = 0;
int num_procs, rank;
void sharer::clause_sharing_init() {
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int from = (rank - 2 + num_procs - 1) % (num_procs - 1) + 1;
MPI_Irecv(buf, BUF_SIZE, MPI_INT, from, SHARE_CLAUSES_TAG, MPI_COMM_WORLD, &receive_request);
}
while (true) {
++nums;
usleep(sq->share_intv);
auto clk_now = std::chrono::high_resolution_clock::now();
int solve_time = std::chrono::duration_cast<std::chrono::milliseconds>(clk_now - clk_st).count();
//LOGGER->info("round %v, time: %v.%v", nums, solve_time / 1000, solve_time % 1000);
printf("c [worker] round %d, time: %d.%d\n", nums, solve_time / 1000, solve_time % 1000);
if (terminated) {
MPI_Cancel(&receive_request);
for(int i=0; i<send_data_struct.size(); i++) {
auto& request = send_data_struct[i].first;
auto& send_buf = send_data_struct[i].second;
MPI_Cancel(request);
delete []send_buf;
}
break;
}
// printf("start sharing %d\n", sq->share_intv);
for (int i = 0; i < sq->producers.size(); i++) {
sq->cls.clear();
sq->producers[i]->export_clauses_to(sq->cls);
//printf("c size %d\n", sq->cls.size());
int number = sq->cls.size();
printf("c [worker] thread-%d: get %d exported clauses\n", i, number);
//分享当前节点产生的子句
if(sq->cls.size() > 0) share_clauses_to_next_node(sq->cls);
// 导入外部网络传输的子句
vec<clause_store*> clauses;
if(receive_clauses_from_last_node(clauses)) {
for (int j = 0; j < sq->consumers.size(); j++) {
for (int k = 0; k < clauses.size(); k++)
clauses[k]->increase_refs(1);
sq->consumers[j]->import_clauses_from(clauses);
}
// 传递外部网络传输的子句给下个节点
share_clauses_to_next_node(clauses);
for (int k = 0; k < clauses.size(); k++) {
clauses[k]->free_clause();
}
}
// 导入当前节点产生的子句
int percent = sq->sort_clauses(i);
if (percent < 75) {
sq->producers[i]->broaden_export_limit();
}
else if (percent > 98) {
sq->producers[i]->restrict_export_limit();
}
for (int j = 0; j < sq->consumers.size(); j++) {
if (sq->producers[i]->id == sq->consumers[j]->id) continue;
for (int k = 0; k < sq->cls.size(); k++)
sq->cls[k]->increase_refs(1);
sq->consumers[j]->import_clauses_from(sq->cls);
}
for (int k = 0; k < sq->cls.size(); k++) {
sq->cls[k]->free_clause();
}
}
auto clk_ed = std::chrono::high_resolution_clock::now();
share_time += 0.001 * std::chrono::duration_cast<std::chrono::milliseconds>(clk_ed - clk_now).count();
}
void sharer::clause_sharing_end() {
printf("c sharing nums: %d\nc sharing time: %.2lf\n", nums, share_time);
// if (terminated) puts("terminated set to 1");
return NULL;
}
void sharer::do_clause_sharing() {
static auto clk_st = std::chrono::high_resolution_clock::now();
++nums;
auto clk_now = std::chrono::high_resolution_clock::now();
int solve_time = std::chrono::duration_cast<std::chrono::milliseconds>(clk_now - clk_st).count();
printf("c [worker] round %d, time: %d.%d\n", nums, solve_time / 1000, solve_time % 1000);
// printf("start sharing %d\n", sq->share_intv);
for (int i = 0; i < producers.size(); i++) {
cls.clear();
producers[i]->export_clauses_to(cls);
//printf("c size %d\n", sq->cls.size());
int number = cls.size();
printf("c [worker] thread-%d: get %d exported clauses\n", i, number);
//分享当前节点产生的子句
if(cls.size() > 0) share_clauses_to_next_node(cls);
// 导入外部网络传输的子句
vec<clause_store*> clauses;
if(receive_clauses_from_last_node(clauses)) {
for (int j = 0; j < consumers.size(); j++) {
for (int k = 0; k < clauses.size(); k++)
clauses[k]->increase_refs(1);
consumers[j]->import_clauses_from(clauses);
}
// 传递外部网络传输的子句给下个节点
share_clauses_to_next_node(clauses);
for (int k = 0; k < clauses.size(); k++) {
clauses[k]->free_clause();
}
}
// 导入当前节点产生的子句
int percent = sort_clauses(i);
if (percent < 75) {
producers[i]->broaden_export_limit();
}
else if (percent > 98) {
producers[i]->restrict_export_limit();
}
for (int j = 0; j < consumers.size(); j++) {
if (producers[i]->id == consumers[j]->id) continue;
for (int k = 0; k < cls.size(); k++)
cls[k]->increase_refs(1);
consumers[j]->import_clauses_from(cls);
}
for (int k = 0; k < cls.size(); k++) {
cls[k]->free_clause();
}
}
auto clk_ed = std::chrono::high_resolution_clock::now();
share_time += 0.001 * std::chrono::duration_cast<std::chrono::milliseconds>(clk_ed - clk_now).count();
}
int sharer::import_clauses(int id) {
@ -265,32 +257,32 @@ int sharer::sort_clauses(int x) {
return (share_lits - space) * 100 / share_lits;
}
void light::share() {
// printf("c sharing start\n");
if (OPT(DPS)) {
sharer* s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS));
s->margin = OPT(margin);
for (int j = 0; j < OPT(threads); j++) {
s->producers.push(workers[j]);
workers[j]->in_sharer = s;
}
sharers.push(s);
}
else {
int sharers_number = 1;
for (int i = 0; i < sharers_number; i++) {
sharer* s = new sharer(i, OPT(share_intv), OPT(share_lits), OPT(DPS));
for (int j = 0; j < OPT(threads); j++) {
s->producers.push(workers[j]);
s->consumers.push(workers[j]);
workers[j]->in_sharer = s;
}
sharers.push(s);
}
// void light::share() {
// // printf("c sharing start\n");
// if (OPT(DPS)) {
// sharer* s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS));
// s->margin = OPT(margin);
// for (int j = 0; j < OPT(threads); j++) {
// s->producers.push(workers[j]);
// workers[j]->in_sharer = s;
// }
// sharers.push(s);
// }
// else {
// int sharers_number = 1;
// for (int i = 0; i < sharers_number; i++) {
// sharer* s = new sharer(i, OPT(share_intv), OPT(share_lits), OPT(DPS));
// for (int j = 0; j < OPT(threads); j++) {
// s->producers.push(workers[j]);
// s->consumers.push(workers[j]);
// workers[j]->in_sharer = s;
// }
// sharers.push(s);
// }
sharer_ptrs = new pthread_t[sharers_number];
for (int i = 0; i < sharers_number; i++) {
pthread_create(&sharer_ptrs[i], NULL, share_worker, sharers[i]);
}
}
}
// sharer_ptrs = new pthread_t[sharers_number];
// for (int i = 0; i < sharers_number; i++) {
// pthread_create(&sharer_ptrs[i], NULL, share_worker, sharers[i]);
// }
// }
// }

View File

@ -26,6 +26,11 @@ public:
id = idx;
waitings = terminated = 0;
}
void do_clause_sharing();
void clause_sharing_init();
void clause_sharing_end();
void set_terminated() {
boost::mutex::scoped_lock lock(mtx);
terminated = 1;