scheduler.c 46.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 * 
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 * 
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Lesser General Public License for more details.
 * 
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 * 
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <math.h>
28
#include <pthread.h>
29
30
#include <limits.h>
#include <omp.h>
31

32
33
34
35
36
/* MPI headers. */
#ifdef WITH_MPI
    #include <mpi.h>
#endif

37
38
39
40
41
42
43
44
45
46
47
48
/* Local headers. */
#include "error.h"
#include "cycle.h"
#include "atomic.h"
#include "timers.h"
#include "const.h"
#include "vector.h"
#include "lock.h"
#include "task.h"
#include "part.h"
#include "debug.h"
#include "space.h"
49
#include "multipole.h"
50
#include "cell.h"
51
52
53
54
55
#include "queue.h"
#include "kernel.h"
#include "scheduler.h"


56
57
58
59
60
61
62
63
64
65
66
67
68
69
/**
 * @brief Add an unlock_task to the given task.
 *
 * Appends #task @c tb to the fixed-size @c unlock_tasks array of @c ta.
 * When that array is full, an implicit "link" task is created and chained
 * in via the last slot, and the search continues down the chain. The whole
 * operation is lock-free: slots are claimed by atomically incrementing
 * @c ta->nr_unlock_tasks, and a count of exactly task_maxunlock+1 marks a
 * node whose last slot holds a link to the next node in the chain.
 *
 * May be called concurrently from several threads on the same @c ta.
 *
 * NOTE(review): a thread that loses the race (ind > task_maxunlock) retries
 * and may read unlock_tasks[task_maxunlock] in the follow-the-links loop
 * before the winning thread has stored the link task there — presumably
 * benign on the targeted platforms, but verify the memory-ordering
 * assumptions of atomic_inc/atomic_dec.
 *
 * @param s The #scheduler.
 * @param ta The unlocking #task.
 * @param tb The #task that will be unlocked.
 */
 
void scheduler_addunlock ( struct scheduler *s , struct task *ta , struct task *tb ) {

    /* Main loop, retried until a free slot has been claimed. */
    while ( 1 ) {

        /* Follow the links: a full node (count == task_maxunlock+1) stores
           a pointer to the next node of the chain in its last slot. */
        while ( ta->nr_unlock_tasks == task_maxunlock+1 )
            ta = ta->unlock_tasks[ task_maxunlock ];

        /* Get the index of the next free task. */
        int ind = atomic_inc( &ta->nr_unlock_tasks );

        /* Is there room in this task? */
        if ( ind < task_maxunlock ) {
            ta->unlock_tasks[ ind ] = tb;
            break;
            }

        /* Otherwise, generate a link task. */
        else {
        
            /* Only one thread should have to do this: the one that claimed
               exactly the last slot creates the link node. Note that the
               count is deliberately left at task_maxunlock+1 to mark this
               node as full/chained. */
            if ( ind == task_maxunlock ) {
                ta->unlock_tasks[ task_maxunlock ] = scheduler_addtask( s , task_type_link , task_subtype_none , ta->flags , 0 , ta->ci , ta->cj , 0 );
                ta->unlock_tasks[ task_maxunlock ]->implicit = 1;
                }

            /* Otherwise, reduce the count (we over-incremented past the
               marker value) and retry from the top of the chain walk. */
            else
                atomic_dec( &ta->nr_unlock_tasks );

            }
            
        }

    }
    
    

102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/**
 * @brief Split tasks that may be too large.
 *
 * Walks the current task list (in parallel: the shared counter @c tid is
 * claimed atomically, so this may be run by several threads at once) and
 * recursively replaces tasks on split cells with tasks on their progeny,
 * converts suitably small tasks into sub-tasks, prunes tasks that are
 * foreign or empty, and hooks up the sort tasks required by pair tasks.
 *
 * @param s The #scheduler we are working in.
 */
 
