1
;;; -*- Mode: LISP; Syntax: COMMON-LISP; Package: CL-USER; Base: 10 -*-
3
;;; These threads ain't green, these threads ain't red, so let's call
4
;;; them brown instead.
6
;;; Copyright (c) 2012, Stefan Mandl <lisp@stefan-mandl.de>.
7
;;; All rights reserved.
9
;;; Redistribution and use in source and binary forms, with or without
10
;;; modification, are permitted provided that the following conditions
13
;;; * Redistributions of source code must retain the above copyright
14
;;; notice, this list of conditions and the following disclaimer.
16
;;; * Redistributions in binary form must reproduce the above
17
;;; copyright notice, this list of conditions and the following
18
;;; disclaimer in the documentation and/or other materials
19
;;; provided with the distribution.
21
;;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR 'AS IS' AND ANY EXPRESSED
22
;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
23
;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
24
;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
25
;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
26
;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
27
;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
28
;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
29
;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
30
;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
31
;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36
;; (defmacro condition-broadcast (&body body)
37
;; #+sbcl `(sb-thread:condition-broadcast ,@body)
38
;; #+ecl `(mp:condition-variable-broadcast ,@body))
40
;; (defmacro condition-wait (&body body)
41
;; #+sbcl `(sb-thread:condition-wait ,@body)
42
;; #+ecl `(mp:condition-variable-wait ,@body))
44
;; (defmacro make-condition-variable ()
45
;; #+sbcl `(sb-thread:make-waitqueue)
46
;; #+ecl `(mp:make-condition-variable))
49
;;;; a simple thread barrier
50
;; (defclass thread-barrier ()
51
;; ((num-threads-waiting :accessor num-threads-waiting :initform 0)
52
;; (name :accessor name :initarg :name :initform (error "missing mandatory initarg :name"))
53
;; (size :accessor size :initarg :size :initform (error "missing mandatory initarg :size"))
54
;; (condition-variable :accessor condition-variable :initform (make-condition-variable))
55
;; (lock :accessor lock :initform (make-mutex))
56
;; (num-thread-leaving :accessor num-thread-leaving :initform 0)))
59
;; (defmethod initialize-instance :after ((tb thread-barrier) &rest initargs)
60
;; (declare (ignore initargs))
61
;; (setf (condition-variable tb) (make-condition-variable)
62
;; (lock tb) (make-mutex))
66
;; (defmacro lock-and-wait-for-condition (&key lock condition-variable condition
67
;; (before-wait-steps nil) (after-wait-steps nil))
68
;; `(with-mutex (,lock)
69
;; ,@before-wait-steps
70
;; (cond (,condition (condition-broadcast ,condition-variable))
71
;; (t (loop while (not ,condition)
73
;; (condition-wait ,condition-variable ,lock))))
74
;; ,@after-wait-steps))
77
;; (defmethod wait-at ((b thread-barrier))
78
;; (lock-and-wait-for-condition
80
;; :condition-variable (condition-variable b)
81
;; :condition (>= (num-threads-waiting b) (size b))
82
;; :before-wait-steps ((incf (num-threads-waiting b)))
83
;; :after-wait-steps ((incf (num-thread-leaving b))
84
;; (when (= (num-thread-leaving b) (size b))
85
;; (setf (num-thread-leaving b) 0
86
;; (num-threads-waiting b) 0)))))
92
;; workers handle a bunch of processes
94
;; (defclass worker ()
95
;; ((processes :accessor processes
96
;; :initarg :processes
98
;; (scheduler :accessor scheduler
99
;; :initarg :scheduler)
100
;; (terminate-flag :accessor terminate-flag
102
;; (:documentation "A worker manages a number of MoTEE processes.
103
;; Each worker is assigned its own thread of execution."))
106
;; (defmethod add ((p process) (w worker))
107
;; (push p (processes w)))
109
;; (defclass scheduler ()
110
;; ((workers :accessor workers
113
;; (worker-threads :accessor worker-threads
115
;; (number-of-threads :accessor number-of-threads
116
;; :initarg :number-of-threads
118
;; (num-workers-finished-variable :reader num-workers-finished-variable
119
;; :initform (make-condition-variable))
120
;; (num-workers-finished-lock :reader num-workers-finished-lock
121
;; :initform (make-mutex))
122
;; (num-workers-finished :accessor num-workers-finished
124
;; (start-step-barrier :accessor start-step-barrier :initform nil)
125
;; (commit-barrier :accessor commit-barrier :initform nil)
126
;; (update-channels-barrier :accessor update-channels-barrier :initform nil)
127
;; (step-finished-barrier :accessor step-finished-barrier :initform nil)
128
;; (is-alive :accessor is-alive :initform nil)
129
;; (motee :accessor motee :initarg :motee :initform nil)))
132
;; (defmethod initialize-instance :after ((s scheduler) &rest rest)
133
;; "sets up the scheduler's commit barriers and create the worker instances."
134
;; (declare (ignore rest))
135
;; (setf (start-step-barrier s)
136
;; (make-instance 'thread-barrier :name "start-step-barrier" :size (1+ (number-of-threads s)))
137
;; (commit-barrier s)
138
;; (make-instance 'thread-barrier :name "commit-barrier" :size (number-of-threads s))
139
;; (update-channels-barrier s)
140
;; (make-instance 'thread-barrier :name "update-channels-barrier" :size (number-of-threads s))
141
;; (step-finished-barrier s)
142
;; (make-instance 'thread-barrier :name "step-finished-barrier" :size (1+ (number-of-threads s))))
143
;; (dotimes (i (number-of-threads s))
144
;; (let* ((worker (make-instance 'worker :scheduler s))
145
;; (worker-thread (make-thread #'run (format nil "Worker-~a" i) worker)))
146
;; (push worker (workers s))
147
;; (push worker-thread (worker-threads s))))
148
;; (setf (is-alive s) t))
151
;; (defmethod run ((w worker))
152
;; (loop while (not (terminate-flag w))
154
;; (wait-at (start-step-barrier (scheduler w)))
155
;; (dolist (p (processes w))
157
;; (wait-at (commit-barrier (scheduler w)))
158
;; (dolist (p (processes w))
160
;; (wait-at (update-channels-barrier (scheduler w)))
161
;; (dolist (p (processes w))
163
;; (wait-at (step-finished-barrier (scheduler w)))))
166
;; (defmethod add ((p process) (s scheduler))
167
;; ;; TODO: need to be more clever here:
168
;; (let ((min-worker nil)
169
;; (min-processes 0))
170
;; (loop for w in (workers s)
171
;; do (when (or (not min-worker)
172
;; (< (length (processes w)) min-processes))
173
;; (setf min-processes (length (processes w))
175
;; (add p min-worker)))
177
;; (defmethod one-step ((s scheduler))
180
;; (wait-at (start-step-barrier s))
181
;; (wait-at (step-finished-barrier s)))
182
;; (error "The scheduler is not alive. Either it has not been properly initialized or it has been shutdown")))
186
;; (defmethod shutdown ((s scheduler))
187
;; (flet ((kill-em-all (worker-threads)
188
;; (handler-bind ((sb-thread:interrupt-thread-error
190
;; (format *trace-output* "Condition: ~a~%" c)
191
;; (let ((restart (find-restart 'retry)))
193
;; (invoke-restart restart)
194
;; (format *trace-output* "... ignoring~%"))))))
195
;; (dolist (wt worker-threads)
196
;; (when (sb-thread:thread-alive-p wt)
197
;; (sb-thread:terminate-thread wt))))))
198
;; (kill-em-all (worker-threads s))
199
;; (setf (is-alive s) nil)))
201
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
202
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
203
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
204
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
205
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
206
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
207
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
208
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
209
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
210
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
211
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
212
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
213
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
214
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
215
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
216
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
217
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
218
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
219
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
228
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
231
((hostname :accessor hostname :initarg :hostname :initform nil)
232
(id :accessor id :initarg :id :initform (gensym "PID-"))))
235
((messages :accessor messages :initform (list))
236
(lock :reader lock :initform (bt:make-recursive-lock))
237
(message-received :reader message-received :initform (bt:make-condition-variable))))
239
(defclass bt-process ()
240
((pid :accessor pid :initform (make-instance 'pid))
241
(mailbox :accessor mailbox :initform (make-instance 'mailbox))
242
;(cont :initarg :cont :initform (error "missing mandatory initarg :cont"))
243
(state :accessor state :initarg :state :initform (error "missing mandatory initarg :state"))
244
(scheduler :accessor scheduler :initarg :scheduler :initform (error "missing mandatory initarg :scheduler"))
245
(name :accessor name :initarg :name :initform "anonymous")
246
; (pvalue :accessor process-value :initarg :process-value :initform nil)))
249
(defmethod initialize-instance :after ((p bt-process) &rest rest)
250
(declare (ignore rest))
252
;; (make-state :cont (slot-value p 'cont) :value nil))
254
(setf (name p) "anonymous"))
259
(defun number-of-threads-to-use ()
  "Return how many threads to use: the number of entries reported by
CPU-INFO whose CAR is :PROCESSOR, but never fewer than 2."
  (let ((processor-entries
          (count :processor
                 (com.informatimago.clmisc.resource-utilization::cpu-info)
                 :key #'car)))
    ;; Floor of 2 because, at least for testing, we want to have a little fun.
    (max processor-entries 2)))
267
(defclass scheduler ()
  (;; Queue of processes known to this scheduler; starts as an empty FIFO instance.
   (processes
    :accessor processes
    :initform (make-instance 'fifo))
   ;; Starts as NIL; assigned by the INITIALIZE-INSTANCE :AFTER method.
   (idle-process
    :accessor idle-process
    :initform nil)
   ;; Starts as NIL.
   (workers
    :accessor workers
    :initform nil)
   ;; Starts as NIL.
   (worker-threads
    :accessor worker-threads
    :initform nil)
   ;; Read by the RUN loop condition; defaults to T.
   (exit-when-no-processes
    :accessor exit-when-no-processes
    :initarg :exit-when-no-processes
    :initform t)
   ;; Recursive lock held by ADD and GET-PROCESS-TO-PROCESS while they
   ;; touch PROCESSES.
   (lock
    :accessor lock
    :initform (bt:make-recursive-lock)))
  (:documentation
   "Holds a FIFO of processes, an idle process, and a recursive lock
guarding access to the process queue."))
276
(defmethod work ((s scheduler))
280
;; (defmethod initialize-instance :after ((s scheduler) &rest rest)
281
;; (declare (ignore rest))
282
;; (labels ((worker-kickstart ()
283
;; (bt:with-recursive-lock-held ((lock s))
284
;; (push (bt:current-thread)
285
;; (worker-threads s)))
287
;; (dotimes (i (number-of-threads-to-use))
288
;; (bt:make-thread #'worker-kickstart :name (format nil "Worker-~a" i)))))
290
(defun make-idle-process (scheduler)
291
"The idle process is running all the time and doing
293
(make-instance 'bt-process :name "*idle*" :scheduler scheduler
296
(lisp (bt:with-recursive-lock-held ((lock (scheduler self)))
298
;(format t "Idle: ~a~%" (number-of-elements (processes (scheduler self))))
299
(setf (processes (scheduler self))
300
(fifo-remove-if (lambda (p)
301
(let ((term (and (not (eql p self))
302
(or (null (st-c (state p))) (st-termp (state p))))))
304
(format t "removing terminated process: ~a~%" p))
306
(processes (scheduler self))))))))))
308
(defmethod initialize-instance :after ((sched scheduler) &rest initargs)
  "Create the idle process for SCHED via MAKE-IDLE-PROCESS and store it in
the IDLE-PROCESS slot, holding the scheduler's recursive lock meanwhile."
  (declare (ignore initargs))
  (bt:with-recursive-lock-held ((lock sched))
    (let ((idle (make-idle-process sched)))
      (setf (idle-process sched) idle))))
314
(defmethod get-process-to-process ((sched scheduler))
  "Return the result of CYCLE-NEXT applied to SCHED's process queue,
evaluated while holding the scheduler's recursive lock."
  (bt:with-recursive-lock-held ((lock sched))
    (let ((queue (processes sched)))
      (cycle-next queue))))
318
(defmethod add ((sched scheduler) (proc bt-process))
  "Add PROC to SCHED's process queue while holding the scheduler's
recursive lock.  Returns whatever the queue-level ADD returns."
  ;; A former guard, (when (not (null (cont p))) ...), is disabled here.
  (bt:with-recursive-lock-held ((lock sched))
    (let ((queue (processes sched)))
      (add proc queue))))
323
(defmethod run ((s scheduler))
324
(loop while ;(not (emptyp (processes s)))
325
(or (not (exit-when-no-processes s))
326
(> (number-of-elements (processes s)) 1))
327
do (let ((p (get-process-to-process s)))
328
(when (st-c (state p))
329
;(format t "old state: ~a~%" (state p))
330
(setf (state p) (funcall (st-c (state p)) p (st-v (state p))))
331
;(format t "new state: ~a~%" (state p))
336
(defmacro ! (process message)
338
;(format t "sending message ~a to ~a~%" ,message ,process)
339
(push ,message (messages (mailbox ,process)))))
343
(defmacro spawn (proc &key name scheduler)
344
`(make-instance 'bt-process
347
:scheduler ,(if scheduler