#include "ntoskernel.h"

/* workqueue implementation for 2.4 kernels */

struct workq_thread_data {
	workqueue_struct_t *workq;
	int index;
};
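
/* Each workqueue thread runs this loop: sleep until 'pending' becomes
 * non-zero, then drain the thread's work list. 'pending' doubles as the
 * control channel: > 0 means work (or a flush) is waiting, 0 means idle,
 * and -1 tells the thread to exit. */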
static int workq_thread(void *data)
{
	struct workq_thread_data *thread_data = data;
	struct workqueue_thread *thread;
	workqueue_struct_t *workq;
	work_struct_t *work;

	workq = thread_data->workq;
	thread = &workq->threads[thread_data->index];
	WORKTRACE("%p, %d, %p", workq, thread_data->index, thread);
	strncpy(thread->name, current->comm, sizeof(thread->name));
	thread->name[sizeof(thread->name) - 1] = 0;
#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,7)
	daemonize(thread->name);
	set_user_nice(current, -5);
	current->flags |= PF_NOFREEZE;
	sigfillset(&current->blocked);
#endif

	if (thread->task != current) {
		WARNING("invalid task: %p, %p", thread->task, current);
		thread->task = current;
	}
	thread->pid = current->pid;
	complete(xchg(&thread->completion, NULL));
	WORKTRACE("%s (%d) started", thread->name, thread->pid);

	while (thread->pending >= 0) {
		if (wait_condition(thread->pending, 0, TASK_INTERRUPTIBLE) < 0) {
			/* TODO: deal with signal */
			WARNING("signal not blocked?");
			flush_signals(current);
			continue;
		}
		while (1) {
			struct list_head *entry;
			unsigned long flags;

			spin_lock_irqsave(&thread->lock, flags);
			if (list_empty(&thread->work_list)) {
				struct completion *completion;
				if (thread->pending < 0) {
					spin_unlock_irqrestore(&thread->lock,
							       flags);
					goto out;
				}
				thread->pending = 0;
				completion = thread->completion;
				thread->completion = NULL;
				spin_unlock_irqrestore(&thread->lock, flags);
				if (completion)
					complete(completion);
				break;
			}
			entry = thread->work_list.next;
			work = list_entry(entry, work_struct_t, list);
			/* the xchg arbitrates with wrap_cancel_work: run the
			 * work item only if we cleared work->thread first */
			if (xchg(&work->thread, NULL))
				list_del(entry);
			else
				work = NULL;
			spin_unlock_irqrestore(&thread->lock, flags);
			WORKTRACE("%p, %p", work, thread);
			if (work)
				work->func(work->data);
		}
	}

out:
	WORKTRACE("%s exiting", thread->name);
	thread->pid = 0;
	return 0;
}
wfastcall int wrap_queue_work_on(workqueue_struct_t *workq, work_struct_t *work,
				 int cpu)
{
	struct workqueue_thread *thread = &workq->threads[cpu];
	unsigned long flags;
	int ret;

	assert(thread->pid > 0);
	WORKTRACE("%p, %d", workq, cpu);
	spin_lock_irqsave(&thread->lock, flags);
	if (work->thread)
		ret = 0;
	else {
		work->thread = thread;
		list_add_tail(&work->list, &thread->work_list);
		thread->pending = 1;
		wake_up_process(thread->task);
		ret = 1;
	}
	spin_unlock_irqrestore(&thread->lock, flags);
	return ret;
}
wfastcall int wrap_queue_work(workqueue_struct_t *workq, work_struct_t *work)
{
	if (NR_CPUS == 1 || workq->singlethread)
		return wrap_queue_work_on(workq, work, 0);
	else {
		typeof(workq->qon) qon;
		/* work is queued on threads in a round-robin fashion */
		do {
			qon = workq->qon % workq->num_cpus;
			atomic_inc_var(workq->qon);
		} while (!workq->threads[qon].pid);
		return wrap_queue_work_on(workq, work, qon);
	}
}
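
/* Remove a work item that has not started running yet. The xchg on
 * work->thread is the same arbitration used by workq_thread, so whichever
 * side clears it first owns the list entry. */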
void wrap_cancel_work(work_struct_t *work)
{
	struct workqueue_thread *thread;
	unsigned long flags;

	WORKTRACE("%p", work);
	if ((thread = xchg(&work->thread, NULL))) {
		WORKTRACE("%p", thread);
		spin_lock_irqsave(&thread->lock, flags);
		list_del(&work->list);
		spin_unlock_irqrestore(&thread->lock, flags);
	}
}
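
/* Create a workqueue with one thread per online CPU, or a single thread if
 * 'singlethread' is set. A minimal usage sketch, assuming a work_struct_t
 * ('my_work' below is illustrative) whose func/data members have been
 * initialized elsewhere:
 *
 *	workqueue_struct_t *wq = wrap_create_wq("example", 0, 0);
 *	if (wq) {
 *		wrap_queue_work(wq, &my_work);
 *		wrap_flush_wq(wq);
 *		wrap_destroy_wq(wq);
 *	}
 */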
workqueue_struct_t *wrap_create_wq(const char *name, u8 singlethread, u8 freeze)
{
	struct completion started;
	workqueue_struct_t *workq;
	int i, n;

	if (singlethread)
		n = 1;
	else
		n = num_online_cpus();
	workq = kmalloc(sizeof(*workq) + n * sizeof(workq->threads[0]),
			GFP_KERNEL);
	if (!workq) {
		WARNING("couldn't allocate memory");
		return NULL;
	}
	memset(workq, 0, sizeof(*workq) + n * sizeof(workq->threads[0]));
	workq->singlethread = singlethread;
	strncpy(workq->name, name, sizeof(workq->name));
	workq->name[sizeof(workq->name) - 1] = 0;
	WORKTRACE("%p", workq);
	init_completion(&started);
	for_each_online_cpu(i) {
		struct workq_thread_data thread_data;

		spin_lock_init(&workq->threads[i].lock);
		INIT_LIST_HEAD(&workq->threads[i].work_list);
		INIT_COMPLETION(started);
		workq->threads[i].completion = &started;
		thread_data.workq = workq;
		thread_data.index = i;
		WORKTRACE("%p, %d, %p", workq, i, &workq->threads[i]);
#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,7)
		workq->threads[i].pid =
			kernel_thread(workq_thread, &thread_data, CLONE_SIGHAND);
		if (workq->threads[i].pid < 0)
			workq->threads[i].task = (void *)-ENOMEM;
		else
			workq->threads[i].task =
				find_task_by_pid(workq->threads[i].pid);
#else
		workq->threads[i].task =
			kthread_create(workq_thread, &thread_data,
				       "%s/%d", name, i);
#endif
		if (IS_ERR(workq->threads[i].task)) {
			int j;

			for (j = 0; j < i; j++)
				wrap_destroy_wq_on(workq, j);
			kfree(workq);
			WARNING("couldn't start thread %s", name);
			return NULL;
		}
		if (!freeze)
			workq->threads[i].task->flags |= PF_NOFREEZE;
		kthread_bind(workq->threads[i].task, i);
		/* num_cpus is a count, not a highest index */
		workq->num_cpus = max(workq->num_cpus, i + 1);
		wake_up_process(workq->threads[i].task);
		wait_for_completion(&started);
		WORKTRACE("%s, %d: %p, %d", name, i,
			  workq, workq->threads[i].pid);
		if (singlethread)
			break;
	}
	return workq;
}
void wrap_flush_wq_on(workqueue_struct_t *workq, int cpu)
{
	struct workqueue_thread *thread = &workq->threads[cpu];
	struct completion done;

	WORKTRACE("%p: %d, %s", workq, cpu, thread->name);
	init_completion(&done);
	thread->completion = &done;
	/* force the thread out of wait_condition() */
	thread->pending = 1;
	wake_up_process(thread->task);
	wait_for_completion(&done);
}

void wrap_flush_wq(workqueue_struct_t *workq)
{
	int i, n;

	WORKTRACE("%p", workq);
	if (workq->singlethread)
		n = 1;
	else
		n = num_online_cpus();
	for (i = 0; i < n; i++)
		wrap_flush_wq_on(workq, i);
}
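
/* Stop the given CPU's thread: pending = -1 makes workq_thread's main loop
 * exit, and the thread clears its pid on the way out. */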
void wrap_destroy_wq_on(workqueue_struct_t *workq, int cpu)
{
	struct workqueue_thread *thread = &workq->threads[cpu];

	WORKTRACE("%p: %d, %s", workq, cpu, thread->name);
	if (!thread->pid)
		return;
	thread->pending = -1;
	wake_up_process(thread->task);
	while (thread->pid) {
		WORKTRACE("%d", thread->pid);
		schedule();
	}
}
void wrap_destroy_wq(workqueue_struct_t *workq)
{
	int i, n;

	WORKTRACE("%p", workq);
	if (workq->singlethread)
		n = 1;
	else
		n = num_online_cpus();
	for (i = 0; i < n; i++)
		wrap_destroy_wq_on(workq, i);
	kfree(workq);
}