void scheduler_splittasks ( struct scheduler *s ) {

    int j, k, ind, sid, tid = 0, redo;
    struct cell *ci, *cj;
    double hi, hj, shift[3];
    struct task *t, *t_old;
    // float dt_step = s->dt_step;
    /* pts[j][k] is the sort ID (sid) of the pair interaction between
       progeny j and progeny k (k > j) of the same cell; -1 entries are
       the unused lower triangle. */
    int pts[7][8] = { { -1 , 12 , 10 ,  9 ,  4 ,  3 ,  1 ,  0 } ,
                      { -1 , -1 , 11 , 10 ,  5 ,  4 ,  2 ,  1 } ,
                      { -1 , -1 , -1 , 12 ,  7 ,  6 ,  4 ,  3 } , 
                      { -1 , -1 , -1 , -1 ,  8 ,  7 ,  5 ,  4 } ,
                      { -1 , -1 , -1 , -1 , -1 , 12 , 10 ,  9 } ,
                      { -1 , -1 , -1 , -1 , -1 , -1 , 11 , 10 } ,
                      { -1 , -1 , -1 , -1 , -1 , -1 , -1 , 12 } };
    /* Relative cost weight of a pair interaction for each of the 13 sort
       IDs (corner vs. edge vs. face neighbours). */
    float sid_scale[13] = { 0.1897 , 0.4025 , 0.1897 , 0.4025 , 0.5788 , 0.4025 ,
                            0.1897 , 0.4025 , 0.1897 , 0.4025 , 0.5788 , 0.4025 , 
                            0.5788 };

    /* Loop through the tasks... */
    redo = 0; t_old = t = NULL;
    while ( 1 ) {
    
        /* Get a pointer on the task: either re-visit the task we just
           rewrote (redo) or atomically claim the next unprocessed one. */
        if ( redo ) {
            redo = 0;
            t = t_old;
            }
        else {
            if ( ( ind = atomic_inc( &tid ) ) < s->nr_tasks )
                t_old = t = &s->tasks[ s->tasks_ind[ ind ] ];
            else
                break;
            }
        
        /* Empty task? Drop it. */
        if ( t->ci == NULL || ( t->type == task_type_pair && t->cj == NULL ) ) {
            t->type = task_type_none;
            t->skip = 1;
            continue;
            }
            
        /* Non-local kick task? Kicks only run on the cell's home node. */
        if ( (t->type == task_type_kick1 || t->type == task_type_kick2 ) &&
             t->ci->nodeID != s->nodeID ) {
            t->type = task_type_none;
            t->skip = 1;
            continue;
            }
            
        /* Self-interaction? */
        if ( t->type == task_type_self ) {
        
            /* Get a handle on the cell involved. */
            ci = t->ci;
            
            /* Foreign task? */
            if ( ci->nodeID != s->nodeID ) {
                t->skip = 1;
                continue;
                }
            
            /* Is this cell even split? */
            if ( ci->split ) {
            
                /* Make a sub? The test is roughly ci->count^2 <
                   space_subsize, presumably written this way to avoid
                   integer overflow — TODO confirm. */
                if ( scheduler_dosub && ci->count < space_subsize/ci->count ) {

                    /* convert to a self-subtask. */
                    t->type = task_type_sub;

                    }

                /* Otherwise, make tasks explicitly. */
                else {

                    /* Take a step back (we're going to recycle the current task)... */
                    redo = 1;

                    /* Add the self tasks: recycle t for the first non-empty
                       progeny, create new self tasks for the rest. */
                    for ( k = 0 ; ci->progeny[k] == NULL ; k++ );
                    t->ci = ci->progeny[k];
                    for ( k += 1 ; k < 8 ; k++ )
                        if ( ci->progeny[k] != NULL )
                            scheduler_addtask( s , task_type_self , task_subtype_density , 0 , 0 , ci->progeny[k] , NULL , 0 );

                    /* Make a task for each pair of progeny. */
                    for ( j = 0 ; j < 8 ; j++ )
                        if ( ci->progeny[j] != NULL )
                            for ( k = j + 1 ; k < 8 ; k++ )
                                if ( ci->progeny[k] != NULL )
                                    scheduler_addtask( s , task_type_pair , task_subtype_density , pts[j][k] , 0 , ci->progeny[j] , ci->progeny[k] , 0 );
                    }

                }
        
            }
    
        /* Pair interaction? */
        else if ( t->type == task_type_pair ) {
            
            /* Get a handle on the cells involved. */
            ci = t->ci;
            cj = t->cj;
            hi = ci->dmin;
            hj = cj->dmin;

            /* Foreign task? Skip only if BOTH cells live elsewhere. */
            if ( ci->nodeID != s->nodeID && cj->nodeID != s->nodeID ) {
                t->skip = 1;
                continue;
                }
            
            /* Get the sort ID, use space_getsid and not t->flags
               to make sure we get ci and cj swapped if needed. */
            sid = space_getsid( s->space , &ci , &cj , shift );
                
            /* Should this task be split-up? Only if both cells are split
               and all particles fit well within half a child cell. */
            if ( ci->split && cj->split &&
                 ci->h_max*kernel_gamma*space_stretch < hi/2 &&
                 cj->h_max*kernel_gamma*space_stretch < hj/2 ) {
                 
                /* Replace by a single sub-task? Cost estimate is roughly
                   ci->count*cj->count*sid_scale[sid] < space_subsize;
                   corner neighbours (sids 0, 2, 6, 8) are excluded. */
                if ( scheduler_dosub &&
                     ci->count * sid_scale[sid] < space_subsize/cj->count &&
                     sid != 0 && sid != 2 && sid != 6 && sid != 8 ) {
                
                    /* Make this task a sub task. */
                    t->type = task_type_sub;

                    }
                    
                /* Otherwise, split it. */
                else {

                    /* Take a step back (we're going to recycle the current task)... */
                    redo = 1;

                    /* For each different sorting type, the current task is
                       recycled for the first progeny pair and new tasks are
                       added for the remaining facing progeny pairs, each
                       with its own sid flag. */
                    switch ( sid ) {

                        case 0: /* (  1 ,  1 ,  1 ) */
                            t->ci = ci->progeny[7]; t->cj = cj->progeny[0]; t->flags = 0;
                            break;

                        case 1: /* (  1 ,  1 ,  0 ) */
                            t->ci = ci->progeny[6]; t->cj = cj->progeny[0]; t->flags = 1; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 1 , 0 , ci->progeny[7] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[6] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            break;

                        case 2: /* (  1 ,  1 , -1 ) */
                            t->ci = ci->progeny[6]; t->cj = cj->progeny[1]; t->flags = 2; t->tight = 1;
                            break;

                        case 3: /* (  1 ,  0 ,  1 ) */
                            t->ci = ci->progeny[5]; t->cj = cj->progeny[0]; t->flags = 3; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 3 , 0 , ci->progeny[7] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[5] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 6 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            break;

                        case 4: /* (  1 ,  0 ,  0 ) */
                            t->ci = ci->progeny[4]; t->cj = cj->progeny[0]; t->flags = 4; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 5 , 0 , ci->progeny[5] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[6] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 3 , 0 , ci->progeny[4] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 4 , 0 , ci->progeny[5] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 6 , 0 , ci->progeny[6] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[7] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 1 , 0 , ci->progeny[4] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[5] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 4 , 0 , ci->progeny[6] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 5 , 0 , ci->progeny[7] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[4] , cj->progeny[3] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 1 , 0 , ci->progeny[5] , cj->progeny[3] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 3 , 0 , ci->progeny[6] , cj->progeny[3] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 4 , 0 , ci->progeny[7] , cj->progeny[3] , 1 );
                            break;

                        case 5: /* (  1 ,  0 , -1 ) */
                            t->ci = ci->progeny[4]; t->cj = cj->progeny[1]; t->flags = 5; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 5 , 0 , ci->progeny[6] , cj->progeny[3] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[4] , cj->progeny[3] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[6] , cj->progeny[1] , 1 );
                            break;

                        case 6: /* (  1 , -1 ,  1 ) */
                            t->ci = ci->progeny[5]; t->cj = cj->progeny[2]; t->flags = 6; t->tight = 1;
                            break;

                        case 7: /* (  1 , -1 ,  0 ) */
                            /* NOTE(review): flags is set to 6 here while the
                               parent sid is 7 — looks deliberate (relative
                               offset of progeny 4/3), but verify. */
                            t->ci = ci->progeny[4]; t->cj = cj->progeny[3]; t->flags = 6; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[5] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[4] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[5] , cj->progeny[3] , 1 );
                            break;

                        case 8: /* (  1 , -1 , -1 ) */
                            t->ci = ci->progeny[4]; t->cj = cj->progeny[3]; t->flags = 8; t->tight = 1;
                            break;

                        case 9: /* (  0 ,  1 ,  1 ) */
                            t->ci = ci->progeny[3]; t->cj = cj->progeny[0]; t->flags = 9; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 9 , 0 , ci->progeny[7] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[3] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            break;

                        case 10: /* (  0 ,  1 ,  0 ) */
                            t->ci = ci->progeny[2]; t->cj = cj->progeny[0]; t->flags = 10; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 11 , 0 , ci->progeny[3] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[6] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 6 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 9 , 0 , ci->progeny[2] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 10 , 0 , ci->progeny[3] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[6] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 7 , 0 , ci->progeny[7] , cj->progeny[1] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 1 , 0 , ci->progeny[2] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[3] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 10 , 0 , ci->progeny[6] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 11 , 0 , ci->progeny[7] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[2] , cj->progeny[5] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 1 , 0 , ci->progeny[3] , cj->progeny[5] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 9 , 0 , ci->progeny[6] , cj->progeny[5] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 10 , 0 , ci->progeny[7] , cj->progeny[5] , 1 );
                            break;

                        case 11: /* (  0 ,  1 , -1 ) */
                            t->ci = ci->progeny[2]; t->cj = cj->progeny[1]; t->flags = 11; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 11 , 0 , ci->progeny[6] , cj->progeny[5] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[2] , cj->progeny[5] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 6 , 0 , ci->progeny[6] , cj->progeny[1] , 1 );
                            break;

                        case 12: /* (  0 ,  0 ,  1 ) */
                            t->ci = ci->progeny[1]; t->cj = cj->progeny[0]; t->flags = 12; t->tight = 1;
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 11 , 0 , ci->progeny[3] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 5 , 0 , ci->progeny[5] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 2 , 0 , ci->progeny[7] , cj->progeny[0] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 9 , 0 , ci->progeny[1] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 12 , 0 , ci->progeny[3] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 8 , 0 , ci->progeny[5] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 5 , 0 , ci->progeny[7] , cj->progeny[2] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 3 , 0 , ci->progeny[1] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 6 , 0 , ci->progeny[3] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 12 , 0 , ci->progeny[5] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 11 , 0 , ci->progeny[7] , cj->progeny[4] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[1] , cj->progeny[6] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 3 , 0 , ci->progeny[3] , cj->progeny[6] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 9 , 0 , ci->progeny[5] , cj->progeny[6] , 1 );
                            t = scheduler_addtask( s , task_type_pair , t->subtype , 12 , 0 , ci->progeny[7] , cj->progeny[6] , 1 );
                            break;

                        }
                        
                    }

                } /* split this task? */
                
            /* Otherwise, break it up if it is too large? */
            else if ( scheduler_doforcesplit && ci->split && cj->split &&
                      ( ci->count > space_maxsize / cj->count ) ) {
                      
                // message( "force splitting pair with %i and %i parts." , ci->count , cj->count );
                      
                /* Replace the current task by pair tasks over all progeny
                   combinations (the recycled task itself is voided). */
                t->type = task_type_none;
                
                for ( j = 0 ; j < 8 ; j++ )
                    if ( ci->progeny[j] != NULL )
                        for ( k = 0 ; k < 8 ; k++ )
                            if ( cj->progeny[k] != NULL ) {
                                t = scheduler_addtask( s , task_type_pair , t->subtype , 0 , 0 , ci->progeny[j] , cj->progeny[k] , 0 );
                                t->flags = space_getsid( s->space , &t->ci , &t->cj , shift );
                                }
                      
                }
                
            /* Otherwise, if not split, stitch-up the sorting. */
            else {
            
                /* Create the sort for ci, or just add this sid to an
                   existing sort task's flag bits. */
                // lock_lock( &ci->lock );
                if ( ci->sorts == NULL )
                    ci->sorts = scheduler_addtask( s , task_type_sort , 0 , 1 << sid , 0 , ci , NULL , 0 );
                else
                    ci->sorts->flags |= (1 << sid);
                // lock_unlock_blind( &ci->lock );
                scheduler_addunlock( s , ci->sorts , t );
                
                /* Create the sort for cj. */
                // lock_lock( &cj->lock );
                if ( cj->sorts == NULL )
                    cj->sorts = scheduler_addtask( s , task_type_sort , 0 , 1 << sid , 0 , cj , NULL , 0 );
                else
                    cj->sorts->flags |= (1 << sid);
                // lock_unlock_blind( &cj->lock );
                scheduler_addunlock( s , cj->sorts , t );
                
                }
                
            } /* pair interaction? */
            
        /* Gravity interaction? */
        else if ( t->type == task_type_grav_mm ) {
        
            /* Get a handle on the cells involved. */
            ci = t->ci;
            cj = t->cj;
            
            /* Self-interaction? */
            if ( cj == NULL ) {
            
                /* Ignore this task if the cell has no gparts. */
                if ( ci->gcount == 0 )
                    t->type = task_type_none;
            
                /* If the cell is split, recurse. */
                else if ( ci->split ) {
                
                    /* Make a single sub-task? NOTE(review): the threshold
                       uses ci->count (SPH particles), not ci->gcount —
                       verify this is intended for a gravity task. */
                    if ( scheduler_dosub && ci->count < space_subsize/ci->count ) {
                    
                        t->type = task_type_sub;
                        t->subtype = task_subtype_grav;
                    
                        }
                        
                    /* Otherwise, just split the task. */
                    else {
                
                        /* Split this task into tasks on its progeny: the
                           first interaction re-uses t, the rest get new
                           tasks; covers all self+pair progeny combos. */
                        t->type = task_type_none;
                        for ( j = 0 ; j < 8 ; j++ )
                            if ( ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0 ) {
                                if ( t->type == task_type_none ) {
                                    t->type = task_type_grav_mm;
                                    t->ci = ci->progeny[j];
                                    t->cj = NULL;
                                    }
                                else
                                    t = scheduler_addtask( s , task_type_grav_mm , task_subtype_none , 0 , 0 , ci->progeny[j] , NULL , 0 );
                                for ( k = j+1 ; k < 8 ; k++ )
                                    if ( ci->progeny[k] != NULL && ci->progeny[k]->gcount > 0 ) {
                                        if ( t->type == task_type_none ) {
                                            t->type = task_type_grav_mm;
                                            t->ci = ci->progeny[j];
                                            t->cj = ci->progeny[k];
                                            }
                                        else
                                            t = scheduler_addtask( s , task_type_grav_mm , task_subtype_none , 0 , 0 , ci->progeny[j] , ci->progeny[k] , 0 );
                                        }
                                }
                        /* If t was re-used it still needs processing. */
                        redo = ( t->type != task_type_none );
                        
                        }
                      
                    }
                    
                /* Otherwise, just make a pp task out of it. */
                else
                    t->type = task_type_grav_pp;
                
                }
                
            /* Nope, pair. */
            else {
            
                /* Make a sub-task? */
                if ( scheduler_dosub && ci->count < space_subsize/cj->count ) {
                
                    t->type = task_type_sub;
                    t->subtype = task_subtype_grav;
                
                    }
                    
                /* Otherwise, split the task. */
                else {
        
                    /* Get the opening angle theta (squared ratio of
                       inter-cell distance to cell size, with periodic
                       wrapping). */
                    float dx[3], theta;
                    for ( k = 0 ; k < 3 ; k++ ) {
                        dx[k] = fabsf( ci->loc[k] - cj->loc[k] );
                        if ( s->space->periodic && dx[k] > 0.5*s->space->dim[k] )
                            dx[k] = -dx[k] + s->space->dim[k];
                        if ( dx[k] > 0.0f )
                            dx[k] -= ci->h[k];
                        }
                    theta = ( dx[0]*dx[0] + dx[1]*dx[1] + dx[2]*dx[2] ) / 
                            ( ci->h[0]*ci->h[0] + ci->h[1]*ci->h[1] + ci->h[2]*ci->h[2] );

                    /* Ignore this task if the cell has no gparts. */
                    if ( ci->gcount == 0 || cj->gcount == 0 )
                        t->type = task_type_none;

                    /* Split the interaction? */
                    else if ( theta < const_theta_max*const_theta_max ) {

                        /* Are both ci and cj split? */
                        if ( ci->split && cj->split ) {

                            /* Split this task into tasks on its progeny. */
                            t->type = task_type_none;
                            for ( j = 0 ; j < 8 ; j++ )
                                if ( ci->progeny[j] != NULL && ci->progeny[j]->gcount > 0 ) {
                                    for ( k = 0 ; k < 8 ; k++ )
                                        if ( cj->progeny[k] != NULL && cj->progeny[k]->gcount > 0 ) {
                                            if ( t->type == task_type_none ) {
                                                t->type = task_type_grav_mm;
                                                t->ci = ci->progeny[j];
                                                t->cj = cj->progeny[k];
                                                }
                                            else
                                                t = scheduler_addtask( s , task_type_grav_mm , task_subtype_none , 0 , 0 , ci->progeny[j] , cj->progeny[k] , 0 );
                                            }
                                    }
                            redo = ( t->type != task_type_none );

                            }

                        /* Otherwise, make a pp task out of it. */
                        else
                            t->type = task_type_grav_pp;

                        }
                        
                    }
                
                } /* gravity pair interaction? */
        
            } /* gravity interaction? */
    
        } /* loop over all tasks. */
        
    }
    
    
