/*******************************************************************************
 * This file is part of SWIFT.
 * Copyright (c) 2012 Pedro Gonnet (pedro.gonnet@durham.ac.uk)
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Lesser General Public License as published
 * by the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 ******************************************************************************/

/* Config parameters. */
#include "../config.h"

/* Some standard headers. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* MPI headers. */
#ifdef WITH_MPI
#include <mpi.h>
#endif

/* This object's header. */
#include "queue.h"

/* Local headers. */
#include "atomic.h"
#include "error.h"
#include "memswap.h"

/**
 * @brief Push the task at the given index up the heap until it is either at the
 * top or smaller than its parent.
 *
 * @param q The task #queue.
 * @param ind The index of the task to be bubbled up in the queue.
 *
 * @return The new index of the entry.
 */
int queue_bubble_up(struct queue *q, int ind) {
  /* Set some pointers we will use often. */
  struct queue_entry *entries = q->entries;
  const float w = entries[ind].weight;

  /* While we are not yet at the top of the heap... */
  while (ind > 0) {
    /* Stop if the parent is already larger. */
    const int parent = (ind - 1) / 2;
    if (w < entries[parent].weight) break;

    /* The parent is not larger, so swap. */
    memswap(&entries[ind], &entries[parent], sizeof(struct queue_entry));
    ind = parent;
  }

  return ind;
}

/**
 * @brief Push the task at the given index down the heap until both its children
 * have a smaller weight.
 *
 * @param q The task #queue.
 * @param ind The index of the task to be sifted-down in the queue.
 *
 * @return The new index of the entry.
 */
int queue_sift_down(struct queue *q, int ind) {
  /* Set some pointers we will use often. */
  struct queue_entry *entries = q->entries;
  const int qcount = q->count;
  const float w = entries[ind].weight;

  /* While we still have at least one child... */
  while (1) {
    /* Check if we still have children. */
    int child = 2 * ind + 1;
    if (child >= qcount) break;

    /* Which of the two children is the larger? */
    if (child + 1 < qcount && entries[child + 1].weight > entries[child].weight)
      child += 1;

    /* Do we want to swap with the largest child? */
    if (entries[child].weight > w) {
      memswap(&entries[ind], &entries[child], sizeof(struct queue_entry));
      ind = child;
    } else
      break;
  }

  return ind;
}

/**
 * @brief Enqueue all tasks in the incoming DEQ.
 *
 * @param q The #queue, assumed to be locked.
 */
void queue_get_incoming(struct queue *q) {

  struct queue_entry *entries = q->entries;

  /* Loop over the incoming DEQ. */
  while (1) {

    /* Is there a next element? */
    const int ind = q->first_incoming % queue_incoming_size;
    if (q->tid_incoming[ind] < 0) break;

    /* Get the next offset off the DEQ. */
    const int offset = atomic_swap(&q->tid_incoming[ind], -1);
    atomic_inc(&q->first_incoming);

    /* Does the queue need to be grown? */
    if (q->count == q->size) {
      struct queue_entry *temp;
      q->size *= queue_sizegrow;
      if ((temp = (struct queue_entry *)malloc(sizeof(struct queue_entry) *
                                               q->size)) == NULL)
        error("Failed to allocate new indices.");
      memcpy(temp, entries, sizeof(struct queue_entry) * q->count);
      free(entries);
      q->entries = entries = temp;
    }

    /* Drop the task at the end of the queue. */
    entries[q->count].tid = offset;
    entries[q->count].weight = q->tasks[offset].weight;
    q->count += 1;
    atomic_dec(&q->count_incoming);

    /* Re-heap by bubbling up the new (last) element. */
    queue_bubble_up(q, q->count - 1);

#ifdef SWIFT_DEBUG_CHECKS
    /* Check the queue's consistency. */
    for (int k = 1; k < q->count; k++)
      if (entries[(k - 1) / 2].weight < entries[k].weight)
        error("Queue heap is disordered.");
#endif
  }
}

/**
 * @brief Insert a task into the given queue.
 *
 * @param q The #queue.
 * @param t The #task.
 */
void queue_insert(struct queue *q, struct task *t) {

  /* Get an index in the DEQ. */
  const int ind = atomic_inc(&q->last_incoming) % queue_incoming_size;

  /* Spin until the new offset can be stored. */
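  /* The stored offset, t - q->tasks, is the task's index in the shared
     tasks array. */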
  while (atomic_cas(&q->tid_incoming[ind], -1, t - q->tasks) != -1) {

    /* Try to get the queue lock, non-blocking, ensures that at
       least somebody is working on this queue. */
    if (lock_trylock(&q->lock) == 0) {

      /* Clean up the incoming DEQ. */
      queue_get_incoming(q);

      /* Release the queue lock. */
      if (lock_unlock(&q->lock) != 0) {
        error("Unlocking the qlock failed.\n");
      }
    }
  }

  /* Increase the incoming count. */
  atomic_inc(&q->count_incoming);
}

/**
 * @brief Initialize the given queue.
 *
 * @param q The #queue.
 * @param tasks List of tasks to which the queue indices refer.
 */
void queue_init(struct queue *q, struct task *tasks) {

  /* Allocate the initial list of queue entries. */
  q->size = queue_sizeinit;
  if ((q->entries = (struct queue_entry *)malloc(sizeof(struct queue_entry) *
                                                 q->size)) == NULL)
    error("Failed to allocate queue entries.");

  /* Set the tasks pointer. */
  q->tasks = tasks;

  /* Init counters. */
  q->count = 0;

  /* Init the queue lock. */
  if (lock_init(&q->lock) != 0) error("Failed to init queue lock.");

  /* Init the incoming DEQ. */
  if ((q->tid_incoming = (int *)malloc(sizeof(int) * queue_incoming_size)) ==
      NULL)
    error("Failed to allocate queue incoming buffer.");
  for (int k = 0; k < queue_incoming_size; k++) {
    q->tid_incoming[k] = -1;
  }
  q->first_incoming = 0;
  q->last_incoming = 0;
  q->count_incoming = 0;
}

/**
 * @brief Get a task free of dependencies and conflicts.
 *
 * @param q The task #queue.
 * @param prev The previous #task extracted from this #queue.
 * @param blocking Block until access to the queue is granted.
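 *
 * @return A pointer to the selected #task, or NULL if no task could be obtained.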
 */
struct task *queue_gettask(struct queue *q, const struct task *prev,
                           int blocking) {

  swift_lock_type *qlock = &q->lock;
  struct task *res = NULL;

  /* Grab the queue lock. */
  if (blocking) {
    if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");
  } else {
    if (lock_trylock(qlock) != 0) return NULL;
  }

  /* Fill any tasks from the incoming DEQ. */
  queue_get_incoming(q);

  /* If there are no tasks, leave immediately. */
  if (q->count == 0) {
    lock_unlock_blind(qlock);
    return NULL;
  }

  /* Set some pointers we will use often. */
  struct queue_entry *entries = q->entries;
  struct task *qtasks = q->tasks;
  const int old_qcount = q->count;

  /* Loop over the queue entries. */
  int ind;
  for (ind = 0; ind < old_qcount; ind++) {

    /* Try to lock the next task. */
    if (task_lock(&qtasks[entries[ind].tid])) break;

    /* Should we de-prioritize this task? */
    if ((1ULL << qtasks[entries[ind].tid].type) &
        queue_lock_fail_reweight_mask) {
      /* Scale the task's weight. */
      entries[ind].weight *= queue_lock_fail_reweight_factor;

      /* Send it down the binary heap. */
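      /* If the entry moved, an unexamined entry now occupies this slot, so
         step the index back to revisit it on the next iteration. */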
      if (queue_sift_down(q, ind) != ind) ind -= 1;
    }
  }

  /* Did we get a task? */
  if (ind < old_qcount) {

    /* Another one bites the dust. */
    const int qcount = q->count -= 1;

    /* Get a pointer to the task that we want to return. */
    res = &qtasks[entries[ind].tid];

    /* Swap this task with the last task and re-heap. */
    if (ind < qcount) {
      entries[ind] = entries[qcount];
      ind = queue_bubble_up(q, ind);
      ind = queue_sift_down(q, ind);
    }

  } else
    res = NULL;

#ifdef SWIFT_DEBUG_CHECKS
  /* Check the queue's consistency. */
  for (int k = 1; k < q->count; k++)
    if (entries[(k - 1) / 2].weight < entries[k].weight)
      error("Queue heap is disordered.");
#endif

  /* Release the queue lock. */
  if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");

  /* Take the money and run. */
  return res;
}
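
/**
 * @brief Free the memory allocated by the given queue.
 *
 * @param q The task #queue.
 */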
void queue_clean(struct queue *q) {

  free(q->entries);
  free(q->tid_incoming);
}

/**
 * @brief Dump a formatted list of tasks in the queue to the given file stream.
 *
 * @param nodeID The node ID of this rank.
 * @param index A number identifying this queue, added to the output.
 * @param file The FILE stream, which should be opened for writing.
 * @param q The task #queue.
 */
void queue_dump(int nodeID, int index, FILE *file, struct queue *q) {

  swift_lock_type *qlock = &q->lock;

  /* Grab the queue lock. */
  if (lock_lock(qlock) != 0) error("Locking the qlock failed.\n");

  /* Fill any tasks from the incoming DEQ. */
  queue_get_incoming(q);

  /* Loop over the queue entries. */
  for (int k = 0; k < q->count; k++) {
    struct task *t = &q->tasks[q->entries[k].tid];
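
    /* One line per task: nodeID, queue index, position in the queue, task
       type, sub-type and weight. */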
    fprintf(file, "%d %d %d %s %s %.2f\n", nodeID, index, k,
            taskID_names[t->type], subtaskID_names[t->subtype], t->weight);
  }

  /* Release the queue lock. */
  if (lock_unlock(qlock) != 0) error("Unlocking the qlock failed.\n");
}