/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

/* This object's header. */
#include "queue.h"

/* Local headers. */
#include "atomic.h"
#include "const.h"
#include "error.h"

/**
 * @brief Enqueue all tasks in the incoming DEQ.
 *
 * @param q The #queue, assumed to be locked.
 */
void queue_get_incoming(struct queue *q) {

  int *tid = q->tid;
  struct task *tasks = q->tasks;
Matthieu Schaller's avatar
Matthieu Schaller committed
50

51
52
  /* Loop over the incoming DEQ. */
  while (1) {
Matthieu Schaller's avatar
Matthieu Schaller committed
53

54
55
56
    /* Is there a next element? */
    const int ind = q->first_incoming % queue_incoming_size;
    if (q->tid_incoming[ind] < 0) break;
Matthieu Schaller's avatar
Matthieu Schaller committed
57

58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
    /* Get the next offset off the DEQ. */
    const int offset = atomic_swap(&q->tid_incoming[ind], -1);
    atomic_inc(&q->first_incoming);

    /* Does the queue need to be grown? */
    if (q->count == q->size) {
      int *temp;
      q->size *= queue_sizegrow;
      if ((temp = (int *)malloc(sizeof(int) * q->size)) == NULL)
        error("Failed to allocate new indices.");
      memcpy(temp, tid, sizeof(int) * q->count);
      free(tid);
      q->tid = tid = temp;
    }

    /* Drop the task at the end of the queue. */
    tid[q->count] = offset;
    q->count += 1;
76
    atomic_dec(&q->count_incoming);
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

    /* Shuffle up. */
    for (int k = q->count - 1; k > 0; k = (k - 1) / 2)
      if (tasks[tid[k]].weight > tasks[tid[(k - 1) / 2]].weight) {
        int temp = tid[k];
        tid[k] = tid[(k - 1) / 2];
        tid[(k - 1) / 2] = temp;
      } else
        break;

    /* Check the queue's consistency. */
    /* for (int k = 1; k < q->count; k++)
        if ( tasks[ tid[(k-1)/2] ].weight < tasks[ tid[k] ].weight )
            error( "Queue heap is disordered." ); */
  }
}

Pedro Gonnet's avatar
Pedro Gonnet committed
94
95
96
97
98
99
/**
 * @brief Insert a used tasks into the given queue.
 *
 * @param q The #queue.
 * @param t The #task.
 */
100
void queue_insert(struct queue *q, struct task *t) {
101
102
  /* Get an index in the DEQ. */
  const int ind = atomic_inc(&q->last_incoming) % queue_incoming_size;
Matthieu Schaller's avatar
Matthieu Schaller committed
103

104
105
  /* Spin until the new offset can be stored. */
  while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {
Matthieu Schaller's avatar
Matthieu Schaller committed
106

107
108
109
    /* Try to get the queue lock, non-blocking, ensures that at
       least somebody is working on this queue. */
    if (lock_trylock(&q->lock) == 0) {
Matthieu Schaller's avatar
Matthieu Schaller committed
110

111
      /* Clean up the incoming DEQ. */
112
      queue_get_incoming(q);
Matthieu Schaller's avatar
Matthieu Schaller committed
113

114
115
116
117
      /* Release the queue lock. */
      if (lock_unlock(&q->lock) != 0) {
        error("Unlocking the qlock failed.\n");
      }
118
    }
119
  }
Matthieu Schaller's avatar
Matthieu Schaller committed
120

121
122
  /* Increase the incoming count. */
  atomic_inc(&q->count_incoming);
123
}
Pedro Gonnet's avatar
Pedro Gonnet committed
124

125
/**
126
127
128
129
130
 * @brief Initialize the given queue.
 *
 * @param q The #queue.
 * @param tasks List of tasks to which the queue indices refer to.
 */
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
void queue_init(struct queue *q, struct task *tasks) {

  /* Allocate the task list if needed. */
  q->size = queue_sizeinit;
  if ((q->tid = (int *)malloc(sizeof(int) * q->size)) == NULL)
    error("Failed to allocate queue tids.");

  /* Set the tasks pointer. */
  q->tasks = tasks;

  /* Init counters. */
  q->count = 0;

  /* Init the queue lock. */
  if (lock_init(&q->lock) != 0) error("Failed to init queue lock.");
Matthieu Schaller's avatar
Matthieu Schaller committed
146

147
  /* Init the incoming DEQ. */
Matthieu Schaller's avatar
Matthieu Schaller committed
148
149
  if ((q->tid_incoming = (int *)malloc(sizeof(int) * queue_incoming_size)) ==
      NULL)
150
151
152
153
154
155
    error("Failed to allocate queue incoming buffer.");
  for (int k = 0; k < queue_incoming_size; k++) {
    q->tid_incoming[k] = -1;
  }
  q->first_incoming = 0;
  q->last_incoming = 0;
156
  q->count_incoming = 0;
157
158
}