/**
 * @brief Add a #task to the #scheduler.
 *
 * @param s The #scheduler we are working in.
 * @param type The type of the task.
 * @param subtype The sub-type of the task.
 * @param flags The flags of the task.
 * @param wait 
 * @param ci The first cell to interact.
 * @param cj The second cell to interact.
 * @param tight
 */
 
struct task *scheduler_addtask ( struct scheduler *s , int type , int subtype , int flags , int wait , struct cell *ci , struct cell *cj , int tight ) {

    int ind;
    struct task *t;
    
    /* Get the next free task. */
    ind = atomic_inc( &s->tasks_next );
567
568
569
570
571
572
    
    /* Overflow? */
    if ( ind >= s->size )
        error( "Task list overflow." );
    
    /* Get a pointer to the new task. */
573
574
575
576
577
578
579
580
581
582
583
    t = &s->tasks[ ind ];
    
    /* Copy the data. */
    t->type = type;
    t->subtype = subtype;
    t->flags = flags;
    t->wait = wait;
    t->ci = ci;
    t->cj = cj;
    t->skip = 0;
    t->tight = tight;
584
    t->implicit = 0;
Pedro Gonnet's avatar
Pedro Gonnet committed
585
    t->weight = 0;
586
    t->rank = 0;
Pedro Gonnet's avatar
Pedro Gonnet committed
587
    t->tic = 0;
588
    t->toc = 0;
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
    t->nr_unlock_tasks = 0;
    
