cloud-sat/src/distributed/heartbeat.cpp
2023-03-30 09:26:46 +00:00

98 lines
2.7 KiB
C++

#include "heartbeat.h"
HeartBeat::HeartBeat(int num_procs) {
this->num_procs = num_procs;
this->online = new int[num_procs];
for(int i=0; i<num_procs; i++) {
this->online[i] = 1;
}
this->last_send_time = new time_t[num_procs];
for(int i=0; i<num_procs; i++) {
this->last_send_time[i] = 0;
}
this->requests = new MPI_Request[num_procs];
this->send_requests = new MPI_Request[num_procs];
for(int i=1; i<num_procs; i++) {
MPI_Irecv(&last_send_time[i], 1, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i]);
int buf[2];
buf[0] = next_online(i);
buf[1] = last_online(i);
MPI_Isend(buf, 2, MPI_INT, i, 0, MPI_COMM_WORLD, &send_requests[i]);
}
auto t = time(NULL);
}
HeartBeat::~HeartBeat() {
delete []this->online;
delete []this->last_send_time;
delete []this->requests;
}
void HeartBeat::update() {
for(int i=1; i<num_procs; i++) {
int flag;
while(MPI_Test(&requests[i], &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 1) {
printf("[leader] receive headbeat from: %d - time: %d\n", i, time(0) - last_send_time[i]);
MPI_Irecv(&last_send_time[i], 1, MPI_INT, i, 0, MPI_COMM_WORLD, &requests[i]);
}
}
time_t now = time(NULL);
for(int i=1; i<num_procs; i++) {
if(now - last_send_time[i] > 5) {
if(online[i]) printf("[leader] ERROR: worker %d offline\n", i);
online[i] = 0;
} else {
if(!online[i]) printf("[leader] RECONNECT: worker %d online\n", i);
online[i] = 1;
}
}
for(int i=1; i<num_procs; i++) {
int flag;
if(MPI_Test(&requests[i], &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 1) {
int buf[2];
buf[0] = next_online(i);
buf[1] = last_online(i);
MPI_Isend(buf, 2, MPI_INT, i, 0, MPI_COMM_WORLD, &send_requests[i]);
}
}
}
void HeartBeat::waiting_all() {
MPI_Waitall(num_procs-1, requests+1, MPI_STATUS_IGNORE);
for(int i=1; i<num_procs; i++) {
int flag;
if(MPI_Test(&requests[i], &flag, MPI_STATUS_IGNORE) == MPI_SUCCESS && flag == 1) {
int buf[2];
buf[0] = next_online(i);
buf[1] = last_online(i);
printf("buf: %d %d\n", buf[0], buf[1]);
MPI_Isend(buf, 2, MPI_INT, i, 0, MPI_COMM_WORLD, &send_requests[i]);
}
}
}
int HeartBeat::is_online(int p) {
return online[p];
}
int HeartBeat::next_online(int p) {
do {
p = (p + 1) % (num_procs - 1);
} while(!is_online(p));
return p + 1;
}
int HeartBeat::last_online(int p) {
do {
p = (p - 1 + num_procs - 1) % (num_procs - 1);
} while(!is_online(p));
return p + 1;
}