XRootD
Loading...
Searching...
No Matches
XrdFfsQueue.cc
Go to the documentation of this file.
1/******************************************************************************/
2/* XrdFfsQueue.cc functions to run independent tasks in queue */
3/* */
4/* (c) 2010 by the Board of Trustees of the Leland Stanford, Jr., University */
5/* All Rights Reserved */
6/* Author: Wei Yang (SLAC National Accelerator Laboratory, 2009) */
7/* Contract DE-AC02-76-SFO0515 with the Department of Energy */
8/* */
9/* This file is part of the XRootD software suite. */
10/* */
11/* XRootD is free software: you can redistribute it and/or modify it under */
12/* the terms of the GNU Lesser General Public License as published by the */
13/* Free Software Foundation, either version 3 of the License, or (at your */
14/* option) any later version. */
15/* */
16/* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17/* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18/* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19/* License for more details. */
20/* */
21/* You should have received a copy of the GNU Lesser General Public License */
22/* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23/* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24/* */
25/* The copyright holder's institutional names and contributor's names may not */
26/* be used to endorse or promote products derived from this software without */
27/* specific prior written permission of the institution or contributor. */
28/******************************************************************************/
29
30#include "XrdFfs/XrdFfsQueue.hh"
31#include <cstdlib>
32
33/* queue operation */
34
35#ifdef __cplusplus
36 extern "C" {
37#endif
38
41unsigned int XrdFfsQueueNext_task_id = 0;
42pthread_mutex_t XrdFfsQueueTaskque_mutex = PTHREAD_MUTEX_INITIALIZER;
43pthread_cond_t XrdFfsQueueTaskque_cond = PTHREAD_COND_INITIALIZER;
44
46{
47 pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
48
49 task->id = XrdFfsQueueNext_task_id + 1;
51 if (XrdFfsQueueTaskque_tail == NULL)
52 {
55 task->next = NULL;
56 pthread_cond_broadcast(&XrdFfsQueueTaskque_cond);
57 }
58 else
59 {
61 task->next = NULL;
64 }
65
66 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
67 return;
68}
69
71{
72 struct XrdFfsQueueTasks *head;
73 while (pthread_mutex_lock(&XrdFfsQueueTaskque_mutex) == 0)
74 if (XrdFfsQueueTaskque_head == NULL)
75 {
77 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
78 }
79 else
80 break;
81
84
85 head->next = NULL;
86 head->prev = NULL;
87
88 if (XrdFfsQueueTaskque_head == NULL)
90
91 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
92 return head;
93}
94
95/* create, wait and free(delete) a task */
96
97struct XrdFfsQueueTasks* XrdFfsQueue_create_task(void* (*func)(void*), void **args, short initstat)
98{
99 struct XrdFfsQueueTasks *task = (struct XrdFfsQueueTasks*) malloc(sizeof(struct XrdFfsQueueTasks));
100 task->func = func;
101 task->args = args;
102 task->done = ( (initstat == -1)? -1 : 0); /* -1 means this task is meant to kill a worker thread */
103
104 pthread_mutex_init(&task->mutex, NULL);
105 pthread_cond_init(&task->cond, NULL);
106
108 return task;
109}
110
112{
113 pthread_mutex_destroy(&task->mutex);
114 pthread_cond_destroy(&task->cond);
115 task->func = NULL;
116 task->args = NULL;
117 task->next = NULL;
118 task->prev = NULL;
119 free(task);
120 task = NULL;
121}
122
124{
125 pthread_mutex_lock(&task->mutex);
126 if (task->done != 1)
127 pthread_cond_wait(&task->cond, &task->mutex);
128 pthread_mutex_unlock(&task->mutex);
129}
130
132{
133 unsigned int que_len = 0;
134 pthread_mutex_lock(&XrdFfsQueueTaskque_mutex);
135 if (XrdFfsQueueTaskque_head != NULL && XrdFfsQueueTaskque_tail != NULL) {
138 else
139// this is wrong:
140// que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
141
142//not accepted by c89
143// que_len = (unsigned int)4294967295 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1;
144
145//this is not quite correct, but I imagine that the queue will never be that long (>= 2147483647)
146 que_len = (unsigned int)2147483647 - (XrdFfsQueueTaskque_head->id - XrdFfsQueueTaskque_tail->id) + 1+(unsigned int)2147483647+1;
147 }
148 pthread_mutex_unlock(&XrdFfsQueueTaskque_mutex);
149 return que_len;
150}
151
152/* workers */
153
154void *XrdFfsQueue_worker(void* x)
155{
156 struct XrdFfsQueueTasks *task;
157 short quit = 0;
158
159 loop:
160 task = XrdFfsQueue_dequeue();
161
162 if (task->done == -1) // terminate this worker thread
163 quit = 1;
164
165 pthread_mutex_lock(&task->mutex);
166#ifdef QUEDEBUG
167 printf("worker %d on task %d\n", wid, task->id);
168#endif
169 if (!quit)
170 (task->func)(task->args);
171
172 task->done = 1;
173 pthread_cond_signal(&task->cond);
174 pthread_mutex_unlock(&task->mutex);
175 if (quit)
176 {
177#ifdef QUEDEBUG
178 printf("worker %d is leaving\n", wid);
179#endif
180 free(x);
181// pthread_exit(NULL);
182 return(NULL);
183 }
184 else
185 goto loop;
186}
187
189unsigned short XrdFfsQueueNworkers = 0;
190unsigned int XrdFfsQueueWorker_id = 0;
191
193{
194 int i, rc, *id;
195 pthread_t *thread;
196 pthread_attr_t attr;
197 size_t stacksize = 2*1024*1024;
198
199 pthread_attr_init(&attr);
200 pthread_attr_setstacksize(&attr, stacksize);
201 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
202
203 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
204 for (i = 0; i < n; i++)
205 {
206 id = (int*) malloc(sizeof(int));
207 *id = XrdFfsQueueWorker_id++;
208 thread = (pthread_t*) malloc(sizeof(pthread_t));
209 if (thread == NULL)
210 {
212 break;
213 }
214 rc = pthread_create(thread, &attr, XrdFfsQueue_worker, id);
215 if (rc != 0)
216 {
218 break;
219 }
220 pthread_detach(*thread);
221 free(thread);
222 }
223 pthread_attr_destroy(&attr);
225 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
226 return i;
227}
228
230{
231 int i;
232 struct XrdFfsQueueTasks *task;
233
234 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
235 if (XrdFfsQueueNworkers == 0)
236 n = 0;
237 else if (n > XrdFfsQueueNworkers)
238 {
241 }
242 else
244 for (i = 0; i < n; i++)
245 {
246 task = XrdFfsQueue_create_task(NULL, NULL, -1);
249 }
250 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
251 return n;
252}
253
255{
256 int i;
257 pthread_mutex_lock(&XrdFfsQueueWorker_mutex);
259 pthread_mutex_unlock(&XrdFfsQueueWorker_mutex);
260 return i;
261}
262
263
264/* Test program below
265 ==================
266
267struct jobargs {
268 int i;
269 int XrdFfsQueueWorker_id;
270};
271
272void* job1(void *arg)
273{
274 int i = ((struct jobargs*)arg)->i;
275// int wid = ((struct jobargs*)arg)->XrdFfsQueueWorker_id;
276
277// if (i == 10 || i == 20 || i == 30 || i == 40)
278// sleep(2);
279 printf("hello from job1 ( %d )\n", i);
280}
281
282int main()
283{
284 int i;
285
286 XrdFfsQueue_create_workers(20);
287#define N 500
288 struct XrdFfsQueueTasks *myjob1[N];
289 struct jobargs myarg1[N];
290
291 sleep(1);
292 printf("1st round ...\n");
293 for (i = 0; i < N; i++)
294 {
295 myarg1[i].i = i;
296 myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
297 }
298 for (i = 0; i < N; i++)
299 {
300 XrdFfsQueue_wait_task(myjob1[i]);
301 XrdFfsQueue_free_task(myjob1[i]);
302 }
303
304 printf("there are %d workers after 1st round\n", XrdFfsQueue_count_workers());
305 printf("remove %d workers\n", XrdFfsQueue_remove_workers(8));
306 printf("add 1 worker\n");
307 XrdFfsQueue_create_workers(10);
308
309 sleep(2);
310 printf("2nd round ...\n");
311
312 for (i = 0; i < N; i++)
313 {
314 myarg1[i].i = i;
315 myjob1[i] = XrdFfsQueue_create_task((void*) &job1, (void*) &myarg1[i], 0);
316 }
317 for (i = 0; i < N; i++)
318 {
319 XrdFfsQueue_wait_task(myjob1[i]);
320 XrdFfsQueue_free_task(myjob1[i]);
321 }
322
323 XrdFfsQueue_remove_workers(XrdFfsQueue_count_workers());
324 printf("bye ...\n");
325 return 0;
326}
327
328*/
329
330#ifdef __cplusplus
331 }
332#endif
void XrdFfsQueue_free_task(struct XrdFfsQueueTasks *task)
struct XrdFfsQueueTasks * XrdFfsQueueTaskque_head
int XrdFfsQueue_remove_workers(int n)
pthread_mutex_t XrdFfsQueueWorker_mutex
unsigned int XrdFfsQueueWorker_id
struct XrdFfsQueueTasks * XrdFfsQueue_create_task(void *(*func)(void *), void **args, short initstat)
unsigned int XrdFfsQueueNext_task_id
pthread_mutex_t XrdFfsQueueTaskque_mutex
void XrdFfsQueue_enqueue(struct XrdFfsQueueTasks *task)
int XrdFfsQueue_count_workers()
int XrdFfsQueue_create_workers(int n)
struct XrdFfsQueueTasks * XrdFfsQueueTaskque_tail
pthread_cond_t XrdFfsQueueTaskque_cond
unsigned short XrdFfsQueueNworkers
unsigned int XrdFfsQueue_count_tasks()
void XrdFfsQueue_wait_task(struct XrdFfsQueueTasks *task)
struct XrdFfsQueueTasks * XrdFfsQueue_dequeue()
void * XrdFfsQueue_worker(void *x)
pthread_cond_t cond
unsigned int id
struct XrdFfsQueueTasks * prev
struct XrdFfsQueueTasks * next
pthread_mutex_t mutex
void *(* func)(void *)