    /* Init the lock. */
    lock_init( &t->lock );
    
    /* Add an index for it. */
    // lock_lock( &s->lock );
    s->tasks_ind[ atomic_inc( &s->nr_tasks ) ] = ind;
    // lock_unlock_blind( &s->lock );
    
    /* Return a pointer to the new task. */
    return t;

    }



/** 
 * @brief Sort the tasks in topological order over all queues.
 *
 * @param s The #scheduler.
 */
 
void scheduler_ranktasks ( struct scheduler *s ) {

    int i, j = 0, k, temp, left = 0, rank;
    struct task *t, *tasks = s->tasks;
    int *tid = s->tasks_ind, nr_tasks = s->nr_tasks;
    
    /* Run throught the tasks and get all the waits right. */
    /* After this pass, each task's wait counter holds the number of
       tasks that unlock it, i.e. its in-degree in the dependency graph. */
    for ( i = 0 , k = 0 ; k < nr_tasks ; k++ ) {
        tid[k] = k;
        for ( j = 0 ; j < tasks[k].nr_unlock_tasks ; j++ )
            tasks[k].unlock_tasks[j]->wait += 1;
        }
        
    /* Main loop. */
    /* Kahn-style topological sort performed in-place on tid[]: each
       iteration peels off the layer of tasks whose wait counter has
       dropped to zero and assigns them the current rank. */
    for ( j = 0 , rank = 0 ; left < nr_tasks ; rank++ ) {
        
        /* Load the tids of tasks with no waits. */
        /* Swap every ready task index into the front section [left,j). */
        for ( k = left ; k < nr_tasks ; k++ )
            if ( tasks[ tid[k] ].wait == 0 ) {
                temp = tid[j]; tid[j] = tid[k]; tid[k] = temp;
                j += 1;
                }
                
        /* Did we get anything? */
        /* No progress in a pass means the dependency graph has a cycle. */
        if ( j == left )
            error( "Unsatisfiable task dependencies detected." );

        /* Unlock the next layer of tasks. */
        for ( i = left ; i < j ; i++ ) {
            t = &tasks[ tid[i] ];
            t->rank = rank;
            /* NOTE(review): t == &tasks[tid[i]], so this assignment and
               the overshoot check below are defensive no-ops — confirm
               they can be dropped. */
            tid[i] = t - tasks;
            if ( tid[i] >= nr_tasks )
                error( "Task index overshoot." );
            /* message( "task %i of type %s has rank %i." , i , 
                (t->type == task_type_self) ? "self" : (t->type == task_type_pair) ? "pair" : "sort" , rank ); */
            for ( k = 0 ; k < t->nr_unlock_tasks ; k++ )
                t->unlock_tasks[k]->wait -= 1;
            }
            
        /* The new left (no, not tony). */
        left = j;
            
        }
        
    /* Verify that the tasks were ranked correctly. */
    /* NOTE(review): the disabled check below compares tid[k-1] with
       itself; it should compare tid[k-1] against tid[k]. */
    /* for ( k = 1 ; k < s->nr_tasks ; k++ )
        if ( tasks[ tid[k-1] ].rank > tasks[ tid[k-1] ].rank )
            error( "Task ranking failed." ); */
        
    }


