Line data Source code
1 : /**
2 : * \file dart_communication.c
3 : *
4 : * Implementations of all the dart communication operations.
5 : *
6 : * All the following functions are implemented with the underlying *MPI-3*
7 : * one-sided runtime system.
8 : */
9 : #include <dash/dart/if/dart_types.h>
10 : #include <dash/dart/if/dart_initialization.h>
11 : #include <dash/dart/if/dart_globmem.h>
12 : #include <dash/dart/if/dart_team_group.h>
13 : #include <dash/dart/if/dart_communication.h>
14 : #include <dash/dart/mpi/dart_communication_priv.h>
15 : #include <dash/dart/mpi/dart_translation.h>
16 : #include <dash/dart/mpi/dart_team_private.h>
17 : #include <dash/dart/mpi/dart_mem.h>
18 : #include <dash/dart/mpi/dart_mpi_util.h>
19 :
20 : #include <dash/dart/base/logging.h>
21 : #include <dash/dart/base/math.h>
22 :
23 : #include <stdio.h>
24 : #include <mpi.h>
25 : #include <string.h>
26 : #include <limits.h>
27 : #include <math.h>
28 :
29 :
30 12794519 : int unit_g2l(
31 : uint16_t index,
32 : dart_unit_t abs_id,
33 : dart_unit_t * rel_id)
34 : {
35 12794519 : if (index == 0) {
36 12794519 : *rel_id = abs_id;
37 : }
38 : else {
39 : MPI_Comm comm;
40 : MPI_Group group, group_all;
41 0 : comm = dart_teams[index];
42 0 : MPI_Comm_group(comm, &group);
43 0 : MPI_Comm_group(MPI_COMM_WORLD, &group_all);
44 0 : MPI_Group_translate_ranks (group_all, 1, &abs_id, group, rel_id);
45 : }
46 12794519 : return 0;
47 : }
48 :
49 1321 : dart_ret_t dart_get(
50 : void * dest,
51 : dart_gptr_t gptr,
52 : size_t nbytes)
53 : {
54 : MPI_Aint disp_s,
55 : disp_rel;
56 : MPI_Win win;
57 1321 : dart_unit_t target_unitid_abs = gptr.unitid;
58 1321 : dart_unit_t target_unitid_rel = target_unitid_abs;
59 1321 : uint64_t offset = gptr.addr_or_offs.offset;
60 1321 : int16_t seg_id = gptr.segid;
61 1321 : uint16_t index = gptr.flags;
62 :
63 : /*
64 : * MPI uses offset type int, do not copy more than INT_MAX elements:
65 : */
66 1321 : if (nbytes > INT_MAX) {
67 0 : DART_LOG_ERROR("dart_get ! failed: nbytes > INT_MAX");
68 0 : return DART_ERR_INVAL;
69 : }
70 1321 : if (seg_id) {
71 1321 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
72 : }
73 :
74 : DART_LOG_DEBUG("dart_get() uid_abs:%d uid_rel:%d "
75 : "o:%"PRIu64" s:%d i:%u nbytes:%zu",
76 : target_unitid_abs, target_unitid_rel,
77 : offset, seg_id, index, nbytes);
78 :
79 : #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS)
80 : DART_LOG_DEBUG("dart_get: shared windows enabled");
81 1321 : if (seg_id >= 0) {
82 : int i;
83 : char * baseptr;
84 : /*
85 : * Use memcpy if the target is in the same node as the calling unit:
86 : */
87 1321 : i = dart_sharedmem_table[index][gptr.unitid];
88 1321 : if (i >= 0) {
89 : DART_LOG_DEBUG("dart_get: shared memory segment, seg_id:%d",
90 : seg_id);
91 1321 : if (seg_id) {
92 1321 : if (dart_adapt_transtable_get_baseptr(seg_id, i, &baseptr) == -1) {
93 0 : DART_LOG_ERROR("dart_get ! "
94 : "dart_adapt_transtable_get_baseptr failed");
95 1321 : return DART_ERR_INVAL;
96 : }
97 : } else {
98 0 : baseptr = dart_sharedmem_local_baseptr_set[i];
99 : }
100 1321 : baseptr += offset;
101 : DART_LOG_DEBUG("dart_get: memcpy %zu bytes", nbytes);
102 1321 : memcpy((char*)dest, baseptr, nbytes);
103 1321 : return DART_OK;
104 : }
105 : }
106 : #else
107 : DART_LOG_DEBUG("dart_get: shared windows disabled");
108 : #endif // !defined(DART_MPI_DISABLE_SHARED_WINDOWS)
109 : /*
110 : * MPI shared windows disabled or target and calling unit are on different
111 : * nodes, use MPI_Get:
112 : */
113 0 : if (seg_id) {
114 0 : if (dart_adapt_transtable_get_disp(
115 : seg_id,
116 : target_unitid_rel,
117 : &disp_s) == -1) {
118 0 : return DART_ERR_INVAL;
119 : }
120 0 : win = dart_win_lists[index];
121 0 : disp_rel = disp_s + offset;
122 : DART_LOG_TRACE("dart_get: nbytes:%zu "
123 : "source (coll.): win:%"PRIu64" unit:%d disp:%"PRId64" "
124 : "-> dest:%p",
125 : nbytes, (uint64_t)win, target_unitid_rel, disp_rel, dest);
126 : } else {
127 0 : win = dart_win_local_alloc;
128 0 : disp_rel = offset;
129 : DART_LOG_TRACE("dart_get: nbytes:%zu "
130 : "source (local): win:%"PRIu64" unit:%d disp:%"PRId64" "
131 : "-> dest:%p",
132 : nbytes, (uint64_t)win, target_unitid_rel, disp_rel, dest);
133 : }
134 : DART_LOG_TRACE("dart_get: MPI_Get");
135 0 : if (MPI_Get(dest,
136 : nbytes,
137 : MPI_BYTE,
138 : target_unitid_rel,
139 : disp_rel,
140 : nbytes,
141 : MPI_BYTE,
142 : win)
143 : != MPI_SUCCESS) {
144 0 : DART_LOG_ERROR("dart_get ! MPI_Rget failed");
145 0 : return DART_ERR_INVAL;
146 : }
147 :
148 : DART_LOG_DEBUG("dart_get > finished");
149 0 : return DART_OK;
150 : }
151 :
152 0 : dart_ret_t dart_put(
153 : dart_gptr_t gptr,
154 : const void * src,
155 : size_t nbytes)
156 : {
157 : MPI_Aint disp_s,
158 : disp_rel;
159 : MPI_Win win;
160 : dart_unit_t target_unitid_abs;
161 0 : uint64_t offset = gptr.addr_or_offs.offset;
162 0 : int16_t seg_id = gptr.segid;
163 0 : target_unitid_abs = gptr.unitid;
164 0 : if (seg_id) {
165 0 : uint16_t index = gptr.flags;
166 : dart_unit_t target_unitid_rel;
167 0 : win = dart_win_lists[index];
168 0 : unit_g2l (index, target_unitid_abs, &target_unitid_rel);
169 0 : if (dart_adapt_transtable_get_disp(
170 : seg_id,
171 : target_unitid_rel,
172 : &disp_s) == -1) {
173 0 : return DART_ERR_INVAL;
174 : }
175 0 : disp_rel = disp_s + offset;
176 0 : MPI_Put(
177 : src,
178 : nbytes,
179 : MPI_BYTE,
180 : target_unitid_rel,
181 : disp_rel,
182 : nbytes,
183 : MPI_BYTE,
184 : win);
185 : DART_LOG_DEBUG("dart_put: nbytes:%zu (from collective allocation) "
186 : "target unit: %d offset: %"PRIu64"",
187 : nbytes, target_unitid_abs, offset);
188 : } else {
189 0 : win = dart_win_local_alloc;
190 0 : MPI_Put(
191 : src,
192 : nbytes,
193 : MPI_BYTE,
194 : target_unitid_abs,
195 : offset,
196 : nbytes,
197 : MPI_BYTE,
198 : win);
199 : DART_LOG_DEBUG("dart_put: nbytes:%zu (from local allocation) "
200 : "target unit: %d offset: %"PRIu64"",
201 : nbytes, target_unitid_abs, offset);
202 : }
203 0 : return DART_OK;
204 : }
205 :
206 24 : dart_ret_t dart_accumulate(
207 : dart_gptr_t gptr,
208 : char * values,
209 : size_t nelem,
210 : dart_datatype_t dtype,
211 : dart_operation_t op,
212 : dart_team_t team)
213 : {
214 : MPI_Aint disp_s,
215 : disp_rel;
216 : MPI_Win win;
217 : MPI_Datatype mpi_dtype;
218 : MPI_Op mpi_op;
219 : dart_unit_t target_unitid_abs;
220 24 : uint64_t offset = gptr.addr_or_offs.offset;
221 24 : int16_t seg_id = gptr.segid;
222 24 : target_unitid_abs = gptr.unitid;
223 24 : mpi_dtype = dart_mpi_datatype(dtype);
224 24 : mpi_op = dart_mpi_op(op);
225 :
226 : (void)(team); // To prevent compiler warning from unused parameter.
227 :
228 : DART_LOG_DEBUG("dart_accumulate() nelem:%zu dtype:%d op:%d unit:%d",
229 : nelem, dtype, op, target_unitid_abs);
230 24 : if (seg_id) {
231 : dart_unit_t target_unitid_rel;
232 20 : uint16_t index = gptr.flags;
233 20 : win = dart_win_lists[index];
234 20 : unit_g2l(index,
235 : target_unitid_abs,
236 : &target_unitid_rel);
237 20 : if (dart_adapt_transtable_get_disp(
238 : seg_id,
239 : target_unitid_rel,
240 : &disp_s) == -1) {
241 0 : DART_LOG_ERROR("dart_accumulate ! "
242 : "dart_adapt_transtable_get_disp failed");
243 0 : return DART_ERR_INVAL;
244 : }
245 20 : disp_rel = disp_s + offset;
246 20 : MPI_Accumulate(
247 : values, // Origin address
248 : nelem, // Number of entries in buffer
249 : mpi_dtype, // Data type of each buffer entry
250 : target_unitid_rel, // Rank of target
251 : disp_rel, // Displacement from start of window to beginning
252 : // of target buffer
253 : nelem, // Number of entries in target buffer
254 : mpi_dtype, // Data type of each entry in target buffer
255 : mpi_op, // Reduce operation
256 : win);
257 : DART_LOG_TRACE("dart_accumulate: nelem:%zu (from collective allocation) "
258 : "target unit: %d offset: %"PRIu64"",
259 : nelem, target_unitid_abs, offset);
260 : } else {
261 4 : win = dart_win_local_alloc;
262 4 : MPI_Accumulate(
263 : values, // Origin address
264 : nelem, // Number of entries in buffer
265 : mpi_dtype, // Data type of each buffer entry
266 : target_unitid_abs, // Rank of target
267 : offset, // Displacement from start of window to beginning
268 : // of target buffer
269 : nelem, // Number of entries in target buffer
270 : mpi_dtype, // Data type of each entry in target buffer
271 : mpi_op, // Reduce operation
272 : win);
273 : DART_LOG_TRACE("dart_accumulate: nelem:%zu (from local allocation) "
274 : "target unit: %d offset: %"PRIu64"",
275 : nelem, target_unitid_abs, offset);
276 : }
277 : DART_LOG_DEBUG("dart_accumulate > finished");
278 24 : return DART_OK;
279 : }
280 :
281 51 : dart_ret_t dart_fetch_and_op(
282 : dart_gptr_t gptr,
283 : void * value,
284 : void * result,
285 : dart_datatype_t dtype,
286 : dart_operation_t op,
287 : dart_team_t team)
288 : {
289 : MPI_Aint disp_s,
290 : disp_rel;
291 : MPI_Win win;
292 : MPI_Datatype mpi_dtype;
293 : MPI_Op mpi_op;
294 : dart_unit_t target_unitid_abs;
295 51 : uint64_t offset = gptr.addr_or_offs.offset;
296 51 : int16_t seg_id = gptr.segid;
297 51 : target_unitid_abs = gptr.unitid;
298 51 : mpi_dtype = dart_mpi_datatype(dtype);
299 51 : mpi_op = dart_mpi_op(op);
300 :
301 : (void)(team); // To prevent compiler warning from unused parameter.
302 :
303 : DART_LOG_DEBUG("dart_fetch_and_op() dtype:%d op:%d unit:%d",
304 : dtype, op, target_unitid_abs);
305 51 : if (seg_id) {
306 : dart_unit_t target_unitid_rel;
307 47 : uint16_t index = gptr.flags;
308 47 : win = dart_win_lists[index];
309 47 : unit_g2l(index,
310 : target_unitid_abs,
311 : &target_unitid_rel);
312 47 : if (dart_adapt_transtable_get_disp(
313 : seg_id,
314 : target_unitid_rel,
315 : &disp_s) == -1) {
316 0 : DART_LOG_ERROR("dart_fetch_and_op ! "
317 : "dart_adapt_transtable_get_disp failed");
318 0 : return DART_ERR_INVAL;
319 : }
320 47 : disp_rel = disp_s + offset;
321 47 : MPI_Fetch_and_op(
322 : value, // Origin address
323 : result, // Result address
324 : mpi_dtype, // Data type of each buffer entry
325 : target_unitid_rel, // Rank of target
326 : disp_rel, // Displacement from start of window to beginning
327 : // of target buffer
328 : mpi_op, // Reduce operation
329 : win);
330 : DART_LOG_TRACE("dart_fetch_and_op: (from coll. allocation) "
331 : "target unit: %d offset: %"PRIu64"",
332 : target_unitid_abs, offset);
333 : } else {
334 4 : win = dart_win_local_alloc;
335 4 : MPI_Fetch_and_op(
336 : value, // Origin address
337 : result, // Result address
338 : mpi_dtype, // Data type of each buffer entry
339 : target_unitid_abs, // Rank of target
340 : offset, // Displacement from start of window to beginning
341 : // of target buffer
342 : mpi_op, // Reduce operation
343 : win);
344 : DART_LOG_TRACE("dart_fetch_and_op: (from local allocation) "
345 : "target unit: %d offset: %"PRIu64"",
346 : target_unitid_abs, offset);
347 : }
348 : DART_LOG_DEBUG("dart_fetch_and_op > finished");
349 51 : return DART_OK;
350 : }
351 :
352 : /* -- Non-blocking dart one-sided operations -- */
353 :
354 12 : dart_ret_t dart_get_handle(
355 : void * dest,
356 : dart_gptr_t gptr,
357 : size_t nbytes,
358 : dart_handle_t * handle)
359 : {
360 : MPI_Request mpi_req;
361 : MPI_Aint disp_s,
362 : disp_rel;
363 : MPI_Datatype mpi_type;
364 : MPI_Win win;
365 12 : dart_unit_t target_unitid_abs = gptr.unitid;
366 12 : dart_unit_t target_unitid_rel = target_unitid_abs;
367 : int mpi_ret;
368 12 : uint64_t offset = gptr.addr_or_offs.offset;
369 12 : uint16_t index = gptr.flags;
370 12 : int16_t seg_id = gptr.segid;
371 : /*
372 : * MPI uses offset type int, do not copy more than INT_MAX elements:
373 : */
374 12 : if (nbytes > INT_MAX) {
375 0 : DART_LOG_ERROR("dart_get_handle ! failed: nbytes > INT_MAX");
376 0 : return DART_ERR_INVAL;
377 : }
378 12 : int n_count = (int)(nbytes);
379 :
380 12 : mpi_type = MPI_BYTE;
381 :
382 12 : *handle = (dart_handle_t) malloc(sizeof(struct dart_handle_struct));
383 :
384 12 : if (seg_id > 0) {
385 12 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
386 : }
387 : DART_LOG_DEBUG("dart_get_handle() uid_abs:%d uid_rel:%d "
388 : "o:%"PRIu64" s:%d i:%d, nbytes:%zu",
389 : target_unitid_abs, target_unitid_rel,
390 : offset, seg_id, index, nbytes);
391 : DART_LOG_TRACE("dart_get_handle: allocated handle:%p", (void *)(*handle));
392 :
393 : #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS)
394 : DART_LOG_DEBUG("dart_get_handle: shared windows enabled");
395 12 : if (seg_id >= 0) {
396 : int i;
397 : char * baseptr;
398 : /*
399 : * Use memcpy if the target is in the same node as the calling unit:
400 : */
401 12 : i = dart_sharedmem_table[index][gptr.unitid];
402 12 : if (i >= 0) {
403 : DART_LOG_DEBUG("dart_get_handle: shared memory segment, seg_id:%d",
404 : seg_id);
405 12 : if (seg_id) {
406 12 : if (dart_adapt_transtable_get_baseptr(seg_id, i, &baseptr) == -1) {
407 0 : DART_LOG_ERROR("dart_get_handle ! "
408 : "dart_adapt_transtable_get_baseptr failed");
409 12 : return DART_ERR_INVAL;
410 : }
411 : } else {
412 0 : baseptr = dart_sharedmem_local_baseptr_set[i];
413 : }
414 12 : baseptr += offset;
415 : DART_LOG_DEBUG("dart_get_handle: memcpy %zu bytes", nbytes);
416 12 : memcpy((char*)dest, baseptr, nbytes);
417 :
418 : /*
419 : * Mark request as completed:
420 : */
421 12 : (*handle)->request = MPI_REQUEST_NULL;
422 12 : if (seg_id != 0) {
423 12 : (*handle)->dest = target_unitid_rel;
424 12 : (*handle)->win = dart_win_lists[index];
425 : } else {
426 0 : (*handle)->dest = target_unitid_abs;
427 0 : (*handle)->win = dart_win_local_alloc;
428 : }
429 12 : return DART_OK;
430 : }
431 : }
432 : #else
433 : DART_LOG_DEBUG("dart_get_handle: shared windows disabled");
434 : #endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */
435 : /*
436 : * MPI shared windows disabled or target and calling unit are on different
437 : * nodes, use MPI_RGet:
438 : */
439 0 : if (seg_id != 0) {
440 : /*
441 : * The memory accessed is allocated with collective allocation.
442 : */
443 : DART_LOG_TRACE("dart_get_handle: collective, segment:%d", seg_id);
444 0 : win = dart_win_lists[index];
445 : /* Translate local unitID (relative to teamid) into global unitID
446 : * (relative to DART_TEAM_ALL).
447 : *
448 : * Note: target_unitid should not be the global unitID but rather the
449 : * local unitID relative to the team associated with the specified win
450 : * object.
451 : */
452 0 : if (dart_adapt_transtable_get_disp(
453 : seg_id,
454 : target_unitid_rel,
455 : &disp_s) == -1)
456 : {
457 0 : DART_LOG_ERROR(
458 : "dart_get_handle ! dart_adapt_transtable_get_disp failed");
459 0 : free(*handle);
460 0 : return DART_ERR_INVAL;
461 : }
462 0 : disp_rel = disp_s + offset;
463 : DART_LOG_TRACE("dart_get_handle: -- disp_s:%"PRId64" disp_rel:%"PRId64"",
464 : disp_s, disp_rel);
465 :
466 : /* TODO: Check if
467 : * MPI_Rget_accumulate(
468 : * NULL, 0, MPI_BYTE, dest, nbytes, MPI_BYTE,
469 : * target_unitid, disp_rel, nbytes, MPI_BYTE, MPI_NO_OP, win,
470 : * &mpi_req)
471 : * ... could be an better alternative?
472 : */
473 : DART_LOG_DEBUG("dart_get_handle: -- %d elements (collective allocation) "
474 : "from %d at offset %"PRIu64"",
475 : n_count, target_unitid_rel, offset);
476 : DART_LOG_DEBUG("dart_get_handle: -- MPI_Rget");
477 0 : mpi_ret = MPI_Rget(
478 : dest, // origin address
479 : n_count, // origin count
480 : mpi_type, // origin data type
481 : target_unitid_rel, // target rank
482 : disp_rel, // target disp in window
483 : n_count, // target count
484 : mpi_type, // target data type
485 : win, // window
486 : &mpi_req);
487 0 : if (mpi_ret != MPI_SUCCESS) {
488 0 : DART_LOG_ERROR("dart_get_handle ! MPI_Rget failed");
489 0 : free(*handle);
490 0 : return DART_ERR_INVAL;
491 : }
492 0 : (*handle)->dest = target_unitid_rel;
493 : } else {
494 : /*
495 : * The memory accessed is allocated with local allocation.
496 : */
497 : DART_LOG_TRACE("dart_get_handle: -- local, segment:%d", seg_id);
498 : DART_LOG_DEBUG("dart_get_handle: -- %d elements (local allocation) "
499 : "from %d at offset %"PRIu64"",
500 : n_count, target_unitid_abs, offset);
501 0 : win = dart_win_local_alloc;
502 : DART_LOG_DEBUG("dart_get_handle: -- MPI_Rget");
503 0 : mpi_ret = MPI_Rget(
504 : dest, // origin address
505 : n_count, // origin count
506 : mpi_type, // origin data type
507 : target_unitid_abs, // target rank
508 : offset, // target disp in window
509 : n_count, // target count
510 : mpi_type, // target data type
511 : win, // window
512 : &mpi_req);
513 0 : if (mpi_ret != MPI_SUCCESS) {
514 0 : DART_LOG_ERROR("dart_get_handle ! MPI_Rget failed");
515 0 : free(*handle);
516 0 : return DART_ERR_INVAL;
517 : }
518 0 : (*handle)->dest = target_unitid_abs;
519 : }
520 0 : (*handle)->request = mpi_req;
521 0 : (*handle)->win = win;
522 : DART_LOG_TRACE("dart_get_handle > handle(%p) dest:%d win:%"PRIu64" req:%ld",
523 : (void*)(*handle), (*handle)->dest,
524 : (uint64_t)win, (int64_t)mpi_req);
525 0 : return DART_OK;
526 : }
527 :
528 0 : dart_ret_t dart_put_handle(
529 : dart_gptr_t gptr,
530 : const void * src,
531 : size_t nbytes,
532 : dart_handle_t *handle)
533 : {
534 : MPI_Request mpi_req;
535 : MPI_Aint disp_s, disp_rel;
536 : dart_unit_t target_unitid_abs;
537 0 : uint64_t offset = gptr.addr_or_offs.offset;
538 0 : int16_t seg_id = gptr.segid;
539 : MPI_Win win;
540 :
541 0 : *handle = (dart_handle_t) malloc(sizeof(struct dart_handle_struct));
542 0 : target_unitid_abs = gptr.unitid;
543 :
544 0 : if (seg_id != 0) {
545 0 : uint16_t index = gptr.flags;
546 : dart_unit_t target_unitid_rel;
547 0 : win = dart_win_lists[index];
548 0 : unit_g2l (index, target_unitid_abs, &target_unitid_rel);
549 0 : if (dart_adapt_transtable_get_disp(
550 : seg_id,
551 : target_unitid_rel,
552 : &disp_s) == -1) {
553 0 : return DART_ERR_INVAL;
554 : }
555 0 : disp_rel = disp_s + offset;
556 : /**
557 : * TODO: Check if
558 : * MPI_Raccumulate(
559 : * src, nbytes, MPI_BYTE, target_unitid,
560 : * disp_rel, nbytes, MPI_BYTE,
561 : * REPLACE, win, &mpi_req)
562 : * ... could be a better alternative?
563 : */
564 : DART_LOG_DEBUG("dart_put_handle: MPI_RPut");
565 0 : MPI_Rput(
566 : src,
567 : nbytes,
568 : MPI_BYTE,
569 : target_unitid_rel,
570 : disp_rel,
571 : nbytes,
572 : MPI_BYTE,
573 : win,
574 : &mpi_req);
575 0 : (*handle) -> dest = target_unitid_rel;
576 : DART_LOG_DEBUG("dart_put_handle: nbytes:%zu "
577 : "(from collective allocation) "
578 : "target_unit:%d offset:%"PRIu64"",
579 : nbytes, target_unitid_abs, offset);
580 : } else {
581 : DART_LOG_DEBUG("dart_put_handle: MPI_RPut");
582 0 : win = dart_win_local_alloc;
583 0 : MPI_Rput(
584 : src,
585 : nbytes,
586 : MPI_BYTE,
587 : target_unitid_abs,
588 : offset,
589 : nbytes,
590 : MPI_BYTE,
591 : win,
592 : &mpi_req);
593 : DART_LOG_DEBUG("dart_put_handle: nbytes:%zu "
594 : "(from local allocation) "
595 : "target_unit:%d offset:%"PRIu64"",
596 : nbytes, target_unitid_abs, offset);
597 0 : (*handle) -> dest = target_unitid_abs;
598 : }
599 0 : (*handle) -> request = mpi_req;
600 0 : (*handle) -> win = win;
601 0 : return DART_OK;
602 : }
603 :
604 : /* -- Blocking dart one-sided operations -- */
605 :
606 : /**
607 : * TODO: Check if MPI_Get_accumulate (MPI_NO_OP) can bring better
608 : * performance?
609 : */
610 4273431 : dart_ret_t dart_put_blocking(
611 : dart_gptr_t gptr,
612 : const void * src,
613 : size_t nbytes)
614 : {
615 : MPI_Win win;
616 : MPI_Aint disp_s,
617 : disp_rel;
618 4273431 : dart_unit_t target_unitid_abs = gptr.unitid;
619 4273431 : dart_unit_t target_unitid_rel = target_unitid_abs;
620 4273431 : uint64_t offset = gptr.addr_or_offs.offset;
621 4273431 : int16_t seg_id = gptr.segid;
622 4273431 : uint16_t index = gptr.flags;
623 :
624 : /*
625 : * MPI uses offset type int, do not copy more than INT_MAX elements:
626 : */
627 4273431 : if (nbytes > INT_MAX) {
628 0 : DART_LOG_ERROR("dart_put_blocking ! failed: nbytes > INT_MAX");
629 0 : return DART_ERR_INVAL;
630 : }
631 4273431 : if (seg_id > 0) {
632 4273399 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
633 : }
634 :
635 : DART_LOG_DEBUG("dart_put_blocking() uid_abs:%d uid_rel:%d "
636 : "o:%"PRIu64" s:%d i:%d, nbytes:%zu",
637 : target_unitid_abs, target_unitid_rel,
638 : offset, seg_id, index, nbytes);
639 :
640 : #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS)
641 : DART_LOG_DEBUG("dart_put_blocking: shared windows enabled");
642 4273431 : if (seg_id >= 0) {
643 : int i;
644 : char * baseptr;
645 : /*
646 : * Use memcpy if the target is in the same node as the calling unit:
647 : * The value of i will be the target's relative ID in teamid.
648 : */
649 4273424 : i = dart_sharedmem_table[index][gptr.unitid];
650 4273424 : if (i >= 0) {
651 : DART_LOG_DEBUG("dart_put_blocking: shared memory segment, seg_id:%d",
652 : seg_id);
653 4273424 : if (seg_id) {
654 4273399 : if (dart_adapt_transtable_get_baseptr(seg_id, i, &baseptr) == -1) {
655 0 : DART_LOG_ERROR("dart_put_blocking ! "
656 : "dart_adapt_transtable_get_baseptr failed");
657 4273424 : return DART_ERR_INVAL;
658 : }
659 : } else {
660 25 : baseptr = dart_sharedmem_local_baseptr_set[i];
661 : }
662 4273424 : baseptr += offset;
663 : DART_LOG_DEBUG("dart_put_blocking: memcpy %zu bytes", nbytes);
664 4273424 : memcpy(baseptr, (char*)src, nbytes);
665 4273424 : return DART_OK;
666 : }
667 : }
668 : #else
669 : DART_LOG_DEBUG("dart_put_blocking: shared windows disabled");
670 : #endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */
671 : /*
672 : * MPI shared windows disabled or target and calling unit are on different
673 : * nodes, use MPI_Rput:
674 : */
675 7 : if (seg_id) {
676 7 : if (dart_adapt_transtable_get_disp(
677 : seg_id,
678 : target_unitid_rel,
679 : &disp_s) == -1) {
680 0 : DART_LOG_ERROR("dart_put_blocking ! "
681 : "dart_adapt_transtable_get_disp failed");
682 0 : return DART_ERR_INVAL;
683 : }
684 7 : win = dart_win_lists[index];
685 7 : disp_rel = disp_s + offset;
686 : DART_LOG_DEBUG("dart_put_blocking: nbytes:%zu "
687 : "target (coll.): win:%"PRIu64" unit:%d offset:%"PRIu64" "
688 : "<- source: %p",
689 : nbytes, (uint64_t)win, target_unitid_rel,
690 : (uint64_t)disp_rel, src);
691 : } else {
692 0 : win = dart_win_local_alloc;
693 0 : disp_rel = offset;
694 : DART_LOG_DEBUG("dart_put_blocking: nbytes:%zu "
695 : "target (local): win:%"PRIu64" unit:%d offset:%"PRIu64" "
696 : "<- source: %p",
697 : nbytes, (uint64_t)win, target_unitid_rel,
698 : (uint64_t)disp_rel, src);
699 : }
700 :
701 : /*
702 : * Using MPI_Put as MPI_Win_flush is required to ensure remote completion.
703 : */
704 : DART_LOG_DEBUG("dart_put_blocking: MPI_Put");
705 7 : if (MPI_Put(src,
706 : nbytes,
707 : MPI_BYTE,
708 : target_unitid_rel,
709 : disp_rel,
710 : nbytes,
711 : MPI_BYTE,
712 : win)
713 : != MPI_SUCCESS) {
714 0 : DART_LOG_ERROR("dart_put_blocking ! MPI_Put failed");
715 0 : return DART_ERR_INVAL;
716 : }
717 : DART_LOG_DEBUG("dart_put_blocking: MPI_Win_flush");
718 7 : if (MPI_Win_flush(target_unitid_rel, win) != MPI_SUCCESS) {
719 0 : DART_LOG_ERROR("dart_put_blocking ! MPI_Win_flush failed");
720 0 : return DART_ERR_INVAL;
721 : }
722 :
723 : DART_LOG_DEBUG("dart_put_blocking > finished");
724 7 : return DART_OK;
725 : }
726 :
727 : /**
728 : * TODO: Check if MPI_Accumulate (REPLACE) can bring better performance?
729 : */
730 8519757 : dart_ret_t dart_get_blocking(
731 : void * dest,
732 : dart_gptr_t gptr,
733 : size_t nbytes)
734 : {
735 : MPI_Win win;
736 : MPI_Aint disp_s,
737 : disp_rel;
738 8519757 : dart_unit_t target_unitid_abs = gptr.unitid;
739 8519757 : dart_unit_t target_unitid_rel = target_unitid_abs;
740 8519757 : uint64_t offset = gptr.addr_or_offs.offset;
741 8519757 : int16_t seg_id = gptr.segid;
742 8519757 : uint16_t index = gptr.flags;
743 :
744 : /*
745 : * MPI uses offset type int, do not copy more than INT_MAX elements:
746 : */
747 8519757 : if (nbytes > INT_MAX) {
748 0 : DART_LOG_ERROR("dart_get_blocking ! failed: nbytes > INT_MAX");
749 0 : return DART_ERR_INVAL;
750 : }
751 8519757 : if (seg_id) {
752 8519653 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
753 : }
754 :
755 : DART_LOG_DEBUG("dart_get_blocking() uid_abs:%d uid_rel:%d "
756 : "o:%"PRIu64" s:%d i:%u, nbytes:%zu",
757 : target_unitid_abs, target_unitid_rel,
758 : offset, seg_id, index, nbytes);
759 :
760 : #if !defined(DART_MPI_DISABLE_SHARED_WINDOWS)
761 : DART_LOG_DEBUG("dart_get_blocking: shared windows enabled");
762 8519757 : if (seg_id >= 0) {
763 : int i;
764 : char * baseptr;
765 : /*
766 : * Use memcpy if the target is in the same node as the calling unit:
767 : * The value of i will be the target's relative ID in teamid.
768 : */
769 8515172 : i = dart_sharedmem_table[index][gptr.unitid];
770 8515172 : if (i >= 0) {
771 : DART_LOG_DEBUG("dart_get_blocking: shared memory segment, seg_id:%d",
772 : seg_id);
773 8515172 : if (seg_id) {
774 8515068 : if (dart_adapt_transtable_get_baseptr(seg_id, i, &baseptr) == -1) {
775 0 : DART_LOG_ERROR("dart_get_blocking ! "
776 : "dart_adapt_transtable_get_baseptr failed");
777 8515172 : return DART_ERR_INVAL;
778 : }
779 : } else {
780 104 : baseptr = dart_sharedmem_local_baseptr_set[i];
781 : }
782 8515172 : baseptr += offset;
783 : DART_LOG_DEBUG("dart_get_blocking: memcpy %zu bytes", nbytes);
784 8515172 : memcpy((char*)dest, baseptr, nbytes);
785 8515172 : return DART_OK;
786 : }
787 : }
788 : #else
789 : DART_LOG_DEBUG("dart_get_blocking: shared windows disabled");
790 : #endif /* !defined(DART_MPI_DISABLE_SHARED_WINDOWS) */
791 : /*
792 : * MPI shared windows disabled or target and calling unit are on different
793 : * nodes, use MPI_Rget:
794 : */
795 4585 : if (seg_id) {
796 4585 : if (dart_adapt_transtable_get_disp(
797 : seg_id,
798 : target_unitid_rel,
799 : &disp_s) == -1) {
800 0 : DART_LOG_ERROR("dart_get_blocking ! "
801 : "dart_adapt_transtable_get_disp failed");
802 0 : return DART_ERR_INVAL;
803 : }
804 4585 : win = dart_win_lists[index];
805 4585 : disp_rel = disp_s + offset;
806 : DART_LOG_DEBUG("dart_get_blocking: nbytes:%zu "
807 : "source (coll.): win:%p unit:%d offset:%p"
808 : "-> dest: %p",
809 : nbytes, (void*)((uint64_t)win), target_unitid_rel,
810 : (void*)disp_rel, dest);
811 : } else {
812 0 : win = dart_win_local_alloc;
813 0 : disp_rel = offset;
814 : DART_LOG_DEBUG("dart_get_blocking: nbytes:%zu "
815 : "source (local): win:%p unit:%d offset:%p "
816 : "-> dest: %p",
817 : nbytes, (void*)((uint64_t)win), target_unitid_rel,
818 : (void*)disp_rel, dest);
819 : }
820 :
821 : /*
822 : * Using MPI_Get as MPI_Win_flush is required to ensure remote completion.
823 : */
824 : DART_LOG_DEBUG("dart_get_blocking: MPI_Get");
825 4585 : if (MPI_Get(dest,
826 : nbytes,
827 : MPI_BYTE,
828 : target_unitid_rel,
829 : disp_rel,
830 : nbytes,
831 : MPI_BYTE,
832 : win)
833 : != MPI_SUCCESS) {
834 0 : DART_LOG_ERROR("dart_get_blocking ! MPI_Get failed");
835 0 : return DART_ERR_INVAL;
836 : }
837 : DART_LOG_DEBUG("dart_get_blocking: MPI_Win_flush");
838 4585 : if (MPI_Win_flush(target_unitid_rel, win) != MPI_SUCCESS) {
839 0 : DART_LOG_ERROR("dart_get_blocking ! MPI_Win_flush failed");
840 0 : return DART_ERR_INVAL;
841 : }
842 :
843 : DART_LOG_DEBUG("dart_get_blocking > finished");
844 4585 : return DART_OK;
845 : }
846 :
847 : /* -- Dart RMA Synchronization Operations -- */
848 :
849 75 : dart_ret_t dart_flush(
850 : dart_gptr_t gptr)
851 : {
852 : MPI_Win win;
853 : dart_unit_t target_unitid_abs;
854 75 : int16_t seg_id = gptr.segid;
855 75 : target_unitid_abs = gptr.unitid;
856 : DART_LOG_DEBUG("dart_flush() gptr: "
857 : "unitid:%d offset:%"PRIu64" segid:%d index:%d",
858 : gptr.unitid, gptr.addr_or_offs.offset,
859 : gptr.segid, gptr.flags);
860 75 : if (seg_id) {
861 : dart_unit_t target_unitid_rel;
862 67 : uint16_t index = gptr.flags;
863 67 : win = dart_win_lists[index];
864 67 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
865 : DART_LOG_TRACE("dart_flush: MPI_Win_flush");
866 67 : MPI_Win_flush(target_unitid_rel, win);
867 : } else {
868 8 : win = dart_win_local_alloc;
869 : DART_LOG_TRACE("dart_flush: MPI_Win_flush");
870 8 : MPI_Win_flush(target_unitid_abs, win);
871 : }
872 : DART_LOG_DEBUG("dart_flush > finished");
873 75 : return DART_OK;
874 : }
875 :
876 0 : dart_ret_t dart_flush_all(
877 : dart_gptr_t gptr)
878 : {
879 : int16_t seg_id;
880 0 : seg_id = gptr.segid;
881 : MPI_Win win;
882 : DART_LOG_DEBUG("dart_flush_all() gptr: "
883 : "unitid:%d offset:%"PRIu64" segid:%d index:%d",
884 : gptr.unitid, gptr.addr_or_offs.offset,
885 : gptr.segid, gptr.flags);
886 0 : if (seg_id) {
887 0 : uint16_t index = gptr.flags;
888 0 : win = dart_win_lists[index];
889 : } else {
890 0 : win = dart_win_local_alloc;
891 : }
892 : DART_LOG_TRACE("dart_flush_all: MPI_Win_flush_all");
893 0 : MPI_Win_flush_all(win);
894 : DART_LOG_DEBUG("dart_flush_all > finished");
895 0 : return DART_OK;
896 : }
897 :
898 0 : dart_ret_t dart_flush_local(
899 : dart_gptr_t gptr)
900 : {
901 : dart_unit_t target_unitid_abs;
902 0 : int16_t seg_id = gptr.segid;
903 : MPI_Win win;
904 0 : target_unitid_abs = gptr.unitid;
905 : DART_LOG_DEBUG("dart_flush_local() gptr: "
906 : "unitid:%d offset:%"PRIu64" segid:%d index:%d",
907 : gptr.unitid, gptr.addr_or_offs.offset,
908 : gptr.segid, gptr.flags);
909 0 : if (seg_id) {
910 0 : uint16_t index = gptr.flags;
911 : dart_unit_t target_unitid_rel;
912 0 : win = dart_win_lists[index];
913 : DART_LOG_DEBUG("dart_flush_local() win:%"PRIu64" seg:%d unit:%d",
914 : (uint64_t)win, seg_id, target_unitid_abs);
915 0 : unit_g2l(index, target_unitid_abs, &target_unitid_rel);
916 : DART_LOG_TRACE("dart_flush_local: MPI_Win_flush_local");
917 0 : MPI_Win_flush_local(target_unitid_rel, win);
918 : } else {
919 0 : win = dart_win_local_alloc;
920 : DART_LOG_DEBUG("dart_flush_local() lwin:%"PRIu64" seg:%d unit:%d",
921 : (uint64_t)win, seg_id, target_unitid_abs);
922 : DART_LOG_TRACE("dart_flush_local: MPI_Win_flush_local");
923 0 : MPI_Win_flush_local(target_unitid_abs, win);
924 : }
925 : DART_LOG_DEBUG("dart_flush_local > finished");
926 0 : return DART_OK;
927 : }
928 :
929 1325 : dart_ret_t dart_flush_local_all(
930 : dart_gptr_t gptr)
931 : {
932 1325 : int16_t seg_id = gptr.segid;
933 : MPI_Win win;
934 : DART_LOG_DEBUG("dart_flush_local_all() gptr: "
935 : "unitid:%d offset:%"PRIu64" segid:%d index:%d",
936 : gptr.unitid, gptr.addr_or_offs.offset,
937 : gptr.segid, gptr.flags);
938 1325 : if (seg_id) {
939 1325 : uint16_t index = gptr.flags;
940 1325 : win = dart_win_lists[index];
941 : } else {
942 0 : win = dart_win_local_alloc;
943 : }
944 1325 : MPI_Win_flush_local_all(win);
945 : DART_LOG_DEBUG("dart_flush_local_all > finished");
946 1325 : return DART_OK;
947 : }
948 :
949 0 : dart_ret_t dart_wait_local(
950 : dart_handle_t handle)
951 : {
952 : int mpi_ret;
953 : DART_LOG_DEBUG("dart_wait_local() handle:%p", (void*)(handle));
954 0 : if (handle != NULL) {
955 : DART_LOG_TRACE("dart_wait_local: handle->dest: %d",
956 : handle->dest);
957 : DART_LOG_TRACE("dart_wait_local: handle->win: %p",
958 : (void*)(uint64_t)(handle->win));
959 : DART_LOG_TRACE("dart_wait_local: handle->req: %ld",
960 : (int64_t)handle->request);
961 0 : if (handle->request != MPI_REQUEST_NULL) {
962 : MPI_Status mpi_sta;
963 0 : mpi_ret = MPI_Wait(&(handle->request), &mpi_sta);
964 : DART_LOG_TRACE("dart_wait_local: -- mpi_sta.MPI_SOURCE = %d",
965 : mpi_sta.MPI_SOURCE);
966 : DART_LOG_TRACE("dart_wait_local: -- mpi_sta.MPI_ERROR = %d (%s)",
967 : mpi_sta.MPI_ERROR,
968 : DART__MPI__ERROR_STR(mpi_sta.MPI_ERROR));
969 0 : if (mpi_ret != MPI_SUCCESS) {
970 : DART_LOG_DEBUG("dart_wait_local ! MPI_Wait failed");
971 0 : return DART_ERR_INVAL;
972 : }
973 : } else {
974 : DART_LOG_TRACE("dart_wait_local: handle->req == MPI_REQUEST_NULL");
975 : }
976 : /*
977 : * Do not free handle resource, it could be needed for a following
978 : * dart_wait() or dart_wait_all() to ensure remote completion.
979 : */
980 : }
981 : DART_LOG_DEBUG("dart_wait_local > finished");
982 0 : return DART_OK;
983 : }
984 :
985 0 : dart_ret_t dart_wait(
986 : dart_handle_t handle)
987 : {
988 : int mpi_ret;
989 : DART_LOG_DEBUG("dart_wait() handle:%p", (void*)(handle));
990 0 : if (handle != NULL) {
991 : DART_LOG_TRACE("dart_wait_local: handle->dest: %d",
992 : handle->dest);
993 : DART_LOG_TRACE("dart_wait_local: handle->win: %"PRIu64"",
994 : (uint64_t)handle->win);
995 : DART_LOG_TRACE("dart_wait_local: handle->req: %ld",
996 : (uint64_t)handle->request);
997 0 : if (handle->request != MPI_REQUEST_NULL) {
998 : MPI_Status mpi_sta;
999 : DART_LOG_DEBUG("dart_wait: -- MPI_Wait");
1000 0 : mpi_ret = MPI_Wait(&(handle->request), &mpi_sta);
1001 : DART_LOG_TRACE("dart_wait: -- mpi_sta.MPI_SOURCE: %d",
1002 : mpi_sta.MPI_SOURCE);
1003 : DART_LOG_TRACE("dart_wait: -- mpi_sta.MPI_ERROR: %d:%s",
1004 : mpi_sta.MPI_ERROR,
1005 : DART__MPI__ERROR_STR(mpi_sta.MPI_ERROR));
1006 0 : if (mpi_ret != MPI_SUCCESS) {
1007 : DART_LOG_DEBUG("dart_wait ! MPI_Wait failed");
1008 0 : return DART_ERR_INVAL;
1009 : }
1010 : DART_LOG_DEBUG("dart_wait: -- MPI_Win_flush");
1011 0 : mpi_ret = MPI_Win_flush(handle->dest, handle->win);
1012 0 : if (mpi_ret != MPI_SUCCESS) {
1013 : DART_LOG_DEBUG("dart_wait ! MPI_Win_flush failed");
1014 0 : return DART_ERR_INVAL;
1015 : }
1016 : } else {
1017 : DART_LOG_TRACE("dart_wait: handle->request: MPI_REQUEST_NULL");
1018 : }
1019 : /* Free handle resource */
1020 : DART_LOG_DEBUG("dart_wait: free handle %p", (void*)(handle));
1021 0 : free(handle);
1022 0 : handle = NULL;
1023 : }
1024 : DART_LOG_DEBUG("dart_wait > finished");
1025 0 : return DART_OK;
1026 : }
1027 :
1028 4 : dart_ret_t dart_waitall_local(
1029 : dart_handle_t * handle,
1030 : size_t num_handles)
1031 : {
1032 4 : size_t i, r_n = 0;
1033 : DART_LOG_DEBUG("dart_waitall_local()");
1034 4 : if (num_handles == 0) {
1035 : DART_LOG_DEBUG("dart_waitall_local > number of handles = 0");
1036 0 : return DART_OK;
1037 : }
1038 4 : if (num_handles > INT_MAX) {
1039 0 : DART_LOG_ERROR("dart_waitall_local ! number of handles > INT_MAX");
1040 0 : return DART_ERR_INVAL;
1041 : }
1042 4 : if (*handle != NULL) {
1043 : MPI_Status *mpi_sta;
1044 : MPI_Request *mpi_req;
1045 4 : mpi_req = (MPI_Request *) malloc(num_handles * sizeof(MPI_Request));
1046 4 : mpi_sta = (MPI_Status *) malloc(num_handles * sizeof(MPI_Status));
1047 16 : for (i = 0; i < num_handles; i++) {
1048 12 : if (handle[i] != NULL) {
1049 : DART_LOG_TRACE("dart_waitall_local: -- handle[%"PRIu64"]: %p)",
1050 : i, (void*)handle[i]);
1051 : DART_LOG_TRACE("dart_waitall_local: handle[%"PRIu64"]->dest: %d",
1052 : i, handle[i]->dest);
1053 : DART_LOG_TRACE("dart_waitall_local: handle[%"PRIu64"]->win: %p",
1054 : i, (void*)((uint64_t)(handle[i]->win)));
1055 : DART_LOG_TRACE("dart_waitall_local: handle[%"PRIu64"]->req: %p",
1056 : i, (void*)((uint64_t)(handle[i]->request)));
1057 12 : mpi_req[r_n] = handle[i]->request;
1058 12 : r_n++;
1059 : }
1060 : }
1061 : /*
1062 : * Wait for local completion of MPI requests:
1063 : */
1064 : DART_LOG_DEBUG("dart_waitall_local: "
1065 : "MPI_Waitall, %"PRIu64" requests from %"PRIu64" handles",
1066 : r_n, num_handles);
1067 4 : if (r_n > 0) {
1068 4 : if (MPI_Waitall(r_n, mpi_req, mpi_sta) == MPI_SUCCESS) {
1069 : DART_LOG_DEBUG("dart_waitall_local: MPI_Waitall completed");
1070 : } else {
1071 0 : DART_LOG_ERROR("dart_waitall_local: MPI_Waitall failed");
1072 : DART_LOG_TRACE("dart_waitall_local: free MPI_Request temporaries");
1073 0 : free(mpi_req);
1074 : DART_LOG_TRACE("dart_waitall_local: free MPI_Status temporaries");
1075 0 : free(mpi_sta);
1076 0 : return DART_ERR_INVAL;
1077 : }
1078 : } else {
1079 : DART_LOG_DEBUG("dart_waitall_local > number of requests = 0");
1080 0 : return DART_OK;
1081 : }
1082 : /*
1083 : * copy MPI requests back to DART handles:
1084 : */
1085 : DART_LOG_TRACE("dart_waitall_local: "
1086 : "copying MPI requests back to DART handles");
1087 4 : r_n = 0;
1088 16 : for (i = 0; i < num_handles; i++) {
1089 12 : if (handle[i]) {
1090 : DART_LOG_TRACE("dart_waitall_local: -- mpi_sta[%"PRIu64"].MPI_SOURCE:"
1091 : " %d",
1092 : r_n, mpi_sta[r_n].MPI_SOURCE);
1093 : DART_LOG_TRACE("dart_waitall_local: -- mpi_sta[%"PRIu64"].MPI_ERROR:"
1094 : " %d:%s",
1095 : r_n,
1096 : mpi_sta[r_n].MPI_ERROR,
1097 : DART__MPI__ERROR_STR(mpi_sta[r_n].MPI_ERROR));
1098 12 : if (mpi_req[r_n] != MPI_REQUEST_NULL) {
1099 0 : if (mpi_sta[r_n].MPI_ERROR == MPI_SUCCESS) {
1100 : DART_LOG_TRACE("dart_waitall_local: -- MPI_Request_free");
1101 0 : if (MPI_Request_free(&mpi_req[r_n]) != MPI_SUCCESS) {
1102 : DART_LOG_TRACE("dart_waitall_local ! MPI_Request_free failed");
1103 0 : free(mpi_req);
1104 0 : free(mpi_sta);
1105 0 : return DART_ERR_INVAL;
1106 : }
1107 : } else {
1108 : DART_LOG_TRACE("dart_waitall_local: cannot free request %d "
1109 : "mpi_sta[%d] = %d (%s)",
1110 : r_n,
1111 : r_n,
1112 : mpi_sta[r_n].MPI_ERROR,
1113 : DART__MPI__ERROR_STR(mpi_sta[r_n].MPI_ERROR));
1114 : }
1115 : }
1116 : DART_LOG_DEBUG("dart_waitall_local: free handle[%d] %p",
1117 : i, (void*)(handle[i]));
1118 12 : free(handle[i]);
1119 12 : handle[i] = NULL;
1120 12 : r_n++;
1121 : }
1122 : }
1123 : DART_LOG_TRACE("dart_waitall_local: free MPI_Request temporaries");
1124 4 : free(mpi_req);
1125 : DART_LOG_TRACE("dart_waitall_local: free MPI_Status temporaries");
1126 4 : free(mpi_sta);
1127 : }
1128 : DART_LOG_DEBUG("dart_waitall_local > finished");
1129 4 : return DART_OK;
1130 : }
1131 :
1132 0 : dart_ret_t dart_waitall(
1133 : dart_handle_t * handle,
1134 : size_t n)
1135 : {
1136 : int i, r_n;
1137 0 : int num_handles = (int)n;
1138 : DART_LOG_DEBUG("dart_waitall()");
1139 0 : if (n == 0) {
1140 0 : DART_LOG_ERROR("dart_waitall > number of handles = 0");
1141 0 : return DART_OK;
1142 : }
1143 0 : if (n > INT_MAX) {
1144 0 : DART_LOG_ERROR("dart_waitall ! number of handles > INT_MAX");
1145 0 : return DART_ERR_INVAL;
1146 : }
1147 : DART_LOG_DEBUG("dart_waitall: number of handles: %d", num_handles);
1148 0 : if (*handle) {
1149 : MPI_Status *mpi_sta;
1150 : MPI_Request *mpi_req;
1151 0 : mpi_req = (MPI_Request *) malloc(num_handles * sizeof(MPI_Request));
1152 0 : mpi_sta = (MPI_Status *) malloc(num_handles * sizeof(MPI_Status));
1153 : /*
1154 : * copy requests from DART handles to MPI request array:
1155 : */
1156 : DART_LOG_TRACE("dart_waitall: copying DART handles to MPI request array");
1157 0 : r_n = 0;
1158 0 : for (i = 0; i < num_handles; i++) {
1159 0 : if (handle[i] != NULL) {
1160 : DART_LOG_DEBUG("dart_waitall: -- handle[%d](%p): "
1161 : "dest:%d win:%"PRIu64" req:%"PRIu64"",
1162 : i, (void*)handle[i],
1163 : handle[i]->dest,
1164 : (uint64_t)handle[i]->win,
1165 : (uint64_t)handle[i]->request);
1166 0 : mpi_req[r_n] = handle[i]->request;
1167 0 : r_n++;
1168 : }
1169 : }
1170 : /*
1171 : * wait for communication of MPI requests:
1172 : */
1173 : DART_LOG_DEBUG("dart_waitall: MPI_Waitall, %d requests from %d handles",
1174 : r_n, num_handles);
1175 : /* From the MPI 3.1 standard:
1176 : *
1177 : * The i-th entry in array_of_statuses is set to the return
1178 : * status of the i-th operation. Active persistent requests
1179 : * are marked inactive.
1180 : * Requests of any other type are deallocated and the
1181 : * corresponding handles in the array are set to
1182 : * MPI_REQUEST_NULL.
1183 : * The list may contain null or inactive handles.
1184 : * The call sets to empty the status of each such entry.
1185 : */
1186 0 : if (r_n > 0) {
1187 0 : if (MPI_Waitall(r_n, mpi_req, mpi_sta) == MPI_SUCCESS) {
1188 : DART_LOG_DEBUG("dart_waitall: MPI_Waitall completed");
1189 : } else {
1190 0 : DART_LOG_ERROR("dart_waitall: MPI_Waitall failed");
1191 : DART_LOG_TRACE("dart_waitall: free MPI_Request temporaries");
1192 0 : free(mpi_req);
1193 : DART_LOG_TRACE("dart_waitall: free MPI_Status temporaries");
1194 0 : free(mpi_sta);
1195 0 : return DART_ERR_INVAL;
1196 : }
1197 : } else {
1198 : DART_LOG_DEBUG("dart_waitall > number of requests = 0");
1199 0 : return DART_OK;
1200 : }
1201 : /*
1202 : * copy MPI requests back to DART handles:
1203 : */
1204 : DART_LOG_TRACE("dart_waitall: copying MPI requests back to DART handles");
1205 0 : r_n = 0;
1206 0 : for (i = 0; i < num_handles; i++) {
1207 0 : if (handle[i]) {
1208 0 : if (mpi_req[r_n] == MPI_REQUEST_NULL) {
1209 : DART_LOG_TRACE("dart_waitall: -- mpi_req[%d] = MPI_REQUEST_NULL",
1210 : r_n);
1211 : } else {
1212 : DART_LOG_TRACE("dart_waitall: -- mpi_req[%d] = %d",
1213 : r_n, mpi_req[r_n]);
1214 : }
1215 : DART_LOG_TRACE("dart_waitall: -- mpi_sta[%d].MPI_SOURCE: %d",
1216 : r_n, mpi_sta[r_n].MPI_SOURCE);
1217 : DART_LOG_TRACE("dart_waitall: -- mpi_sta[%d].MPI_ERROR: %d:%s",
1218 : r_n,
1219 : mpi_sta[r_n].MPI_ERROR,
1220 : DART__MPI__ERROR_STR(mpi_sta[r_n].MPI_ERROR));
1221 0 : handle[i]->request = mpi_req[r_n];
1222 0 : r_n++;
1223 : }
1224 : }
1225 : /*
1226 : * wait for completion of MPI requests at origins and targets:
1227 : */
1228 : DART_LOG_DEBUG("dart_waitall: waiting for remote completion");
1229 0 : for (i = 0; i < num_handles; i++) {
1230 0 : if (handle[i]) {
1231 0 : if (handle[i]->request == MPI_REQUEST_NULL) {
1232 : DART_LOG_TRACE("dart_waitall: -- handle[%d] done (MPI_REQUEST_NULL)",
1233 : i);
1234 : } else {
1235 : DART_LOG_DEBUG("dart_waitall: -- MPI_Win_flush(handle[%d]: %p))",
1236 : i, (void*)handle[i]);
1237 : DART_LOG_TRACE("dart_waitall: handle[%d]->dest: %d",
1238 : i, handle[i]->dest);
1239 : DART_LOG_TRACE("dart_waitall: handle[%d]->win: %"PRIu64"",
1240 : i, (uint64_t)handle[i]->win);
1241 : DART_LOG_TRACE("dart_waitall: handle[%d]->req: %"PRIu64"",
1242 : i, (uint64_t)handle[i]->request);
1243 : /*
1244 : * MPI_Win_flush to wait for remote completion:
1245 : */
1246 0 : if (MPI_Win_flush(handle[i]->dest, handle[i]->win) != MPI_SUCCESS) {
1247 0 : DART_LOG_ERROR("dart_waitall: MPI_Win_flush failed");
1248 : DART_LOG_TRACE("dart_waitall: free MPI_Request temporaries");
1249 0 : free(mpi_req);
1250 : DART_LOG_TRACE("dart_waitall: free MPI_Status temporaries");
1251 0 : free(mpi_sta);
1252 0 : return DART_ERR_INVAL;
1253 : }
1254 : DART_LOG_TRACE("dart_waitall: -- MPI_Request_free");
1255 0 : if (MPI_Request_free(&handle[i]->request) != MPI_SUCCESS) {
1256 0 : DART_LOG_ERROR("dart_waitall: MPI_Request_free failed");
1257 : DART_LOG_TRACE("dart_waitall: free MPI_Request temporaries");
1258 0 : free(mpi_req);
1259 : DART_LOG_TRACE("dart_waitall: free MPI_Status temporaries");
1260 0 : free(mpi_sta);
1261 0 : return DART_ERR_INVAL;
1262 : }
1263 : }
1264 : }
1265 : }
1266 : /*
1267 : * free memory:
1268 : */
1269 : DART_LOG_DEBUG("dart_waitall: free handles");
1270 0 : for (i = 0; i < num_handles; i++) {
1271 0 : if (handle[i]) {
1272 : /* Free handle resource */
1273 : DART_LOG_TRACE("dart_waitall: -- free handle[%d]: %p",
1274 : i, (void*)(handle[i]));
1275 0 : free(handle[i]);
1276 0 : handle[i] = NULL;
1277 : }
1278 : }
1279 : DART_LOG_TRACE("dart_waitall: free MPI_Request temporaries");
1280 0 : free(mpi_req);
1281 : DART_LOG_TRACE("dart_waitall: free MPI_Status temporaries");
1282 0 : free(mpi_sta);
1283 : }
1284 : DART_LOG_DEBUG("dart_waitall > finished");
1285 0 : return DART_OK;
1286 : }
1287 :
1288 0 : dart_ret_t dart_test_local(
1289 : dart_handle_t handle,
1290 : int32_t* is_finished)
1291 : {
1292 : DART_LOG_DEBUG("dart_test_local()");
1293 0 : if (!handle) {
1294 0 : *is_finished = 1;
1295 0 : return DART_OK;
1296 : }
1297 : MPI_Status mpi_sta;
1298 0 : MPI_Test (&(handle->request), is_finished, &mpi_sta);
1299 : DART_LOG_DEBUG("dart_test_local > finished");
1300 0 : return DART_OK;
1301 : }
1302 :
1303 0 : dart_ret_t dart_testall_local(
1304 : dart_handle_t * handle,
1305 : size_t n,
1306 : int32_t * is_finished)
1307 : {
1308 : size_t i, r_n;
1309 : DART_LOG_DEBUG("dart_testall_local()");
1310 : MPI_Status *mpi_sta;
1311 : MPI_Request *mpi_req;
1312 0 : mpi_req = (MPI_Request *)malloc(n * sizeof (MPI_Request));
1313 0 : mpi_sta = (MPI_Status *)malloc(n * sizeof (MPI_Status));
1314 0 : r_n = 0;
1315 0 : for (i = 0; i < n; i++) {
1316 0 : if (handle[i]){
1317 0 : mpi_req[r_n] = handle[i] -> request;
1318 0 : r_n++;
1319 : }
1320 : }
1321 0 : MPI_Testall(r_n, mpi_req, is_finished, mpi_sta);
1322 0 : r_n = 0;
1323 0 : for (i = 0; i < n; i++) {
1324 0 : if (handle[i]) {
1325 0 : handle[i] -> request = mpi_req[r_n];
1326 0 : r_n++;
1327 : }
1328 : }
1329 0 : free(mpi_req);
1330 0 : free(mpi_sta);
1331 : DART_LOG_DEBUG("dart_testall_local > finished");
1332 0 : return DART_OK;
1333 : }
1334 :
1335 : /* -- Dart collective operations -- */
1336 :
1337 3748 : dart_ret_t dart_barrier(
1338 : dart_team_t teamid)
1339 : {
1340 : MPI_Comm comm;
1341 : uint16_t index;
1342 : int result;
1343 : DART_LOG_DEBUG("dart_barrier()");
1344 3748 : result = dart_adapt_teamlist_convert(teamid, &index);
1345 3748 : if (result == -1) {
1346 0 : return DART_ERR_INVAL;
1347 : }
1348 : /* Fetch proper communicator from teams. */
1349 3748 : comm = dart_teams[index];
1350 3748 : if (MPI_Barrier(comm) == MPI_SUCCESS) {
1351 : DART_LOG_DEBUG("dart_barrier > finished");
1352 3748 : return DART_OK;
1353 : }
1354 : DART_LOG_DEBUG("dart_barrier ! MPI_Barrier failed");
1355 0 : return DART_ERR_INVAL;
1356 : }
1357 :
1358 96 : dart_ret_t dart_bcast(
1359 : void * buf,
1360 : size_t nbytes,
1361 : int root,
1362 : dart_team_t teamid)
1363 : {
1364 : MPI_Comm comm;
1365 : uint16_t index;
1366 96 : int result = dart_adapt_teamlist_convert (teamid, &index);
1367 96 : if (result == -1) {
1368 0 : return DART_ERR_INVAL;
1369 : }
1370 96 : comm = dart_teams[index];
1371 96 : if (MPI_Bcast(buf, nbytes, MPI_BYTE, root, comm) != MPI_SUCCESS) {
1372 0 : return DART_ERR_INVAL;
1373 : }
1374 96 : return DART_OK;
1375 : }
1376 :
1377 0 : dart_ret_t dart_scatter(
1378 : void *sendbuf,
1379 : void *recvbuf,
1380 : size_t nbytes,
1381 : int root,
1382 : dart_team_t teamid)
1383 : {
1384 : MPI_Comm comm;
1385 : uint16_t index;
1386 0 : int result = dart_adapt_teamlist_convert(teamid, &index);
1387 0 : if (result == -1) {
1388 0 : return DART_ERR_INVAL;
1389 : }
1390 0 : comm = dart_teams[index];
1391 0 : if (MPI_Scatter(
1392 : sendbuf,
1393 : nbytes,
1394 : MPI_BYTE,
1395 : recvbuf,
1396 : nbytes,
1397 : MPI_BYTE,
1398 : root,
1399 : comm) != MPI_SUCCESS) {
1400 0 : return DART_ERR_INVAL;
1401 : }
1402 0 : return DART_OK;
1403 : }
1404 :
1405 0 : dart_ret_t dart_gather(
1406 : void *sendbuf,
1407 : void *recvbuf,
1408 : size_t nbytes,
1409 : int root,
1410 : dart_team_t teamid)
1411 : {
1412 : MPI_Comm comm;
1413 : uint16_t index;
1414 0 : int result = dart_adapt_teamlist_convert(teamid, &index);
1415 0 : if (result == -1) {
1416 0 : return DART_ERR_INVAL;
1417 : }
1418 0 : comm = dart_teams[index];
1419 0 : if (MPI_Gather(
1420 : sendbuf,
1421 : nbytes,
1422 : MPI_BYTE,
1423 : recvbuf,
1424 : nbytes,
1425 : MPI_BYTE,
1426 : root,
1427 : comm) != MPI_SUCCESS) {
1428 0 : return DART_ERR_INVAL;
1429 : }
1430 0 : return DART_OK;
1431 : }
1432 :
1433 836 : dart_ret_t dart_allgather(
1434 : void * sendbuf,
1435 : void * recvbuf,
1436 : size_t nbytes,
1437 : dart_team_t teamid)
1438 : {
1439 : MPI_Comm comm;
1440 : uint16_t index;
1441 : int result;
1442 : DART_LOG_TRACE("dart_allgather() team:%d nbytes:%"PRIu64"",
1443 : teamid, nbytes);
1444 :
1445 836 : result = dart_adapt_teamlist_convert(teamid, &index);
1446 836 : if (result == -1) {
1447 0 : return DART_ERR_INVAL;
1448 : }
1449 836 : comm = dart_teams[index];
1450 836 : if (MPI_Allgather(
1451 : sendbuf,
1452 : nbytes,
1453 : MPI_BYTE,
1454 : recvbuf,
1455 : nbytes,
1456 : MPI_BYTE,
1457 : comm) != MPI_SUCCESS) {
1458 0 : DART_LOG_ERROR("dart_allgather ! team:%d nbytes:%"PRIu64" failed",
1459 : teamid, nbytes);
1460 0 : return DART_ERR_INVAL;
1461 : }
1462 : DART_LOG_TRACE("dart_allgather > team:%d nbytes:%"PRIu64"",
1463 : teamid, nbytes);
1464 836 : return DART_OK;
1465 : }
1466 :
1467 0 : dart_ret_t dart_allreduce(
1468 : void * sendbuf,
1469 : void * recvbuf,
1470 : size_t nbytes,
1471 : dart_datatype_t dtype,
1472 : dart_operation_t op,
1473 : dart_team_t team)
1474 : {
1475 : MPI_Comm comm;
1476 0 : MPI_Op mpi_op = dart_mpi_op(op);
1477 0 : MPI_Datatype mpi_dtype = dart_mpi_datatype(dtype);
1478 : uint16_t team_idx;
1479 0 : int result = dart_adapt_teamlist_convert(team, &team_idx);
1480 :
1481 0 : if (result == -1) {
1482 0 : return DART_ERR_INVAL;
1483 : }
1484 0 : comm = dart_teams[team_idx];
1485 0 : if (MPI_Allreduce(
1486 : sendbuf, // send buffer
1487 : recvbuf, // receive buffer
1488 : nbytes, // buffer size
1489 : mpi_dtype, // datatype
1490 : mpi_op, // reduce operation
1491 : comm) != MPI_SUCCESS) {
1492 0 : return DART_ERR_INVAL;
1493 : }
1494 0 : return DART_OK;
1495 : }
1496 :
1497 0 : dart_ret_t dart_reduce_double(
1498 : double *sendbuf,
1499 : double *recvbuf,
1500 : dart_team_t teamid)
1501 : {
1502 : MPI_Comm comm;
1503 : uint16_t index;
1504 0 : int result = dart_adapt_teamlist_convert (teamid, &index);
1505 0 : if (result == -1) {
1506 0 : return DART_ERR_INVAL;
1507 : }
1508 0 : comm = dart_teams[index];
1509 0 : if (MPI_Reduce(
1510 : sendbuf,
1511 : recvbuf,
1512 : 1,
1513 : MPI_DOUBLE,
1514 : MPI_MAX,
1515 : 0,
1516 : comm) != MPI_SUCCESS) {
1517 0 : return DART_ERR_INVAL;
1518 : }
1519 0 : return DART_OK;
1520 : }
|