communications.cpp
/*---------------------------------------------------------------------------*\
 *
 *  bitpit
 *
 *  Copyright (C) 2015-2021 OPTIMAD engineering Srl
 *
 *  -------------------------------------------------------------------------
 *  License
 *  This file is part of bitpit.
 *
 *  bitpit is free software: you can redistribute it and/or modify it
 *  under the terms of the GNU Lesser General Public License v3 (LGPL)
 *  as published by the Free Software Foundation.
 *
 *  bitpit is distributed in the hope that it will be useful, but WITHOUT
 *  ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 *  FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
 *  License for more details.
 *
 *  You should have received a copy of the GNU Lesser General Public License
 *  along with bitpit. If not, see <http://www.gnu.org/licenses/>.
 *
\*---------------------------------------------------------------------------*/

#if BITPIT_ENABLE_MPI==1

#include "bitpit_common.hpp"

#include "communications.hpp"
#include "communications_tags.hpp"

namespace bitpit {

DataCommunicator::DataCommunicator(MPI_Comm communicator)
    : m_communicator(communicator), m_rank(-1),
      m_recvsContinuous(false)
{
    // Get MPI information
    MPI_Comm_rank(m_communicator, &m_rank);

    // Set a tag
    setTags(TAG_AUTO, TAG_AUTO, TAG_AUTO);
}

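/*
    A minimal construction sketch, assuming MPI has already been
    initialized and that MPI_COMM_WORLD is the communicator to be used
    (the explicit tag values below are illustrative):

        DataCommunicator dataCommunicator(MPI_COMM_WORLD);

        // Tags can also be set explicitly instead of relying on TAG_AUTO
        dataCommunicator.setTags(10, 11, 12);
*/
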
DataCommunicator::~DataCommunicator()
{
    if (!m_customExchangeTag) {
        communications::tags().trash(m_exchangeTag, m_communicator);
    }

    if (!m_customDiscoverTag) {
        communications::tags().trash(m_discoverTag, m_communicator);
    }

    if (!m_customNotificationTag) {
        communications::tags().trash(m_notificationTag, m_communicator);
    }

    if (!m_customExchangeTag || !m_customDiscoverTag || !m_customNotificationTag) {
        MPI_Barrier(m_communicator);
    }
}

int DataCommunicator::getRank() const
{
    int rank;
    MPI_Comm_rank(m_communicator, &rank);

    return rank;
}

int DataCommunicator::getProcessorCount() const
{
    int nProcs;
    MPI_Comm_size(m_communicator, &nProcs);

    return nProcs;
}

const MPI_Comm & DataCommunicator::getCommunicator() const
{
    return m_communicator;
}

void DataCommunicator::finalize(bool synchronous)
{
    // Cancel all sends
    cancelAllSends(synchronous);

    // Cancel all receives
    cancelAllRecvs(synchronous);
}

void DataCommunicator::setTag(int exchangeTag)
{
    setExchangeTag(exchangeTag);
}

void DataCommunicator::setTags(int exchangeTag, int discoverTag, int notificationTag)
{
    setExchangeTag(exchangeTag);
    setDiscoverTag(discoverTag);
    setNotificationTag(notificationTag);
}

void DataCommunicator::setExchangeTag(int tag)
{
    m_customExchangeTag = (tag != TAG_AUTO);
    if (m_customExchangeTag) {
        m_exchangeTag = tag;
    } else {
        m_exchangeTag = communications::tags().generate(m_communicator);
    }
}

void DataCommunicator::setDiscoverTag(int tag)
{
    m_customDiscoverTag = (tag != TAG_AUTO);
    if (m_customDiscoverTag) {
        m_discoverTag = tag;
    } else {
        m_discoverTag = communications::tags().generate(m_communicator);
    }
}

void DataCommunicator::setNotificationTag(int tag)
{
    m_customNotificationTag = (tag != TAG_AUTO);
    if (m_customNotificationTag) {
        m_notificationTag = tag;
    } else {
        m_notificationTag = communications::tags().generate(m_communicator);
    }
}

int DataCommunicator::getTag() const
{
    return getExchangeTag();
}

int DataCommunicator::getExchangeTag() const
{
    return m_exchangeTag;
}

int DataCommunicator::getDiscoverTag() const
{
    return m_discoverTag;
}

int DataCommunicator::getNotificationTag() const
{
    return m_notificationTag;
}

void DataCommunicator::setRecvsContinuous(bool enabled)
{
    if (m_recvsContinuous == enabled) {
        return;
    }

    cancelAllRecvs(true);

    int nRecvBuffers = m_recvBuffers.size();
    for (int k = 0; k < nRecvBuffers; ++k) {
        size_t size = m_recvBuffers[k].getSize();
        m_recvBuffers[k] = RecvBuffer(size, enabled);
    }

    m_recvsContinuous = enabled;
}

bool DataCommunicator::areRecvsContinuous() const
{
    return m_recvsContinuous;
}

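/*
    A sketch of how continuous receives are meant to be used, with an
    illustrative neighbour rank and buffer size: in continuous mode every
    receive is double-buffered and is restarted automatically after it
    completes, so the user only waits and reads:

        dataCommunicator.setRecvsContinuous(true);
        dataCommunicator.setRecv(neighbourRank, bufferSize);

        // The receive is already posted; wait for it and read the data
        dataCommunicator.waitRecv(neighbourRank);
        RecvBuffer &recvBuffer = dataCommunicator.getRecvBuffer(neighbourRank);
*/
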
void DataCommunicator::discoverSends()
{
    // Cancel current sends
    cancelAllSends(true);

    // Send the data sizes with a synchronous send
    int nRecvs = getRecvCount();

    std::vector<long> discoverSizes(nRecvs, 0);
    std::vector<MPI_Request> discoverRequests(nRecvs, MPI_REQUEST_NULL);
    for (int i = 0; i < nRecvs; ++i) {
        int rank = m_recvRanks[i];
        RecvBuffer &buffer = m_recvBuffers[i];
        discoverSizes[i] = buffer.getSize();
        MPI_Issend(discoverSizes.data() + i, 1, MPI_LONG, rank, m_discoverTag, m_communicator, discoverRequests.data() + i);
    }

    // Receive the data sizes and set the sends
    int localDiscoverSendsCompleted = 0;
    MPI_Request exchangeCompletedRequest;
    while (true) {
        // If there are messages available, receive them and set the sends
        int messageAvailable = 1;
        while (messageAvailable) {
            MPI_Status status;
            MPI_Iprobe(MPI_ANY_SOURCE, m_discoverTag, m_communicator, &messageAvailable, &status);
            if (messageAvailable) {
                long dataSize;
                MPI_Recv(&dataSize, 1, MPI_LONG, status.MPI_SOURCE, m_discoverTag, m_communicator, MPI_STATUS_IGNORE);
                setSend(status.MPI_SOURCE, dataSize);
            }
        }

        // If all the local sends are complete, notify it
        if (!localDiscoverSendsCompleted) {
            MPI_Testall(discoverRequests.size(), discoverRequests.data(), &localDiscoverSendsCompleted, MPI_STATUSES_IGNORE);
            if (localDiscoverSendsCompleted) {
                MPI_Ibarrier(m_communicator, &exchangeCompletedRequest);
            }
        }

        // If all local sends are completed, check if the other processes
        // have also completed their sends. Since these are synchronous
        // sends, they will be marked as completed only when the
        // corresponding receive has completed. When all processes have
        // completed the sends/receives, all sizes have been exchanged.
        if (localDiscoverSendsCompleted) {
            int exchangeCompleted = 0;
            MPI_Test(&exchangeCompletedRequest, &exchangeCompleted, MPI_STATUS_IGNORE);
            if (exchangeCompleted) {
                break;
            }
        }
    }

    // At this point we are sure that all the processes have sent the data
    // sizes and the data sizes have reached the destination, however it is
    // not guaranteed that the data sizes have already been received (the
    // message is on the destination process, but we don't know if it has
    // been processed and received). Consequently, we can't be sure that all
    // sends have been set.
    //
    // For the above reason it is not possible to call this function multiple
    // times without receiving the messages, or to change the exchanges
    // between one call and another. Nor is it possible to call a discover
    // receives followed by a discover sends (or vice versa) without
    // receiving the messages or canceling them in between the two calls.
}

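/*
    A sketch of the receive-driven pattern served by discoverSends(),
    assuming each process knows only what it has to receive (the ranks and
    sizes are illustrative); the complementary send-driven pattern is
    sketched after discoverRecvs() below:

        for (int neighbourRank : neighbourRanks) {
            dataCommunicator.setRecv(neighbourRank, recvSizes[neighbourRank]);
        }

        // Every process declared only its receives; the matching sends
        // are discovered collectively
        dataCommunicator.discoverSends();
*/
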
void DataCommunicator::discoverRecvs()
{
    // Cancel current receives
    cancelAllRecvs(true);

    // Send the data sizes with a synchronous send
    int nSends = getSendCount();

    std::vector<long> discoverSizes(nSends, 0);
    std::vector<MPI_Request> discoverRequests(nSends, MPI_REQUEST_NULL);
    for (int i = 0; i < nSends; ++i) {
        int rank = m_sendRanks[i];
        SendBuffer &buffer = m_sendBuffers[i];
        discoverSizes[i] = buffer.getSize();
        MPI_Issend(discoverSizes.data() + i, 1, MPI_LONG, rank, m_discoverTag, m_communicator, discoverRequests.data() + i);
    }

    // Receive the data sizes and set the receives
    int localDiscoverSendsCompleted = 0;
    MPI_Request exchangeCompletedRequest;
    while (true) {
        // If there are messages available, receive them and set the receives
        int messageAvailable = 1;
        while (messageAvailable) {
            MPI_Status status;
            MPI_Iprobe(MPI_ANY_SOURCE, m_discoverTag, m_communicator, &messageAvailable, &status);
            if (messageAvailable) {
                long dataSize;
                MPI_Recv(&dataSize, 1, MPI_LONG, status.MPI_SOURCE, m_discoverTag, m_communicator, MPI_STATUS_IGNORE);
                setRecv(status.MPI_SOURCE, dataSize);
            }
        }

        // If all the local sends are complete, notify it
        if (!localDiscoverSendsCompleted) {
            MPI_Testall(discoverRequests.size(), discoverRequests.data(), &localDiscoverSendsCompleted, MPI_STATUSES_IGNORE);
            if (localDiscoverSendsCompleted) {
                MPI_Ibarrier(m_communicator, &exchangeCompletedRequest);
            }
        }

        // If all local sends are completed, check if the other processes
        // have also completed their sends. Since these are synchronous
        // sends, they will be marked as completed only when the
        // corresponding receive has completed. When all processes have
        // completed the sends/receives, all sizes have been exchanged.
        if (localDiscoverSendsCompleted) {
            int exchangeCompleted = 0;
            MPI_Test(&exchangeCompletedRequest, &exchangeCompleted, MPI_STATUS_IGNORE);
            if (exchangeCompleted) {
                break;
            }
        }
    }

    // At this point we are sure that all the processes have sent the data
    // sizes and the data sizes have reached the destination, however it is
    // not guaranteed that the data sizes have already been received (the
    // message is on the destination process, but we don't know if it has
    // been processed and received). Consequently, we can't be sure that all
    // receives have been set.
    //
    // For the above reason it is not possible to call this function multiple
    // times without receiving the messages, or to change the exchanges
    // between one call and another. Nor is it possible to call a discover
    // receives followed by a discover sends (or vice versa) without
    // receiving the messages or canceling them in between the two calls.
}

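/*
    A sketch of the complementary send-driven pattern, assuming each
    process knows only what it has to send (the ranks, sizes and the data
    written into the buffers are illustrative):

        for (int neighbourRank : neighbourRanks) {
            dataCommunicator.setSend(neighbourRank, sendSizes[neighbourRank]);
        }

        // Discover the matching receives collectively
        dataCommunicator.discoverRecvs();

        // Run the exchange
        dataCommunicator.startAllRecvs();

        for (int neighbourRank : neighbourRanks) {
            SendBuffer &sendBuffer = dataCommunicator.getSendBuffer(neighbourRank);
            sendBuffer << someValue;
        }
        dataCommunicator.startAllSends();

        dataCommunicator.waitAllRecvs();
        dataCommunicator.waitAllSends();
*/
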
void DataCommunicator::clearSend(int rank)
{
    if (m_sendIds.count(rank) == 0) {
        return;
    }

    // Cancel waiting send
    cancelSend(rank);

    // Remove the send associated with the specified rank
    int id = m_sendIds[rank];
    m_sendIds.erase(rank);
    for (auto &entry : m_sendIds) {
        if (entry.second > id) {
            entry.second--;
        }
    }

    m_sendRanks.erase(m_sendRanks.begin() + id);
    m_sendRequests.erase(m_sendRequests.begin() + id);
    m_sendBuffers.erase(m_sendBuffers.begin() + id);
}

void DataCommunicator::clearRecv(int rank)
{
    if (m_recvIds.count(rank) == 0) {
        return;
    }

    // Cancel waiting recv
    cancelRecv(rank);

    // Remove the recv associated with the specified rank
    int id = m_recvIds[rank];
    m_recvIds.erase(rank);
    for (auto &entry : m_recvIds) {
        if (entry.second > id) {
            entry.second--;
        }
    }

    m_recvRanks.erase(m_recvRanks.begin() + id);
    m_recvRequests.erase(m_recvRequests.begin() + id);
    m_recvBuffers.erase(m_recvBuffers.begin() + id);
}

void DataCommunicator::clearAllSends(bool synchronous)
{
    // Cancel waiting sends
    cancelAllSends(synchronous);

    // Clear the sends
    m_sendRanks.clear();
    m_sendIds.clear();
    m_sendRequests.clear();
    m_sendBuffers.clear();
}

void DataCommunicator::clearAllRecvs(bool synchronous)
{
    // Cancel waiting receives
    cancelAllRecvs(synchronous);

    // Clear the data associated with the receives
    m_recvRanks.clear();
    m_recvIds.clear();
    m_recvRequests.clear();
    m_recvBuffers.clear();
}

void DataCommunicator::setSend(int rank, long length)
{
    // Clear the send associated with the rank
    clearSend(rank);

    // Set send info
    int id = m_sendIds.size();
    m_sendIds[rank] = id;

    m_sendRanks.push_back(rank);
    m_sendRequests.push_back(MPI_REQUEST_NULL);
    m_sendBuffers.emplace_back(length);
}

void DataCommunicator::setRecv(int rank, long length)
{
    // Clear the recv associated with the rank
    clearRecv(rank);

    // Set recv info
    int id = m_recvIds.size();
    m_recvIds[rank] = id;

    m_recvRanks.push_back(rank);
    m_recvRequests.push_back(MPI_REQUEST_NULL);
    m_recvBuffers.emplace_back(length, m_recvsContinuous);

    // If the receives are continuous, start the receive
    if (areRecvsContinuous()) {
        _startRecv(rank);
    }
}

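/*
    When the exchange pattern is known on both sides no discovery is
    needed; matching sends and receives can be set directly. A sketch,
    assuming process A sends ten doubles to process B (rankA and rankB
    are illustrative):

        // On process A
        dataCommunicator.setSend(rankB, 10 * sizeof(double));

        // On process B
        dataCommunicator.setRecv(rankA, 10 * sizeof(double));
*/
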
void DataCommunicator::resizeSend(int rank, long size)
{
    // If there is no send associated with the specified rank we have to
    // set a new send from scratch
    if (m_sendIds.count(rank) == 0) {
        setSend(rank, size);
        return;
    }

    // Cancel the send associated with the process
    cancelSend(rank);

    // Resize the buffer
    int id = m_sendIds[rank];
    m_sendBuffers[id].setSize(size);
}

void DataCommunicator::resizeRecv(int rank, long size)
{
    // If there is no receive associated with the specified rank we have to
    // set a new receive from scratch
    if (m_recvIds.count(rank) == 0) {
        setRecv(rank, size);
        return;
    }

    // Cancel the receive associated with the process
    cancelRecv(rank);

    // Resize the buffer
    int id = m_recvIds[rank];
    m_recvBuffers[id].setSize(size);
}

int DataCommunicator::getSendCount()
{
    return m_sendBuffers.size();
}

int DataCommunicator::getRecvCount()
{
    return m_recvBuffers.size();
}

const std::vector<int> & DataCommunicator::getSendRanks() const
{
    return m_sendRanks;
}

const std::vector<int> & DataCommunicator::getRecvRanks() const
{
    return m_recvRanks;
}

SendBuffer & DataCommunicator::getSendBuffer(int rank)
{
    int id = m_sendIds.at(rank);

    return m_sendBuffers[id];
}

RecvBuffer & DataCommunicator::getRecvBuffer(int rank)
{
    int id = m_recvIds.at(rank);

    return m_recvBuffers[id];
}

void DataCommunicator::startSend(int dstRank)
{
    // Wait for the previous send to finish
    waitSend(dstRank);

    // If the buffer is a double buffer, swap it
    int id = m_sendIds.at(dstRank);
    SendBuffer &sendBuffer = m_sendBuffers[id];
    if (sendBuffer.isDouble()) {
        sendBuffer.swap();
    }

    // Start the send
    _startSend(dstRank);
}

void DataCommunicator::startAllSends()
{
    for (int rank : m_sendRanks) {
        startSend(rank);
    }
}

void DataCommunicator::_startSend(int dstRank)
{
    // Get the buffer
    int id = m_sendIds.at(dstRank);
    SendBuffer &sendBuffer = m_sendBuffers[id];
    OBinaryStream &buffer = sendBuffer.getBack();

    // Start the send
    int chunkSize = buffer.getChunkSize();
    MPI_Datatype chunkDataType = getChunkDataType(chunkSize);

    MPI_Isend(buffer.data(), buffer.getChunkCount(), chunkDataType, dstRank, m_exchangeTag,
              m_communicator, &m_sendRequests[id]);
}

void DataCommunicator::startRecv(int srcRank)
{
    // Wait for the previous receive to finish
    waitRecv(srcRank);

    // Start the receive
    _startRecv(srcRank);
}

void DataCommunicator::startAllRecvs()
{
    for (int rank : m_recvRanks) {
        startRecv(rank);
    }
}

void DataCommunicator::_startRecv(int srcRank)
{
    // Reset the position of the buffer
    int id = m_recvIds.at(srcRank);
    IBinaryStream &buffer = m_recvBuffers[id].getBack();
    buffer.seekg(0);

    // Start the receive
    int chunkSize = buffer.getChunkSize();
    MPI_Datatype chunkDataType = getChunkDataType(chunkSize);

    MPI_Irecv(buffer.data(), buffer.getChunkCount(), chunkDataType, srcRank, m_exchangeTag,
              m_communicator, &m_recvRequests[id]);
}

int DataCommunicator::waitAnySend(const std::vector<int> &blackList)
{
    // Exclude blacklisted ranks
    std::vector<MPI_Request> requestList(m_sendRequests);
    for (const int rank : blackList) {
        int id = m_sendIds.at(rank);
        requestList[id] = MPI_REQUEST_NULL;
    }

    // Wait for a send to complete
    int id;
    MPI_Waitany(requestList.size(), requestList.data(), &id, MPI_STATUS_IGNORE);
    if (id == MPI_UNDEFINED) {
        return MPI_UNDEFINED;
    }

    m_sendRequests[id] = requestList[id];

    // Reset the position of the buffer
    m_sendBuffers[id].seekg(0);

    // Return the rank associated with the completed send
    return m_sendRanks[id];
}

void DataCommunicator::waitSend(int rank)
{
    // Wait for the send to complete
    int id = m_sendIds.at(rank);
    auto request = m_sendRequests[id];
    if (request == MPI_REQUEST_NULL) {
        return;
    }

    MPI_Wait(&m_sendRequests[id], MPI_STATUS_IGNORE);

    // Reset the position of the buffer
    m_sendBuffers[id].seekg(0);
}

void DataCommunicator::waitAllSends()
{
    if (m_sendRequests.size() == 0) {
        return;
    }

    // Wait for all sends to complete
    MPI_Waitall(m_sendRequests.size(), m_sendRequests.data(), MPI_STATUSES_IGNORE);

    // Reset the position of the buffers
    for (auto &buffer : m_sendBuffers) {
        buffer.seekg(0);
    }
}

int DataCommunicator::waitAnyRecv(const std::vector<int> &blackList)
{
    // Exclude blacklisted ranks
    std::vector<MPI_Request> requestList(m_recvRequests);
    for (const int rank : blackList) {
        int id = m_recvIds.at(rank);
        requestList[id] = MPI_REQUEST_NULL;
    }

    // Wait for a receive to complete
    int id;
    MPI_Waitany(requestList.size(), requestList.data(), &id, MPI_STATUS_IGNORE);
    if (id == MPI_UNDEFINED) {
        return MPI_UNDEFINED;
    }

    m_recvRequests[id] = requestList[id];

    // If the buffer is a double buffer, swap it
    RecvBuffer &recvBuffer = m_recvBuffers[id];
    if (recvBuffer.isDouble()) {
        recvBuffer.swap();
    }

    // Rank of the request
    int rank = m_recvRanks[id];

    // Restart the receive
    if (areRecvsContinuous()) {
        _startRecv(rank);
    }

    // Return the rank associated with the completed receive
    return rank;
}

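/*
    A sketch of the typical processing loop built on waitAnyRecv(), which
    handles messages in completion order and thus overlaps computation
    with communication (the unpacked value is illustrative):

        int nCompletedRecvs = 0;
        while (nCompletedRecvs < dataCommunicator.getRecvCount()) {
            int rank = dataCommunicator.waitAnyRecv();
            RecvBuffer &recvBuffer = dataCommunicator.getRecvBuffer(rank);

            double value;
            recvBuffer >> value;

            ++nCompletedRecvs;
        }
*/
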
void DataCommunicator::waitRecv(int rank)
{
    // Wait for the receive to complete
    int id = m_recvIds.at(rank);
    auto request = m_recvRequests[id];
    if (request == MPI_REQUEST_NULL) {
        return;
    }

    MPI_Wait(&m_recvRequests[id], MPI_STATUS_IGNORE);

    // If the buffer is a double buffer, swap it
    RecvBuffer &recvBuffer = m_recvBuffers[id];
    if (recvBuffer.isDouble()) {
        recvBuffer.swap();
    }

    // Restart the receive
    if (areRecvsContinuous()) {
        _startRecv(rank);
    }
}

void DataCommunicator::waitAllRecvs()
{
    if (m_recvRequests.size() == 0) {
        return;
    }

    // Wait for all the receives to complete
    MPI_Waitall(m_recvRequests.size(), m_recvRequests.data(), MPI_STATUSES_IGNORE);

    // Swap double buffers
    for (RecvBuffer &buffer : m_recvBuffers) {
        if (buffer.isDouble()) {
            buffer.swap();
        }
    }

    // Restart all the receives
    if (areRecvsContinuous()) {
        for (int rank : m_recvRanks) {
            _startRecv(rank);
        }
    }
}

bool DataCommunicator::isSendActive(int rank)
{
    int id = m_sendIds[rank];

    return (m_sendRequests[id] != MPI_REQUEST_NULL);
}

bool DataCommunicator::isRecvActive(int rank)
{
    int id = m_recvIds[rank];

    return (m_recvRequests[id] != MPI_REQUEST_NULL);
}

void DataCommunicator::cancelSend(int rank)
{
    if (m_sendIds.count(rank) == 0) {
        return;
    }

    int id = m_sendIds[rank];
    if (m_sendRequests[id] == MPI_REQUEST_NULL) {
        return;
    }

    MPI_Cancel(&m_sendRequests[id]);
    MPI_Request_free(&m_sendRequests[id]);
}

void DataCommunicator::cancelRecv(int rank)
{
    if (m_recvIds.count(rank) == 0) {
        return;
    }

    int id = m_recvIds[rank];
    if (m_recvRequests[id] == MPI_REQUEST_NULL) {
        return;
    }

    MPI_Cancel(&m_recvRequests[id]);
    MPI_Request_free(&m_recvRequests[id]);
}

void DataCommunicator::cancelAllSends(bool synchronous)
{
    // Cancel all the sends
    for (int rank : m_sendRanks) {
        cancelSend(rank);
    }

    // Early return if no synchronization is needed
    if (!synchronous) {
        return;
    }

    // Notify that the sends have been canceled
    //
    // The tag of the notifications has to be different from the tag used
    // for data size discovery. We may have a scenario like the following:
    //
    //     discoverRecvs();        // <-- This call is using the discover tag
    //
    //     startAllRecvs();
    //
    //     <User fills the send buffers>
    //
    //     startAllSends();
    //
    //     waitAllRecvs();
    //
    //     waitAllSends();
    //
    //     cancelAllSends(true);   // <-- This call is using the notification tag
    //
    //     cancelAllRecvs(true);   // <-- This call is using the notification tag
    //
    // If a process completes all its exchanges, it may reach the function
    // that cancels the sends while other processes are still discovering
    // the receives. That's because data size discovery is a non-blocking
    // operation. At the end of data size discovery we know that the data
    // sizes for the receives have reached the destination processes, but
    // it is not guaranteed that the messages have been processed. What can
    // happen is that a process exits from the function while other
    // processes are still receiving/processing the received messages (so
    // they are still setting the receives). The notification issued when
    // canceling the sends may then be caught by the processes that are
    // still discovering the receives. To avoid this, discover tags and
    // notification tags have to be different.

    // Start auxiliary receives for handling synchronization
    //
    // We will receive a notification from all the processes for which a
    // receive has been set. A notification equal to one means that the
    // send request on that process that matches the receive of this rank
    // has been successfully cancelled.
    int nRecvNotifications = getRecvCount();
    std::vector<MPI_Request> recvNotificationRequests(nRecvNotifications);
    std::vector<int> remoteCancelCompleted(nRecvNotifications, 0);
    for (int i = 0; i < nRecvNotifications; ++i) {
        int rank = m_recvRanks[i];
        MPI_Irecv(remoteCancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &recvNotificationRequests[i]);
    }

    // Notify that the sends have been canceled
    //
    // We will send a notification to all processes for which a send has
    // been set. A notification equal to one means that the send request
    // was successfully cancelled.
    int nSendNotifications = getSendCount();
    std::vector<MPI_Request> sendNotificationRequests(nSendNotifications);
    std::vector<int> cancelCompleted(nSendNotifications, 1);
    for (int i = 0; i < nSendNotifications; ++i) {
        int rank = m_sendRanks[i];
        MPI_Isend(cancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &sendNotificationRequests[i]);
    }

    // Synchronize with the other processes
    //
    // Wait until all processes involved in the data exchange cancel their
    // send requests.
    MPI_Waitall(nRecvNotifications, recvNotificationRequests.data(), MPI_STATUSES_IGNORE);
    for (int i = 0; i < nRecvNotifications; ++i) {
        if (!remoteCancelCompleted[i]) {
            log::cout() << "Unable to properly cancel the sends.";
            MPI_Abort(m_communicator, 1);
        }
    }

    // Wait until the notifications have been sent
    MPI_Waitall(nSendNotifications, sendNotificationRequests.data(), MPI_STATUSES_IGNORE);

#if BITPIT_ENABLE_DEBUG
    // Wait until all processes cancel the sends
    MPI_Barrier(m_communicator);
#endif
}

void DataCommunicator::cancelAllRecvs(bool synchronous)
{
    // Cancel all the receives
    for (int rank : m_recvRanks) {
        cancelRecv(rank);
    }

    // Early return if no synchronization is needed
    if (!synchronous) {
        return;
    }

    // Notify that the receives have been canceled
    //
    // The tag of the notifications has to be different from the tag used
    // for data size discovery. See the comment in the 'cancelAllSends'
    // function for an explanation.

    // Start auxiliary receives for handling synchronization
    //
    // We will receive a notification from all the processes for which a
    // send has been set. A notification equal to one means that the
    // receive request on that process that matches the send of this rank
    // has been successfully cancelled.
    int nRecvNotifications = getSendCount();
    std::vector<MPI_Request> recvNotificationRequests(nRecvNotifications);
    std::vector<int> remoteCancelCompleted(nRecvNotifications, 0);
    for (int i = 0; i < nRecvNotifications; ++i) {
        int rank = m_sendRanks[i];
        MPI_Irecv(remoteCancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &recvNotificationRequests[i]);
    }

    // Notify that the receives have been canceled
    //
    // We will send a notification to all processes for which a receive
    // has been set. A notification equal to one means that the receive
    // request was successfully cancelled.
    int nSendNotifications = getRecvCount();
    std::vector<MPI_Request> sendNotificationRequests(nSendNotifications);
    std::vector<int> cancelCompleted(nSendNotifications, 1);
    for (int i = 0; i < nSendNotifications; ++i) {
        int rank = m_recvRanks[i];
        MPI_Isend(cancelCompleted.data() + i, 1, MPI_INT, rank, m_notificationTag, m_communicator, &sendNotificationRequests[i]);
    }

    // Synchronize with the other processes
    //
    // Wait until all processes involved in the data exchange cancel their
    // receive requests.
    MPI_Waitall(nRecvNotifications, recvNotificationRequests.data(), MPI_STATUSES_IGNORE);
    for (int i = 0; i < nRecvNotifications; ++i) {
        if (!remoteCancelCompleted[i]) {
            log::cout() << "Unable to properly cancel the receives.";
            MPI_Abort(m_communicator, 1);
        }
    }

    // Wait until the notifications have been sent
    MPI_Waitall(nSendNotifications, sendNotificationRequests.data(), MPI_STATUSES_IGNORE);

#if BITPIT_ENABLE_DEBUG
    // Wait until all processes cancel the receives
    MPI_Barrier(m_communicator);
#endif
}

MPI_Datatype DataCommunicator::getChunkDataType(int chunkSize) const
{
    MPI_Datatype chunkDataType;
    if (chunkSize == 1) {
        chunkDataType = MPI_CHAR;
    } else {
        MPI_Type_contiguous(chunkSize, MPI_CHAR, &chunkDataType);
        MPI_Type_commit(&chunkDataType);
    }

    return chunkDataType;
}

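/*
    MPI element counts are plain ints, so a buffer larger than INT_MAX
    bytes cannot be described as a count of MPI_CHAR; grouping the bytes
    into contiguous chunks keeps the count representable, which is
    presumably why the binary streams above are chunked. A worked
    example, assuming a 4 GiB buffer with a chunk size of 8 bytes:

        size  = 4294967296 bytes       // exceeds INT_MAX as a char count
        count = size / 8 = 536870912   // fits comfortably in an int
*/
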
}

#endif