/**
 * @brief (Re)allocate the task arrays.
 *
 * @param s The #scheduler.
 * @param size The maximum number of tasks in the #scheduler.
 */
 
void scheduler_reset ( struct scheduler *s , int size ) {

    int k;

    /* Do we need to re-allocate? */
    /* Buffers only ever grow; a smaller request keeps the existing
       (larger) allocation and just resets the counters below. */
    if ( size > s->size ) {

        /* Free exising task lists if necessary. */
        if ( s->tasks != NULL )
            free( s->tasks );
        if ( s->tasks_ind != NULL )
            free( s->tasks_ind );

        /* Allocate the new lists. */
        if ( ( s->tasks = (struct task *)malloc( sizeof(struct task) * size ) ) == NULL ||
             ( s->tasks_ind = (int *)malloc( sizeof(int) * size ) ) == NULL )
            error( "Failed to allocate task lists." );
            
        }
        
    /* Reset the task data. */
    /* memset replaces the legacy BSD bzero(), which was removed from
       POSIX.1-2008; behavior is identical. */
    memset( s->tasks , 0 , sizeof(struct task) * size );
        
    /* Reset the counters. */
    s->size = size;
    s->nr_tasks = 0;
    s->tasks_next = 0;
    s->waiting = 0;
    
    /* Set the task pointers in the queues. */
    for ( k = 0 ; k < s->nr_queues ; k++ )
        s->queues[k].tasks = s->tasks;

    }


/**
709
 * @brief Compute the task weights
710
711
712
713
 *
 * @param s The #scheduler.
 */
 
