00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029 #include "mpi.h"
00030
00031 #include "mpiIPstreamImpl.H"
00032 #include "mpiPstreamGlobals.H"
00033
00034 #include <OpenFOAM/addToRunTimeSelectionTable.H>
00035 #include <OpenFOAM/Pstream.H>
00036
00037
00038
00039
00040
00041 namespace Foam
00042 {
00043
00044 defineTypeNameAndDebug(mpiIPstreamImpl, 0);
00045 addToRunTimeSelectionTable(IPstreamImpl, mpiIPstreamImpl, dictionary);
00046
00047 }
00048
00049
00050
00051
00052 void Foam::mpiIPstreamImpl::init
00053 (
00054 const PstreamImpl::commsTypes commsType,
00055 const label bufSize,
00056 int& fromProcNo,
00057 label& messageSize,
00058 List<char>& buf
00059 )
00060 {
00061 MPI_Status status;
00062
00063
00064
00065 if (!bufSize)
00066 {
00067 MPI_Probe(Pstream::procID(fromProcNo), Pstream::msgType(), MPI_COMM_WORLD, &status);
00068 MPI_Get_count(&status, MPI_BYTE, &messageSize);
00069
00070 buf.setSize(messageSize);
00071 }
00072
00073 messageSize = read(commsType, fromProcNo, buf.begin(), buf.size());
00074
00075 if (!messageSize)
00076 {
00077 FatalErrorIn
00078 (
00079 "mpiIPstreamImpl::mpiIPstreamImpl(const commsTypes commsType, const label bufSize, "
00080 "const int fromProcNo, label& messageSize, List<char>& buf)"
00081 ) << "read failed"
00082 << Foam::abort(FatalError);
00083 }
00084 }
00085
00086
00087 Foam::label Foam::mpiIPstreamImpl::read
00088 (
00089 const PstreamImpl::commsTypes commsType,
00090 const int fromProcNo,
00091 char* buf,
00092 const std::streamsize bufSize
00093 )
00094 {
00095 if (commsType == PstreamImpl::blocking || commsType == PstreamImpl::scheduled)
00096 {
00097 MPI_Status status;
00098
00099 if
00100 (
00101 MPI_Recv
00102 (
00103 buf,
00104 bufSize,
00105 MPI_PACKED,
00106 Pstream::procID(fromProcNo),
00107 Pstream::msgType(),
00108 MPI_COMM_WORLD,
00109 &status
00110 )
00111 )
00112 {
00113 FatalErrorIn
00114 (
00115 "mpiIPstreamImpl::read"
00116 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
00117 ) << "MPI_Recv cannot receive incoming message"
00118 << Foam::abort(FatalError);
00119
00120 return 0;
00121 }
00122
00123
00124
00125
00126 label messageSize;
00127 MPI_Get_count(&status, MPI_BYTE, &messageSize);
00128
00129 if (messageSize > bufSize)
00130 {
00131 FatalErrorIn
00132 (
00133 "mpiIPstreamImpl::read"
00134 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
00135 ) << "buffer (" << label(bufSize)
00136 << ") not large enough for incoming message ("
00137 << messageSize << ')'
00138 << Foam::abort(FatalError);
00139 }
00140
00141 return messageSize;
00142 }
00143 else if (commsType == PstreamImpl::nonBlocking)
00144 {
00145 MPI_Request request;
00146
00147 if
00148 (
00149 MPI_Irecv
00150 (
00151 buf,
00152 bufSize,
00153 MPI_PACKED,
00154 Pstream::procID(fromProcNo),
00155 Pstream::msgType(),
00156 MPI_COMM_WORLD,
00157 &request
00158 )
00159 )
00160 {
00161 FatalErrorIn
00162 (
00163 "mpiIPstreamImpl::read"
00164 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
00165 ) << "MPI_Recv cannot start non-blocking receive"
00166 << Foam::abort(FatalError);
00167
00168 return 0;
00169 }
00170
00171 PstreamGlobals::IPstream_outstandingRequests_.append(request);
00172
00173 return 1;
00174 }
00175 else
00176 {
00177 FatalErrorIn
00178 (
00179 "mpiIPstreamImpl::read"
00180 "(const int fromProcNo, char* buf, std::streamsize bufSize)"
00181 ) << "Unsupported communications type " << commsType
00182 << Foam::abort(FatalError);
00183
00184 return 0;
00185 }
00186 }
00187
00188
00189 void Foam::mpiIPstreamImpl::waitRequests()
00190 {
00191 if (PstreamGlobals::IPstream_outstandingRequests_.size())
00192 {
00193 if
00194 (
00195 MPI_Waitall
00196 (
00197 PstreamGlobals::IPstream_outstandingRequests_.size(),
00198 PstreamGlobals::IPstream_outstandingRequests_.begin(),
00199 MPI_STATUSES_IGNORE
00200 )
00201 )
00202 {
00203 FatalErrorIn
00204 (
00205 "mpiIPstreamImpl::waitRequests()"
00206 ) << "MPI_Waitall returned with error" << endl;
00207 }
00208
00209 PstreamGlobals::IPstream_outstandingRequests_.clear();
00210 }
00211 }
00212
00213
00214 bool Foam::mpiIPstreamImpl::finishedRequest(const label i)
00215 {
00216 if (i >= PstreamGlobals::IPstream_outstandingRequests_.size())
00217 {
00218 FatalErrorIn
00219 (
00220 "mpiIPstreamImpl::finishedRequest(const label)"
00221 ) << "There are "
00222 << PstreamGlobals::IPstream_outstandingRequests_.size()
00223 << " outstanding send requests and you are asking for i=" << i
00224 << nl
00225 << "Maybe you are mixing blocking/non-blocking comms?"
00226 << Foam::abort(FatalError);
00227 }
00228
00229 int flag;
00230 MPI_Test
00231 (
00232 &PstreamGlobals::IPstream_outstandingRequests_[i],
00233 &flag,
00234 MPI_STATUS_IGNORE
00235 );
00236
00237 return flag != 0;
00238 }
00239
00240
00241
00242
00243