25#if BITPIT_ENABLE_MPI==1
27#include "bitpit_common.hpp"
29#include "communications.hpp"
30#include "communications_tags.hpp"
46 : m_communicator(communicator), m_rank(-1),
47 m_recvsContinuous(false)
50 MPI_Comm_rank(m_communicator, &m_rank);
53 setTags(TAG_AUTO, TAG_AUTO, TAG_AUTO);
61 if (!m_customExchangeTag) {
65 if (!m_customDiscoverTag) {
69 if (!m_customNotificationTag) {
73 if (!m_customExchangeTag || !m_customDiscoverTag || !m_customNotificationTag) {
74 MPI_Barrier(m_communicator);
86 MPI_Comm_rank(m_communicator, &rank);
99 MPI_Comm_size(m_communicator, &nProcs);
111 return m_communicator;
173 m_customExchangeTag = (tag != TAG_AUTO);
174 if (m_customExchangeTag) {
191 m_customDiscoverTag = (tag != TAG_AUTO);
192 if (m_customDiscoverTag) {
209 m_customNotificationTag = (tag != TAG_AUTO);
210 if (m_customNotificationTag) {
211 m_notificationTag = tag;
234 return m_exchangeTag;
244 return m_discoverTag;
254 return m_notificationTag;
270 if (m_recvsContinuous == enabled) {
276 int nRecvBuffers = m_recvBuffers.size();
277 for (
int k = 0; k < nRecvBuffers; ++k) {
278 size_t size = m_recvBuffers[k].getSize();
282 m_recvsContinuous = enabled;
295 return m_recvsContinuous;
316 std::vector<long> discoverSizes(nRecvs, 0);
317 std::vector<MPI_Request> discoverRequests(nRecvs, MPI_REQUEST_NULL);
318 for (
int i = 0; i < nRecvs; ++i) {
319 int rank = m_recvRanks[i];
321 discoverSizes[i] = buffer.
getSize();
322 MPI_Issend(discoverSizes.data() + i, 1, MPI_LONG, rank, m_discoverTag, m_communicator, discoverRequests.data() + i);
326 int localDiscoverSendsCompleted = 0;
327 MPI_Request exchangeCompletedRequest;
330 int messageAvailable = 1;
331 while (messageAvailable) {
333 MPI_Iprobe(MPI_ANY_SOURCE, m_discoverTag, m_communicator, &messageAvailable, &status);
334 if (messageAvailable) {
336 MPI_Recv(&dataSize, 1, MPI_LONG, status.MPI_SOURCE, m_discoverTag, m_communicator, MPI_STATUS_IGNORE);
337 setSend(status.MPI_SOURCE, dataSize);
342 if (!localDiscoverSendsCompleted) {
343 MPI_Testall(discoverRequests.size(), discoverRequests.data(), &localDiscoverSendsCompleted, MPI_STATUSES_IGNORE);
344 if (localDiscoverSendsCompleted) {
345 MPI_Ibarrier(m_communicator, &exchangeCompletedRequest);
354 if (localDiscoverSendsCompleted) {
355 int exchangeCompleted = 0;
356 MPI_Test(&exchangeCompletedRequest, &exchangeCompleted, MPI_STATUS_IGNORE);
357 if (exchangeCompleted) {
395 std::vector<long> discoverSizes(nSends, 0);
396 std::vector<MPI_Request> discoverRequests(nSends, MPI_REQUEST_NULL);
397 for (
int i = 0; i < nSends; ++i) {
398 int rank = m_sendRanks[i];
400 discoverSizes[i] = buffer.
getSize();
401 MPI_Issend(discoverSizes.data() + i, 1, MPI_LONG, rank, m_discoverTag, m_communicator, discoverRequests.data() + i);
405 int localDiscoverSendsCompleted = 0;
406 MPI_Request exchangeCompletedRequest;
409 int messageAvailable = 1;
410 while (messageAvailable) {
412 MPI_Iprobe(MPI_ANY_SOURCE, m_discoverTag, m_communicator, &messageAvailable, &status);
413 if (messageAvailable) {
415 MPI_Recv(&dataSize, 1, MPI_LONG, status.MPI_SOURCE, m_discoverTag, m_communicator, MPI_STATUS_IGNORE);
416 setRecv(status.MPI_SOURCE, dataSize);
421 if (!localDiscoverSendsCompleted) {
422 MPI_Testall(discoverRequests.size(), discoverRequests.data(), &localDiscoverSendsCompleted, MPI_STATUSES_IGNORE);
423 if (localDiscoverSendsCompleted) {
424 MPI_Ibarrier(m_communicator, &exchangeCompletedRequest);
433 if (localDiscoverSendsCompleted) {
434 int exchangeCompleted = 0;
435 MPI_Test(&exchangeCompletedRequest, &exchangeCompleted, MPI_STATUS_IGNORE);
436 if (exchangeCompleted) {
463 if (m_sendIds.count(rank) == 0) {
471 int id = m_sendIds[rank];
472 m_sendIds.erase(rank);
473 for (
auto &entry : m_sendIds) {
474 if (entry.second >
id) {
479 m_sendRanks.erase(m_sendRanks.begin() + id);
480 m_sendRequests.erase(m_sendRequests.begin() + id);
481 m_sendBuffers.erase(m_sendBuffers.begin() + id);
491 if (m_recvIds.count(rank) == 0) {
499 int id = m_recvIds[rank];
500 m_recvIds.erase(rank);
501 for (
auto &entry : m_recvIds) {
502 if (entry.second >
id) {
507 m_recvRanks.erase(m_recvRanks.begin() + id);
508 m_recvRequests.erase(m_recvRequests.begin() + id);
509 m_recvBuffers.erase(m_recvBuffers.begin() + id);
530 m_sendRequests.clear();
531 m_sendBuffers.clear();
552 m_recvRequests.clear();
553 m_recvBuffers.clear();
568 int id = m_sendIds.size();
569 m_sendIds[rank] = id;
571 m_sendRanks.push_back(rank);
572 m_sendRequests.push_back(MPI_REQUEST_NULL);
573 m_sendBuffers.emplace_back(length);
588 int id = m_recvIds.size();
589 m_recvIds[rank] = id;
591 m_recvRanks.push_back(rank);
592 m_recvRequests.push_back(MPI_REQUEST_NULL);
593 m_recvBuffers.emplace_back(length, m_recvsContinuous);
611 if (m_sendIds.count(rank) == 0) {
620 int id = m_sendIds[rank];
621 m_sendBuffers[id].setSize(size);
634 if (m_recvIds.count(rank) == 0) {
643 int id = m_recvIds[rank];
644 m_recvBuffers[id].setSize(size);
654 return m_sendBuffers.size();
664 return m_recvBuffers.size();
698 int id = m_sendIds.at(rank);
700 return m_sendBuffers[id];
711 int id = m_recvIds.at(rank);
713 return m_recvBuffers[id];
727 int id = m_sendIds.at(dstRank);
742 for (
int rank : m_sendRanks) {
752void DataCommunicator::_startSend(
int dstRank)
755 int id = m_sendIds.at(dstRank);
761 MPI_Datatype chunkDataType = getChunkDataType(chunkSize);
763 MPI_Isend(buffer.
data(), buffer.
getChunkCount(), chunkDataType, dstRank, m_exchangeTag,
764 m_communicator, &m_sendRequests[
id]);
786 for (
int rank : m_recvRanks) {
796void DataCommunicator::_startRecv(
int srcRank)
799 int id = m_recvIds.at(srcRank);
805 MPI_Datatype chunkDataType = getChunkDataType(chunkSize);
807 MPI_Irecv(buffer.
data(), buffer.
getChunkCount(), chunkDataType, srcRank, m_exchangeTag,
808 m_communicator, &m_recvRequests[
id]);
824 std::vector<MPI_Request> requestList(m_sendRequests);
825 for (
const int rank : blackList) {
826 int id = m_sendIds.at(rank);
827 requestList[id] = MPI_REQUEST_NULL;
832 MPI_Waitany(requestList.size(), requestList.data(), &
id, MPI_STATUS_IGNORE);
833 if (
id == MPI_UNDEFINED) {
834 return MPI_UNDEFINED;
837 m_sendRequests[id] = requestList[id];
840 m_sendBuffers[id].seekg(0);
843 return m_sendRanks[id];
854 int id = m_sendIds.at(rank);
855 auto request = m_sendRequests[id];
856 if (request == MPI_REQUEST_NULL) {
860 MPI_Wait(&m_sendRequests[
id], MPI_STATUS_IGNORE);
863 m_sendBuffers[id].seekg(0);
871 if (m_sendRequests.size() == 0) {
876 MPI_Waitall(m_sendRequests.size(), m_sendRequests.data(), MPI_STATUS_IGNORE);
879 for (
auto &buffer : m_sendBuffers) {
897 std::vector<MPI_Request> requestList(m_recvRequests);
898 for (
const int rank : blackList) {
899 int id = m_recvIds.at(rank);
900 requestList[id] = MPI_REQUEST_NULL;
905 MPI_Waitany(requestList.size(), requestList.data(), &
id, MPI_STATUS_IGNORE);
906 if (
id == MPI_UNDEFINED) {
907 return MPI_UNDEFINED;
910 m_recvRequests[id] = requestList[id];
919 int rank = m_recvRanks[id];
938 int id = m_recvIds.at(rank);
939 auto request = m_recvRequests[id];
940 if (request == MPI_REQUEST_NULL) {
944 MPI_Wait(&m_recvRequests[
id], MPI_STATUS_IGNORE);
963 if (m_recvRequests.size() == 0) {
968 MPI_Waitall(m_recvRequests.size(), m_recvRequests.data(), MPI_STATUS_IGNORE);
972 if (buffer.isDouble()) {
979 for (
int rank : m_recvRanks) {
995 int id = m_sendIds[rank];
997 return (m_sendRequests[
id] != MPI_REQUEST_NULL);
1010 int id = m_recvIds[rank];
1012 return (m_recvRequests[
id] != MPI_REQUEST_NULL);
1022 if (m_sendIds.count(rank) == 0) {
1026 int id = m_sendIds[rank];
1027 if (m_sendRequests[
id] == MPI_REQUEST_NULL) {
1031 MPI_Cancel(&m_sendRequests[
id]);
1032 MPI_Request_free(&m_sendRequests[
id]);
1042 if (m_recvIds.count(rank) == 0) {
1046 int id = m_recvIds[rank];
1047 if (m_recvRequests[
id] == MPI_REQUEST_NULL) {
1051 MPI_Cancel(&m_recvRequests[
id]);
1052 MPI_Request_free(&m_recvRequests[
id]);
1069 for (
int rank : m_sendRanks) {
1119 std::vector<MPI_Request> recvNotificationRequests(nRecvNotifications);
1120 std::vector<int> remoteCancelCompleted(nRecvNotifications, 0);
1121 for (
int i = 0; i < nRecvNotifications; ++i) {
1122 int rank = m_recvRanks[i];
1123 MPI_Irecv(remoteCancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &recvNotificationRequests[i]);
1132 std::vector<MPI_Request> sendNotificationRequests(nSendNotifications);
1133 std::vector<int> cancelCompleted(nSendNotifications, 1);
1134 for (
int i = 0; i < nSendNotifications; ++i) {
1135 int rank = m_sendRanks[i];
1136 MPI_Isend(cancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &sendNotificationRequests[i]);
1143 MPI_Waitall(nRecvNotifications, recvNotificationRequests.data(), MPI_STATUS_IGNORE);
1144 for (
int i = 0; i < nRecvNotifications; ++i) {
1145 if (!remoteCancelCompleted[i]) {
1146 log::cout() <<
"Unable to properly cancel the sends.";
1147 MPI_Abort(m_communicator, 1);
1152 MPI_Waitall(nSendNotifications, sendNotificationRequests.data(), MPI_STATUS_IGNORE);
1154#if BITPIT_ENABLE_DEBUG
1156 MPI_Barrier(m_communicator);
1174 for (
int rank : m_recvRanks) {
1196 std::vector<MPI_Request> recvNotificationRequests(nRecvNotifications);
1197 std::vector<int> remoteCancelCompleted(nRecvNotifications, 0);
1198 for (
int i = 0; i < nRecvNotifications; ++i) {
1199 int rank = m_sendRanks[i];
1200 MPI_Irecv(remoteCancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &recvNotificationRequests[i]);
1209 std::vector<MPI_Request> sendNotificationRequests(nSendNotifications);
1210 std::vector<int> cancelCompleted(nSendNotifications, 1);
1211 for (
int i = 0; i < nSendNotifications; ++i) {
1212 int rank = m_recvRanks[i];
1213 MPI_Isend(cancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &sendNotificationRequests[i]);
1220 MPI_Waitall(nRecvNotifications, recvNotificationRequests.data(), MPI_STATUS_IGNORE);
1221 for (
int i = 0; i < nRecvNotifications; ++i) {
1222 if (!remoteCancelCompleted[i]) {
1223 log::cout() <<
"Unable to properly cancel the receives.";
1224 MPI_Abort(m_communicator, 1);
1229 MPI_Waitall(nSendNotifications, sendNotificationRequests.data(), MPI_STATUS_IGNORE);
1231#if BITPIT_ENABLE_DEBUG
1233 MPI_Barrier(m_communicator);
1243MPI_Datatype DataCommunicator::getChunkDataType(
int chunkSize)
const
1245 MPI_Datatype chunkDataType;
1246 if (chunkSize == 1) {
1247 chunkDataType = MPI_CHAR;
1249 MPI_Type_contiguous(chunkSize, MPI_CHAR, &chunkDataType);
1250 MPI_Type_commit(&chunkDataType);
1253 return chunkDataType;
bool seekg(std::size_t pos)
RawBufferType & getBack()
int waitAnySend(const std::vector< int > &blackList=std::vector< int >())
void clearAllSends(bool synchronous=false)
void finalize(bool synchronous=false)
void setRecv(int rank, long length=0)
const MPI_Comm & getCommunicator() const
void startRecv(int srcRank)
SendBuffer & getSendBuffer(int rank)
bool isRecvActive(int rank)
int getExchangeTag() const
const std::vector< int > & getRecvRanks() const
int getProcessorCount() const
void resizeSend(int rank, long resize)
void setTags(int exchangeTag, int discoverTag, int notificationTag)
const std::vector< int > & getSendRanks() const
bool isSendActive(int rank)
void cancelRecv(int rank)
int waitAnyRecv(const std::vector< int > &blackList=std::vector< int >())
DataCommunicator(MPI_Comm communicator)
int getNotificationTag() const
void resizeRecv(int rank, long resize)
void setSend(int rank, long length=0)
void setExchangeTag(int tag)
int getDiscoverTag() const
void cancelAllRecvs(bool synchronous=false)
void cancelSend(int rank)
void cancelAllSends(bool synchronous=false)
void setTag(int exchangeTag)
bool areRecvsContinuous()
void setNotificationTag(int tag)
void setDiscoverTag(int tag)
void clearAllRecvs(bool synchronous=false)
void setRecvsContinuous(bool enabled)
RecvBuffer & getRecvBuffer(int rank)
void startSend(int dstRank)
Buffer to be used for receive communications.
Buffer to be used for send communications.
CommunicationTags & tags()
Logger & cout(log::Level defaultSeverity, log::Visibility defaultVisibility)