714
void scheduler_reweight ( struct scheduler *s ) {

    int k, j, nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
    struct task *t, *tasks = s->tasks;
    int nodeID = s->nodeID;
    /* Relative cost factor per pair-interaction sort ID (13 possible
       cell-pair orientations) — presumably calibrated offline; verify
       against the sort-ID definitions used by the pair tasks. */
    float sid_scale[13] = { 0.1897 , 0.4025 , 0.1897 , 0.4025 , 0.5788 , 0.4025 ,
                            0.1897 , 0.4025 , 0.1897 , 0.4025 , 0.5788 , 0.4025 , 
                            0.5788 };
    /* Global scale applied to the per-task cost estimates. */
    float wscale = 0.001;
    // ticks tic;
    
    /* Run throught the tasks backwards and set their waits and
       weights. */
    /* Walking tid[] backwards visits tasks in reverse rank order, so
       every dependant already carries its final weight: a task's weight
       is the maximum over its dependants (critical-path style) plus its
       own cost estimate. */
    // tic = getticks();
    // #pragma omp parallel for schedule(static) private(t,j)
    for ( k = nr_tasks-1 ; k >= 0 ; k-- ) {
        t = &tasks[ tid[k] ];
        t->weight = 0;
        for ( j = 0 ; j < t->nr_unlock_tasks ; j++ )
            if ( t->unlock_tasks[j]->weight > t->weight )
                t->weight = t->unlock_tasks[j]->weight;
        /* Prefer a measured cost (tic/toc from a previous step) when
           one is available... */
        if ( !t->implicit && t->tic > 0 )
            t->weight += wscale * (t->toc - t->tic);
        /* ...otherwise estimate the cost from the task type and the
           particle counts involved. */
        else
            switch ( t->type ) {
                case task_type_sort:
                    /* One sort per set flag bit, each ~O(count log count);
                       the clz term computes floor(log2(count))+1. */
                    t->weight += wscale * __builtin_popcount( t->flags ) * t->ci->count * ( sizeof(int)*8 - __builtin_clz( t->ci->count ) );
                    break;
                case task_type_self:
                    /* NOTE(review): unlike every other estimate, this one
                       is not scaled by wscale — confirm intentional. */
                    t->weight += 1 * t->ci->count * t->ci->count;
                    break;
                case task_type_pair:
                    /* Pairs touching a foreign node get a higher weight. */
                    if ( t->ci->nodeID != nodeID || t->cj->nodeID != nodeID )
                        t->weight += 3 * wscale * t->ci->count * t->cj->count * sid_scale[ t->flags ];
                    else
                        t->weight += 2 * wscale * t->ci->count * t->cj->count * sid_scale[ t->flags ];
                    break;
                case task_type_sub:
                    /* Sub-tasks: negative flags means no sort ID applies. */
                    if ( t->cj != NULL ) {
                        if ( t->ci->nodeID != nodeID || t->cj->nodeID != nodeID ) {
                            if ( t->flags < 0 )
                                t->weight += 3 * wscale * t->ci->count * t->cj->count;
                            else
                                t->weight += 3 * wscale * t->ci->count * t->cj->count * sid_scale[ t->flags ];
                            }
                        else {
                            if ( t->flags < 0 )
                                t->weight += 2 * wscale * t->ci->count * t->cj->count;
                            else
                                t->weight += 2 * wscale * t->ci->count * t->cj->count * sid_scale[ t->flags ];
                            }
                        }
                    else
                        t->weight += 1 * wscale * t->ci->count * t->ci->count;
                    break;
                case task_type_ghost:
                    /* Only top-level (super-cell) ghosts do real work. */
                    if ( t->ci == t->ci->super )
                        t->weight += wscale * t->ci->count;
                    break;
                case task_type_kick1:
                case task_type_kick2:
                    t->weight += wscale * t->ci->count;
                    break;
                default:
                    break;
                }
        /* Communication tasks: sends get a huge fixed weight so they are
           picked as early as possible; receives get their critical-path
           weight boosted. */
        if ( t->type == task_type_send )
            t->weight  = INT_MAX / 8;
        if ( t->type == task_type_recv )
            t->weight *= 1.41; 
        }
    // message( "weighting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 );

    /* int min = tasks[0].weight, max = tasks[0].weight;
    for ( k = 1 ; k < nr_tasks ; k++ )
    	if ( tasks[k].weight < min )
	    min = tasks[k].weight;
	else if ( tasks[k].weight > max )
	    max = tasks[k].weight;
    message( "task weights are in [ %i , %i ]." , min , max ); */
        
    }


/**
 * @brief Start the scheduler, i.e. fill the queues with ready tasks.
 *
 * @param s The #scheduler.
 * @param mask The task types to enqueue.
 */
 
void scheduler_start ( struct scheduler *s , unsigned int mask ) {

    int k, j, nr_tasks = s->nr_tasks, *tid = s->tasks_ind;
    struct task *t, *tasks = s->tasks;
    // ticks tic;
    
    /* Run throught the tasks and set their waits. */
    /* Rebuild each active task's wait counter (number of unfinished
       tasks that unlock it); masked-out or skipped tasks contribute
       nothing. */
    // tic = getticks();
    // #pragma omp parallel for schedule(static) private(t,j)
    for ( k = nr_tasks - 1 ; k >= 0 ; k-- ) {
        t = &tasks[ tid[k] ];
        t->wait = 0;
        t->rid = -1;
        if ( !( (1 << t->type) & mask ) || t->skip )
            continue;
        for ( j = 0 ; j < t->nr_unlock_tasks ; j++ )
            atomic_inc( &t->unlock_tasks[j]->wait );
        }
    // message( "waiting tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 );
        
    /* Don't enqueue link tasks directly. */
    mask &= ~(1 << task_type_link);
        
    /* Loop over the tasks and enqueue whoever is ready. */
    // tic = getticks();
    for ( k = 0 ; k < nr_tasks ; k++) {
        t = &tasks[ tid[k] ];
        if ( ( (1 << t->type) & mask ) && !t->skip ) {
            if ( t->wait == 0 ) {
		        scheduler_enqueue( s , t );
		        pthread_cond_broadcast( &s->sleep_cond );
		        }
	        else
	            /* NOTE(review): this break assumes tid[] is ordered so
	               that all wait==0 tasks come first (i.e. topologically
	               sorted, as scheduler_ranktasks produces) — confirm
	               this invariant always holds when start is called. */
	            break;
            }
        }
    // message( "enqueueing tasks took %.3f ms." , (double)( getticks() - tic ) / CPU_TPS * 1000 );
        
    }


/**
 * @brief Put a task on one of the queues.
 *
 * @param s The #scheduler.
 * @param t The #task.
 */
 