159
160
161
162
/**
 * @brief Get a task free of dependencies and conflicts.
 *
 * @param q The task #queue.
163
 * @param prev The previous #task extracted from this #queue.
164
165
 * @param blocking Block until access to the queue is granted.
 */
166
167
struct task *queue_gettask(struct queue *q, const struct task *prev,
                           int blocking) {
168

169
  swift_lock_type *qlock = &q->lock;
170
  struct task *res = NULL;
171
172
173
174
175
176
177

  /* Grab the task lock. */
  if (blocking) {
    if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");
  } else {
    if (lock_trylock(qlock) != 0) return NULL;
  }
Matthieu Schaller's avatar
Matthieu Schaller committed
178

179
180
  /* Fill any tasks from the incoming DEQ. */
  queue_get_incoming(q);
181

182
183
  /* If there are no tasks, leave immediately. */
  if (q->count == 0) {
184
    lock_unlock_blind(qlock);
185
186
187
    return NULL;
  }

188
  /* Set some pointers we will use often. */
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
  int *qtid = q->tid;
  struct task *qtasks = q->tasks;
  const int qcount = q->count;

  /* Data for the sliding window in which to try the task with the
     best overlap with the previous task. */
  struct {
    int ind, tid;
    float score;
  } window[queue_search_window];
  int window_count = 0;
  int tid = -1;
  int ind = -1;

  /* Loop over the queue entries. */
  for (int k = 0; k < qcount; k++) {
    if (k < queue_search_window) {
      window[window_count].ind = k;
      window[window_count].tid = qtid[k];
      window[window_count].score = task_overlap(prev, &qtasks[qtid[k]]);
      window_count += 1;
    } else {
      /* Find the task with the largest overlap. */
      int ind_max = 0;
      for (int i = 1; i < window_count; i++)
        if (window[i].score > window[ind_max].score) ind_max = i;

      /* Try to lock that task. */
      if (task_lock(&qtasks[window[ind_max].tid])) {
        tid = window[ind_max].tid;
        ind = window[ind_max].ind;
        // message("best task has overlap %f.", window[ind_max].score);
221
222
        break;

223
224
225
226
227
228
229
        /* Otherwise, replace it with a new one from the queue. */
      } else {
        window[ind_max].ind = k;
        window[ind_max].tid = qtid[k];
        window[ind_max].score = task_overlap(prev, &qtasks[qtid[k]]);
      }
    }
230
231
  }

232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
  /* If we didn't get a task, loop through whatever is left in the window. */
  if (tid < 0) {
    while (window_count > 0) {
      int ind_max = 0;
      for (int i = 1; i < window_count; i++)
        if (window[i].score > window[ind_max].score) ind_max = i;
      if (task_lock(&qtasks[window[ind_max].tid])) {
        tid = window[ind_max].tid;
        ind = window[ind_max].ind;
        // message("best task has overlap %f.", window[ind_max].score);
        break;
      } else {
        window_count -= 1;
        window[ind_max] = window[window_count];
      }
    }
248
249
250
  }

  /* Did we get a task? */
251
  if (ind >= 0) {
252
253

    /* Another one bites the dust. */
254
    const int qcount = q->count -= 1;
255

Pedro Gonnet's avatar
Pedro Gonnet committed
256
257
    /* Get a pointer on the task that we want to return. */
    res = &qtasks[tid];
258
259

    /* Swap this task with the last task and re-heap. */
260
    int k = ind;
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
    if (k < qcount) {
      qtid[k] = qtid[qcount];
      int w = qtasks[qtid[k]].weight;
      while (k > 0 && w > qtasks[qtid[(k - 1) / 2]].weight) {
        int temp = q->tid[k];
        q->tid[k] = q->tid[(k - 1) / 2];
        q->tid[(k - 1) / 2] = temp;
        k = (k - 1) / 2;
      }
      int i;
      while ((i = 2 * k + 1) < qcount) {
        if (i + 1 < qcount &&
            qtasks[qtid[i + 1]].weight > qtasks[qtid[i]].weight)
          i += 1;
        if (qtasks[qtid[i]].weight > w) {
          int temp = qtid[i];
          qtid[i] = qtid[k];
          qtid[k] = temp;
          k = i;
        } else
          break;
      }
283
284
    }

285
286
287
288
289
290
291
292
293
294
  } else
    res = NULL;

  /* Check the queue's consistency. */
  /* for ( k = 1 ; k < q->count ; k++ )
      if ( qtasks[ qtid[(k-1)/2] ].weight < qtasks[ qtid[k] ].weight )
          error( "Queue heap is disordered." ); */

  /* Release the task lock. */
  if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");
295

296
297
298
  /* Take the money and run. */
  return res;
}