From 2080eb5e37b60e53813ba89716e51654e3796add Mon Sep 17 00:00:00 2001 From: YuhangQ Date: Fri, 7 Apr 2023 11:47:15 +0000 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=86BUG=EF=BC=8C?= =?UTF-8?q?=E7=8E=AF=E5=BD=A2=E5=AD=90=E5=8F=A5=E4=BC=A0=E9=80=92=E5=AE=8C?= =?UTF-8?q?=E5=B7=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- run.sh | 2 +- src/distributed/worker.hpp | 13 +++ src/light.hpp | 2 +- src/solve.cpp | 34 +++--- src/workers/sharer.cpp | 210 ++++++++++++++++++------------------- src/workers/sharer.hpp | 5 + 6 files changed, 140 insertions(+), 126 deletions(-) diff --git a/run.sh b/run.sh index 847f599..12d7f5e 100755 --- a/run.sh +++ b/run.sh @@ -4,4 +4,4 @@ #valgrind -make -j 16 && mpirun -np 4 --allow-run-as-root valgrind ./light -i data/class_1_easy_10_0.cnf --share=1 --threads=32 --times=1000 \ No newline at end of file +make -j 16 && mpirun -np 4 --allow-run-as-root ./light -i data/class_1_easy_10_0.cnf --share=1 --threads=32 --times=1000 \ No newline at end of file diff --git a/src/distributed/worker.hpp b/src/distributed/worker.hpp index ce1f9d4..65681ca 100644 --- a/src/distributed/worker.hpp +++ b/src/distributed/worker.hpp @@ -81,6 +81,11 @@ void worker_main(light* S, int num_procs, int rank) { } } + int flag; + if(MPI_Test(solved_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) { + MPI_Cancel(solved_request); + } + } else if(res == 20) { int flag; int is_sat = 0; @@ -89,6 +94,14 @@ void worker_main(light* S, int num_procs, int rank) { // when unknown do nothing. } + int flag; + if(MPI_Test(model_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) { + MPI_Cancel(model_request); + } + if(MPI_Test(&S->terminal_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 0) { + MPI_Cancel(&S->terminal_request); + } + //delete(S); MPI_Barrier(MPI_COMM_WORLD); diff --git a/src/light.hpp b/src/light.hpp index 16ff6c4..7dc250a 100644 --- a/src/light.hpp +++ b/src/light.hpp @@ -76,7 +76,7 @@ public: void diversity_workers(); void parse_input(); int run(); - void share(); + //void share(); int solve(); void terminate_workers(); void print_model(); diff --git a/src/solve.cpp b/src/solve.cpp index d998bdb..b95e3a1 100644 --- a/src/solve.cpp +++ b/src/solve.cpp @@ -140,7 +140,7 @@ void light::parse_input() { } int light::solve() { - // printf("c -----------------solve start----------------------\n"); + //printf("c -----------------solve start----------------------\n"); pthread_t *ptr = new pthread_t[OPT(threads)]; for (int i = 0; i < OPT(threads); i++) { @@ -151,9 +151,26 @@ int light::solve() { auto clk_sol_st = std::chrono::high_resolution_clock::now(); int pre_time = std::chrono::duration_cast(clk_sol_st - clk_st).count(); int sol_thd = 0, intv_time = OPT(reset_time); + + // 初始化共享子句类 + sharer* s; + if(OPT(share)) { + s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS)); + for (int j = 0; j < OPT(threads); j++) { + s->producers.push(workers[j]); + s->consumers.push(workers[j]); + workers[j]->in_sharer = s; + } + s->clause_sharing_init(); + } + + + while (!terminated) { usleep(100000); + if(OPT(share)) s->do_clause_sharing(); + int flag; // when getting terminate signal if(MPI_Test(&terminal_request, &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 1) { @@ -174,15 +191,6 @@ int light::solve() { for (int i = 0; i < OPT(threads); i++) { pthread_join(ptr[i], NULL); } - - - // 等待 share 进程结束 - if(OPT(share)) { - //pthread_create(&ptr[i], NULL, share_worker,); - //pthread_join(&sharers[0], NULL); - } - - // printf("ending join\n"); if (result == 10) @@ -204,12 +212,8 @@ int light::run() { diversity_workers(); parse_input(); - if (OPT(share)) share(); - int res = solve(); - if(OPT(share)) { - pthread_join(sharer_ptrs[0], NULL); - } + int res = solve(); return res; } diff --git a/src/workers/sharer.cpp b/src/workers/sharer.cpp index 2998a49..85bd068 100644 --- a/src/workers/sharer.cpp +++ b/src/workers/sharer.cpp @@ -6,11 +6,17 @@ #include "../distributed/comm_tag.h" #include -const int BUF_SIZE = 1024 * 1024; + +int nums = 0; +double share_time = 0; +int num_procs, rank; + +const int BUF_SIZE = 100 * 1024 * 1024; std::vector> send_data_struct; MPI_Request receive_request; int buf[BUF_SIZE]; + void share_clauses_to_next_node(const vec &cls) { // 清理 send_data_struct,把发送完毕的发送数据结构清理掉 @@ -113,93 +119,79 @@ bool receive_clauses_from_last_node(vec &clauses) { return received; } -void * share_worker(void *arg) { - int nums = 0; - sharer * sq = (sharer *)arg; - auto clk_st = std::chrono::high_resolution_clock::now(); - double share_time = 0; - int num_procs, rank; + +void sharer::clause_sharing_init() { MPI_Comm_size(MPI_COMM_WORLD, &num_procs); MPI_Comm_rank(MPI_COMM_WORLD, &rank); - int from = (rank - 2 + num_procs - 1) % (num_procs - 1) + 1; MPI_Irecv(buf, BUF_SIZE, MPI_INT, from, SHARE_CLAUSES_TAG, MPI_COMM_WORLD, &receive_request); +} - while (true) { - ++nums; - usleep(sq->share_intv); - auto clk_now = std::chrono::high_resolution_clock::now(); - int solve_time = std::chrono::duration_cast(clk_now - clk_st).count(); - //LOGGER->info("round %v, time: %v.%v", nums, solve_time / 1000, solve_time % 1000); - - printf("c [worker] round %d, time: %d.%d\n", nums, solve_time / 1000, solve_time % 1000); - if (terminated) { - - MPI_Cancel(&receive_request); - for(int i=0; ishare_intv); - for (int i = 0; i < sq->producers.size(); i++) { - sq->cls.clear(); - sq->producers[i]->export_clauses_to(sq->cls); - - //printf("c size %d\n", sq->cls.size()); - int number = sq->cls.size(); - - printf("c [worker] thread-%d: get %d exported clauses\n", i, number); - - //分享当前节点产生的子句 - if(sq->cls.size() > 0) share_clauses_to_next_node(sq->cls); - - // 导入外部网络传输的子句 - vec clauses; - if(receive_clauses_from_last_node(clauses)) { - for (int j = 0; j < sq->consumers.size(); j++) { - for (int k = 0; k < clauses.size(); k++) - clauses[k]->increase_refs(1); - sq->consumers[j]->import_clauses_from(clauses); - } - - // 传递外部网络传输的子句给下个节点 - share_clauses_to_next_node(clauses); - - for (int k = 0; k < clauses.size(); k++) { - clauses[k]->free_clause(); - } - } - - // 导入当前节点产生的子句 - int percent = sq->sort_clauses(i); - if (percent < 75) { - sq->producers[i]->broaden_export_limit(); - } - else if (percent > 98) { - sq->producers[i]->restrict_export_limit(); - } - - for (int j = 0; j < sq->consumers.size(); j++) { - if (sq->producers[i]->id == sq->consumers[j]->id) continue; - for (int k = 0; k < sq->cls.size(); k++) - sq->cls[k]->increase_refs(1); - sq->consumers[j]->import_clauses_from(sq->cls); - } - for (int k = 0; k < sq->cls.size(); k++) { - sq->cls[k]->free_clause(); - } - } - auto clk_ed = std::chrono::high_resolution_clock::now(); - share_time += 0.001 * std::chrono::duration_cast(clk_ed - clk_now).count(); - } +void sharer::clause_sharing_end() { printf("c sharing nums: %d\nc sharing time: %.2lf\n", nums, share_time); - // if (terminated) puts("terminated set to 1"); - return NULL; +} + +void sharer::do_clause_sharing() { + + static auto clk_st = std::chrono::high_resolution_clock::now(); + + ++nums; + auto clk_now = std::chrono::high_resolution_clock::now(); + int solve_time = std::chrono::duration_cast(clk_now - clk_st).count(); + printf("c [worker] round %d, time: %d.%d\n", nums, solve_time / 1000, solve_time % 1000); + // printf("start sharing %d\n", sq->share_intv); + for (int i = 0; i < producers.size(); i++) { + cls.clear(); + producers[i]->export_clauses_to(cls); + + //printf("c size %d\n", sq->cls.size()); + int number = cls.size(); + + printf("c [worker] thread-%d: get %d exported clauses\n", i, number); + + //分享当前节点产生的子句 + if(cls.size() > 0) share_clauses_to_next_node(cls); + + // 导入外部网络传输的子句 + vec clauses; + if(receive_clauses_from_last_node(clauses)) { + for (int j = 0; j < consumers.size(); j++) { + for (int k = 0; k < clauses.size(); k++) + clauses[k]->increase_refs(1); + consumers[j]->import_clauses_from(clauses); + } + + // 传递外部网络传输的子句给下个节点 + share_clauses_to_next_node(clauses); + + for (int k = 0; k < clauses.size(); k++) { + clauses[k]->free_clause(); + } + } + + // 导入当前节点产生的子句 + int percent = sort_clauses(i); + if (percent < 75) { + producers[i]->broaden_export_limit(); + } + else if (percent > 98) { + producers[i]->restrict_export_limit(); + } + + for (int j = 0; j < consumers.size(); j++) { + if (producers[i]->id == consumers[j]->id) continue; + for (int k = 0; k < cls.size(); k++) + cls[k]->increase_refs(1); + consumers[j]->import_clauses_from(cls); + } + for (int k = 0; k < cls.size(); k++) { + cls[k]->free_clause(); + } + } + + auto clk_ed = std::chrono::high_resolution_clock::now(); + share_time += 0.001 * std::chrono::duration_cast(clk_ed - clk_now).count(); } int sharer::import_clauses(int id) { @@ -265,32 +257,32 @@ int sharer::sort_clauses(int x) { return (share_lits - space) * 100 / share_lits; } -void light::share() { - // printf("c sharing start\n"); - if (OPT(DPS)) { - sharer* s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS)); - s->margin = OPT(margin); - for (int j = 0; j < OPT(threads); j++) { - s->producers.push(workers[j]); - workers[j]->in_sharer = s; - } - sharers.push(s); - } - else { - int sharers_number = 1; - for (int i = 0; i < sharers_number; i++) { - sharer* s = new sharer(i, OPT(share_intv), OPT(share_lits), OPT(DPS)); - for (int j = 0; j < OPT(threads); j++) { - s->producers.push(workers[j]); - s->consumers.push(workers[j]); - workers[j]->in_sharer = s; - } - sharers.push(s); - } +// void light::share() { +// // printf("c sharing start\n"); +// if (OPT(DPS)) { +// sharer* s = new sharer(0, OPT(share_intv), OPT(share_lits), OPT(DPS)); +// s->margin = OPT(margin); +// for (int j = 0; j < OPT(threads); j++) { +// s->producers.push(workers[j]); +// workers[j]->in_sharer = s; +// } +// sharers.push(s); +// } +// else { +// int sharers_number = 1; +// for (int i = 0; i < sharers_number; i++) { +// sharer* s = new sharer(i, OPT(share_intv), OPT(share_lits), OPT(DPS)); +// for (int j = 0; j < OPT(threads); j++) { +// s->producers.push(workers[j]); +// s->consumers.push(workers[j]); +// workers[j]->in_sharer = s; +// } +// sharers.push(s); +// } - sharer_ptrs = new pthread_t[sharers_number]; - for (int i = 0; i < sharers_number; i++) { - pthread_create(&sharer_ptrs[i], NULL, share_worker, sharers[i]); - } - } -} \ No newline at end of file +// sharer_ptrs = new pthread_t[sharers_number]; +// for (int i = 0; i < sharers_number; i++) { +// pthread_create(&sharer_ptrs[i], NULL, share_worker, sharers[i]); +// } +// } +// } \ No newline at end of file diff --git a/src/workers/sharer.hpp b/src/workers/sharer.hpp index bcbd0f9..234e130 100644 --- a/src/workers/sharer.hpp +++ b/src/workers/sharer.hpp @@ -26,6 +26,11 @@ public: id = idx; waitings = terminated = 0; } + + void do_clause_sharing(); + void clause_sharing_init(); + void clause_sharing_end(); + void set_terminated() { boost::mutex::scoped_lock lock(mtx); terminated = 1;