void scheduler_enqueue ( struct scheduler *s , struct task *t ) {

    int qid = -1;
    #ifdef WITH_MPI
        int err;
    #endif
    
    /* Ignore skipped tasks. */
    /* The CAS on rid doubles as a once-only guard: only the caller that
       flips rid from -1 to 0 proceeds; concurrent callers return. */
    if ( t->skip  || atomic_cas( &t->rid , -1 , 0 ) != -1 )
        return;
        
    /* If this is an implicit task, just pretend it's done. */
    /* Implicit tasks do no work: release their dependants immediately,
       recursively enqueueing any that become ready. */
    if ( t->implicit ) {
        for ( int j = 0 ; j < t->nr_unlock_tasks ; j++ ) {
            struct task *t2 = t->unlock_tasks[j];
            if ( atomic_dec( &t2->wait ) == 1 && !t2->skip )
                scheduler_enqueue( s , t2 );
            }
        }
        
    /* Otherwise, look for a suitable queue. */
    else {
        
        /* Find the previous owner for each task type, and do
           any pre-processing needed. */
        switch ( t->type ) {
            case task_type_self:
            case task_type_sort:
            case task_type_ghost:
            case task_type_kick2:
                qid = t->ci->super->owner;
                break;
            case task_type_pair:
            case task_type_sub:
                /* Prefer the less-loaded of the two cells' owning queues. */
                qid = t->ci->super->owner;
                if ( t->cj != NULL && 
                     ( qid < 0 || s->queues[qid].count > s->queues[t->cj->super->owner].count ) )
                    qid = t->cj->super->owner;
                break;
            case task_type_recv:
                #ifdef WITH_MPI
                    /* Post the asynchronous receive before queueing the
                       task; the task itself then just polls t->req. */
                    if ( ( err = MPI_Irecv( t->ci->parts , sizeof(struct part) * t->ci->count , MPI_BYTE , t->ci->nodeID , t->flags , MPI_COMM_WORLD , &t->req ) ) != MPI_SUCCESS ) {
                        char buff[ MPI_MAX_ERROR_STRING ];
                        int len;
                        MPI_Error_string( err , buff , &len );
                        error( "Failed to emit irecv for particle data (%s)." , buff );
                        }
                    // message( "recieving %i parts with tag=%i from %i to %i." ,
                    //     t->ci->count , t->flags , t->ci->nodeID , s->nodeID ); fflush(stdout);
                    /* All receives are funnelled through queue 1 (or 0 if
                       there is only one queue). */
                    qid = 1 % s->nr_queues;
                #else
                    error( "SWIFT was not compiled with MPI support." );
                #endif
                break;
            case task_type_send:
                #ifdef WITH_MPI
                    /* Post the asynchronous send before queueing the task. */
                    if ( ( err = MPI_Isend( t->ci->parts , sizeof(struct part) * t->ci->count , MPI_BYTE , t->cj->nodeID , t->flags , MPI_COMM_WORLD , &t->req ) ) != MPI_SUCCESS ) {
                        char buff[ MPI_MAX_ERROR_STRING ];
                        int len;
                        MPI_Error_string( err , buff , &len );
                        error( "Failed to emit isend for particle data (%s)." , buff );
                        }
                    // message( "sending %i parts with tag=%i from %i to %i." ,
                    //     t->ci->count , t->flags , s->nodeID , t->cj->nodeID ); fflush(stdout);
                    /* All sends are funnelled through queue 0. */
                    qid = 0;
                #else
                    error( "SWIFT was not compiled with MPI support." );
                #endif
                break;
            default:
                qid = -1;
            }
            
        if ( qid >= s->nr_queues )
            error( "Bad computed qid." );
            
        /* If no previous owner, find the shortest queue. */
        /* NOTE(review): despite the comment above, a random queue is
           picked rather than the shortest — confirm which is intended. */
        if ( qid < 0 )
            qid = rand() % s->nr_queues;

        /* Increase the waiting counter. */
        atomic_inc( &s->waiting );

        /* Insert the task into that queue. */
        queue_insert( &s->queues[qid] , t );
        
        }
        
    }


944
945
946
947
948
949
950
951
952
953
954
955
/**
 * @brief Take care of a tasks dependencies.
 *
 * @param s The #scheduler.
 * @param t The finished #task.
 *
 * @return A pointer to the next task, if a suitable one has
 *         been identified.
 */
 
struct task *scheduler_done ( struct scheduler *s , struct task *t ) {

    int k, res;
    struct task *t2, *next = NULL;
    struct cell *super = t->ci->super;
    
    /* Release whatever locks this task held. */
    if ( !t->implicit )
        task_unlock( t );
        
    /* Loop through the dependencies and add them to a queue if
       they are ready. */
    for ( k = 0 ; k < t->nr_unlock_tasks ; k++ ) {
        t2 = t->unlock_tasks[k];
        /* atomic_dec returns the pre-decrement value, so res == 1 means
           the wait counter just reached zero and t2 is ready. */
        if ( ( res = atomic_dec( &t2->wait ) ) < 1 )
            error( "Negative wait!" );
        if ( res == 1 && !t2->skip ) {
            /* NOTE(review): the leading '0 &&' deliberately disables this
               fast-path, which would hand the heaviest ready dependant in
               the same super-cell straight back to the caller instead of
               enqueueing it — confirm whether it should be re-enabled or
               removed outright. */
            if ( 0 && !t2->implicit &&
                 t2->ci->super == super &&
                 ( next == NULL || t2->weight > next->weight ) &&
                 task_lock( t2 ) ) {
                if ( next != NULL ) {
                    task_unlock( next );
                    scheduler_enqueue( s , next );
                    }
                next = t2;
                }
            else
                scheduler_enqueue( s , t2 );
            }
        }
        
    /* Task definitely done. */
    if ( !t->implicit ) {
        t->toc = getticks();
        pthread_mutex_lock( &s->sleep_mutex );
        /* Only drop the global waiting count when no follow-up task is
           handed straight back to the caller (the follow-up keeps the
           slot "occupied"). */
        if ( next == NULL )
            atomic_dec( &s->waiting );
        /* Wake any runners snoozing in scheduler_gettask(). */
        pthread_cond_broadcast( &s->sleep_cond );
        pthread_mutex_unlock( &s->sleep_mutex );
        }

    /* Start the clock on the follow-up task. */
    if ( next != NULL )
        next->tic = getticks();
        
    /* Return the next best task. */
    return next;

    }


1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
/**
 * @brief Resolve a single dependency by hand.
 *
 * @param s The #scheduler.
 * @param t The dependent #task.
 *
 * @return A pointer to the next task, if a suitable one has
 *         been identified.
 */
 
struct task *scheduler_unlock ( struct scheduler *s , struct task *t ) {

    int k, res;
    struct task *t2;
    
    /* Loop through the dependencies and add them to a queue if
       they are ready. atomic_dec returns the pre-decrement value, so
       res == 1 means the wait counter just reached zero. */
    for ( k = 0 ; k < t->nr_unlock_tasks ; k++ ) {
        t2 = t->unlock_tasks[k];
        if ( ( res = atomic_dec( &t2->wait ) ) < 1 )
            error( "Negative wait!" );
        if ( res == 1 && !t2->skip )
            scheduler_enqueue( s , t2 );
        }
        
    /* Task definitely done: stamp its end time, drop the global waiting
       count and wake any runners sleeping in scheduler_gettask(). */
    if ( !t->implicit ) {
        t->toc = getticks();
        pthread_mutex_lock( &s->sleep_mutex );
        atomic_dec( &s->waiting );
        pthread_cond_broadcast( &s->sleep_cond );
        pthread_mutex_unlock( &s->sleep_mutex );
        }

    /* Unlike scheduler_done(), this routine never selects a follow-up
       task: the previous version's 'next' variable was never assigned,
       so its NULL-checks were dead code and have been removed. The
       return value is kept for interface compatibility and is always
       NULL. */
    return NULL;

    }


1051
1052
1053
1054
1055
/**
 * @brief Get a task, preferably from the given queue.
 *
 * @param s The #scheduler.
 * @param qid The ID of the prefered #queue.
1056
 * @param super the super-cell
1057
1058
1059
1060
 *
 * @return A pointer to a #task or @c NULL if there are no available tasks.
 */
 
1061
struct task *scheduler_gettask ( struct scheduler *s , int qid , struct cell *super ) {

    struct task *res = NULL;
    int k, nr_queues = s->nr_queues;
    /* Per-caller PRNG state for rand_r, seeded by the queue ID so each
       runner steals in a different order. */
    unsigned int seed = qid;
    
    /* Check qid. */
    if ( qid >= nr_queues || qid < 0 )
	    error( "Bad queue ID." );

    /* Loop as long as there are tasks... */
    while ( s->waiting > 0 && res == NULL ) {
        
        /* Try more than once before sleeping. */
        for ( int tries = 0 ; res == NULL && s->waiting && tries < scheduler_maxtries ; tries++ ) {
        
            /* Try to get a task from the suggested queue. */
            if ( s->queues[qid].count > 0 ) {
                TIMER_TIC
                res = queue_gettask( &s->queues[qid] , super , 0 );
                TIMER_TOC( timer_qget );
                if ( res != NULL )
                    break;
                }

            /* If unsucessful, try stealing from the other queues. */
            /* Collect the IDs of all non-empty queues, then probe up to
               scheduler_maxsteal of them in random order, discarding
               each empty candidate as we go. */
            if ( s->flags & scheduler_flag_steal ) {
                int count = 0, qids[ nr_queues ];
                for ( k = 0 ; k < nr_queues ; k++ )
                    if ( s->queues[k].count > 0 )
                        qids[ count++ ] = k;
                for ( k = 0 ; k < scheduler_maxsteal && count > 0 ; k++ ) {
                    int ind = rand_r( &seed ) % count;
                    TIMER_TIC
                    res = queue_gettask( &s->queues[ qids[ ind ] ] , super , 0 );
                    TIMER_TOC( timer_qsteal );
                    if ( res != NULL )
                        break;
                    else 
                        qids[ ind ] = qids[ --count ];
                    }
                if ( res != NULL )
                    break;
                }
                
            }

        /* If we failed, take a short nap. */
        /* Under MPI, runners on queues 0 and 1 never sleep — presumably
           because send/recv tasks are funnelled to those queues by
           scheduler_enqueue() and need to be polled promptly; confirm. */
        #ifdef WITH_MPI
	    if ( res == NULL && qid > 1 ) {
	#else
            if ( res == NULL ) {
	#endif
                pthread_mutex_lock( &s->sleep_mutex );
                /* Re-check under the mutex to avoid sleeping past the
                   last wake-up broadcast. */
                if ( s->waiting > 0 )
                    pthread_cond_wait( &s->sleep_cond , &s->sleep_mutex );
                pthread_mutex_unlock( &s->sleep_mutex );
                }

        }
        
    /* Start the timer on this task, if we got one. */
    if ( res != NULL ) {
        res->tic = getticks();
        res->rid = qid;
        }
        
    /* No milk today. */
    return res;

    }


/**
 * @brief Initialize the #scheduler.
 *
 * @param s The #scheduler.
1138
 * @param space The #space we are working with
1139
1140
 * @param nr_queues The number of queues in this scheduler.
 * @param flags The #scheduler flags.
1141
 * @param nodeID The MPI rank
1142
1143
 */
 
1144
void scheduler_init ( struct scheduler *s , struct space *space , int nr_queues , unsigned int flags , int nodeID ) {
    
    /* Prepare the scheduler-wide lock. */
    lock_init( &s->lock );

    /* Get memory for the task queues. */
    s->queues = (struct queue *)malloc( sizeof(struct queue) * nr_queues );
    if ( s->queues == NULL )
        error( "Failed to allocate queues." );

    /* Set up each queue; none of them holds tasks yet. */
    for ( int qind = 0 ; qind < nr_queues ; qind++ )
        queue_init( &s->queues[qind] , NULL );

    /* Idle runners are parked on this condition variable. */
    if ( pthread_cond_init( &s->sleep_cond , NULL ) != 0 ||
         pthread_mutex_init( &s->sleep_mutex , NULL ) != 0 )
        error( "Failed to initialize sleep barrier." );

    /* Store the parameters we were handed. */
    s->nr_queues = nr_queues;
    s->flags = flags;
    s->space = space;
    s->nodeID = nodeID;

    /* Start out with an empty task list; scheduler_reset() will
       allocate it. */
    s->tasks = NULL;
    s->tasks_ind = NULL;
    s->waiting = 0;
    s->size = 0;
    s->nr_tasks = 0;
    s->tasks_next = 0;

    }