~ok2/cl-escape/cl-escape

« back to all changes in this revision

Viewing changes to src/processes.lisp

  • Committer: Stefan Mandl
  • Date: 2013-03-21 22:52:42 UTC
  • Revision ID: mail@stefan-mandl.de-20130321225242-90i1adz7nja1mqmi
initial import

Show diffs side-by-side

added added

removed removed

Lines of Context:
 
1
;;; -*- Mode: LISP; Syntax: COMMON-LISP; Package: CL-USER; Base: 10 -*-
 
2
;;;
 
3
;;; These threads ain't green these threads aint't red so let's call
 
4
;;; them brown instead.
 
5
;;;
 
6
;;; Copyright (c) 2012, Stefan Mandl <lisp@stefan-mandl.de>.
 
7
;;; All rights reserved.
 
8
 
 
9
;;; Redistribution and use in source and binary forms, with or without
 
10
;;; modification, are permitted provided that the following conditions
 
11
;;; are met:
 
12
 
 
13
;;;   * Redistributions of source code must retain the above copyright
 
14
;;;     notice, this list of conditions and the following disclaimer.
 
15
 
 
16
;;;   * Redistributions in binary form must reproduce the above
 
17
;;;     copyright notice, this list of conditions and the following
 
18
;;;     disclaimer in the documentation and/or other materials
 
19
;;;     provided with the distribution.
 
20
 
 
21
;;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR 'AS IS' AND ANY EXPRESSED
 
22
;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 
23
;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 
24
;;; ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
 
25
;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 
26
;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
 
27
;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 
28
;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
 
29
;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
 
30
;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 
31
;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 
32
 
 
33
(in-package #:escape)
 
34
 
 
35
 
 
36
;; (defmacro condition-broadcast (&body body)
 
37
;;   #+sbcl `(sb-thread:condition-broadcast ,@body)
 
38
;;   #+ecl  `(mp:condition-variable-broadcast ,@body))
 
39
 
 
40
;; (defmacro condition-wait (&body body)
 
41
;;   #+sbcl `(sb-thread:condition-wait ,@body)
 
42
;;   #+ecl `(mp:condition-variable-wait ,@body))
 
43
 
 
44
;; (defmacro make-condition-variable ()
 
45
;;   #+sbcl `(sb-thread:make-waitqueue)
 
46
;;   #+ecl `(mp:make-condition-variable))
 
47
 
 
48
 
 
49
;;;; a simple thread barrier
 
50
;; (defclass thread-barrier ()
 
51
;;   ((num-threads-waiting :accessor num-threads-waiting :initform 0)
 
52
;;    (name :accessor name :initarg :name :initform (error "missing mandatory initarg :name"))
 
53
;;    (size :accessor size :initarg :size :initform (error "missing mandatory initarg :size"))
 
54
;;    (condition-variable :accessor condition-variable :initform (make-condition-variable))
 
55
;;    (lock :accessor lock :initform (make-mutex))
 
56
;;    (num-thread-leaving :accessor num-thread-leaving :initform 0)))
 
57
 
 
58
 
 
59
;; (defmethod initialize-instance :after ((tb thread-barrier) &rest initargs)
 
60
;;   (declare (ignore initargs))
 
61
;;   (setf (condition-variable tb)      (make-condition-variable)
 
62
;;      (lock tb) (make-mutex))
 
63
;;   tb)
 
64
 
 
65
 
 
66
;; (defmacro lock-and-wait-for-condition (&key lock condition-variable condition 
 
67
;;                                     (before-wait-steps nil) (after-wait-steps nil))
 
68
;;   `(with-mutex (,lock)
 
69
;;      ,@before-wait-steps
 
70
;;      (cond (,condition (condition-broadcast ,condition-variable))
 
71
;;         (t (loop while (not ,condition)
 
72
;;                  do
 
73
;;                  (condition-wait ,condition-variable ,lock))))
 
74
;;      ,@after-wait-steps))
 
75
 
 
76
 
 
77
;; (defmethod wait-at ((b thread-barrier))
 
78
;;   (lock-and-wait-for-condition 
 
79
;;    :lock (lock b) 
 
80
;;    :condition-variable (condition-variable b) 
 
81
;;    :condition (>= (num-threads-waiting b) (size b))
 
82
;;    :before-wait-steps ((incf (num-threads-waiting b)))
 
83
;;    :after-wait-steps ((incf (num-thread-leaving b))
 
84
;;                    (when (= (num-thread-leaving b) (size b))
 
85
;;                      (setf (num-thread-leaving b) 0
 
86
;;                            (num-threads-waiting b) 0)))))
 
87
           
 
88
 
 
89
 
 
90
 
 
91
 
 
92
;; workers handle a bunch of processes                                         
 
93
 
 
94
;; (defclass worker ()
 
95
;;   ((processes :accessor processes
 
96
;;            :initarg :processes
 
97
;;            :initform nil)
 
98
;;    (scheduler :accessor scheduler
 
99
;;            :initarg :scheduler)
 
100
;;    (terminate-flag :accessor terminate-flag
 
101
;;                 :initform nil))
 
102
;;   (:documentation "A worker manages a number of MoTEE processes.
 
103
;;  Each worker is assigned its own thread of execution."))
 
104
 
 
105
 
 
106
;; (defmethod add ((p process) (w worker))
 
107
;;   (push p (processes w)))
 
108
 
 
109
;; (defclass scheduler ()
 
110
;;   ((workers :accessor workers
 
111
;;          :initarg :workers
 
112
;;          :initform nil)
 
113
;;    (worker-threads :accessor worker-threads
 
114
;;                 :initform nil)
 
115
;;    (number-of-threads :accessor number-of-threads
 
116
;;                    :initarg :number-of-threads
 
117
;;                    :initform 4)
 
118
;;    (num-workers-finished-variable :reader num-workers-finished-variable
 
119
;;                                :initform (make-condition-variable))
 
120
;;    (num-workers-finished-lock :reader num-workers-finished-lock
 
121
;;                            :initform (make-mutex))
 
122
;;    (num-workers-finished :accessor num-workers-finished
 
123
;;                       :initform 0)
 
124
;;    (start-step-barrier :accessor start-step-barrier :initform nil)
 
125
;;    (commit-barrier :accessor commit-barrier :initform nil)
 
126
;;    (update-channels-barrier :accessor update-channels-barrier :initform nil)
 
127
;;    (step-finished-barrier :accessor step-finished-barrier :initform nil)
 
128
;;    (is-alive :accessor is-alive :initform nil)
 
129
;;    (motee :accessor motee :initarg :motee :initform nil)))
 
130
 
 
131
 
 
132
;; (defmethod initialize-instance :after ((s scheduler) &rest rest)
 
133
;;   "sets up the scheduler's commit barriers and create the worker instances."
 
134
;;   (declare (ignore rest))
 
135
;;   (setf (start-step-barrier s) 
 
136
;;      (make-instance 'thread-barrier :name "start-step-barrier" :size (1+ (number-of-threads s)))
 
137
;;      (commit-barrier s) 
 
138
;;      (make-instance 'thread-barrier :name "commit-barrier" :size (number-of-threads s))
 
139
;;      (update-channels-barrier s)
 
140
;;      (make-instance 'thread-barrier :name "update-channels-barrier" :size (number-of-threads s))
 
141
;;      (step-finished-barrier s) 
 
142
;;      (make-instance 'thread-barrier :name "step-finished-barrier" :size (1+ (number-of-threads s))))
 
143
;;   (dotimes (i (number-of-threads s))
 
144
;;     (let* ((worker (make-instance 'worker :scheduler s))
 
145
;;         (worker-thread (make-thread #'run (format nil "Worker-~a" i) worker)))
 
146
;;       (push worker (workers s))
 
147
;;       (push worker-thread (worker-threads s))))
 
148
;;   (setf (is-alive s) t))
 
149
            
 
150
           
 
151
;; (defmethod run ((w worker))
 
152
;;   (loop while (not (terminate-flag w))
 
153
;;      do
 
154
;;      (wait-at (start-step-barrier (scheduler w)))
 
155
;;      (dolist (p (processes w))
 
156
;;        (step-process p))
 
157
;;      (wait-at (commit-barrier (scheduler w)))
 
158
;;      (dolist (p (processes w))
 
159
;;        (commit p))
 
160
;;      (wait-at (update-channels-barrier (scheduler w)))
 
161
;;      (dolist (p (processes w))
 
162
;;        (update p))
 
163
;;      (wait-at (step-finished-barrier (scheduler w)))))
 
164
 
 
165
 
 
166
;; (defmethod add ((p process) (s scheduler))
 
167
;;   ;; TODO: need to be more clever here:
 
168
;;   (let ((min-worker nil)
 
169
;;      (min-processes 0))
 
170
;;     (loop for w in (workers s)
 
171
;;        do (when (or (not min-worker)
 
172
;;                     (< (length (processes w)) min-processes))
 
173
;;             (setf min-processes (length (processes w))
 
174
;;                   min-worker w)))
 
175
;;     (add p min-worker)))
 
176
 
 
177
;; (defmethod one-step ((s scheduler))
 
178
;;   (if (is-alive s)
 
179
;;       (progn
 
180
;;      (wait-at (start-step-barrier s))
 
181
;;      (wait-at (step-finished-barrier s)))
 
182
;;       (error "The scheduler is not alive. Either it has not been properly initialized or it has been shutdown")))
 
183
 
 
184
 
 
185
 
 
186
;; (defmethod shutdown ((s scheduler))
 
187
;;   (flet ((kill-em-all (worker-threads)
 
188
;;         (handler-bind ((sb-thread:interrupt-thread-error                     
 
189
;;                         (lambda (c)
 
190
;;                           (format *trace-output* "Condition: ~a~%" c)
 
191
;;                           (let ((restart (find-restart 'retry)))
 
192
;;                             (if restart
 
193
;;                                 (invoke-restart restart)
 
194
;;                                  (format *trace-output* "... ignoring~%"))))))
 
195
;;           (dolist (wt worker-threads)
 
196
;;             (when (sb-thread:thread-alive-p wt)
 
197
;;               (sb-thread:terminate-thread wt))))))
 
198
;;     (kill-em-all (worker-threads s))
 
199
;;     (setf (is-alive s) nil)))
 
200
 
 
201
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
202
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
203
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
204
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
205
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
206
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
207
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
208
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
209
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
210
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
211
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
212
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
213
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
214
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
215
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
216
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
217
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
218
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
219
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
220
 
 
221
 
 
222
 
 
223
;;;;
 
224
 
 
225
 
 
226
 
 
227
 
 
228
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
 
229
 
 
230
(defclass pid ()
 
231
  ((hostname :accessor hostname :initarg :hostname :initform nil)
 
232
   (id :accessor id :initarg :id :initform (gensym "PID-"))))
 
233
 
 
234
(defclass mailbox ()
 
235
  ((messages :accessor messages :initform (list))
 
236
   (lock :reader lock :initform (bt:make-recursive-lock))
 
237
   (message-received :reader message-received :initform (bt:make-condition-variable))))
 
238
 
 
239
(defclass bt-process ()
 
240
  ((pid :accessor pid :initform (make-instance 'pid))
 
241
   (mailbox :accessor mailbox :initform (make-instance 'mailbox))
 
242
   ;(cont  :initarg :cont :initform (error "missing mandatory initarg :cont"))
 
243
   (state :accessor state :initarg :state :initform (error "missing mandatory initarg :state"))
 
244
   (scheduler :accessor scheduler :initarg :scheduler :initform (error "missing mandatory initarg :scheduler"))
 
245
   (name :accessor name :initarg :name :initform "anonymous")
 
246
 ;  (pvalue :accessor process-value :initarg :process-value :initform nil)))
 
247
   ))
 
248
 
 
249
(defmethod initialize-instance :after ((p bt-process) &rest rest)
 
250
  (declare (ignore rest))
 
251
  ;; (setf (state p)
 
252
  ;;       (make-state :cont (slot-value p 'cont) :value nil))
 
253
  (when (not (name p))
 
254
    (setf (name p) "anonymous"))
 
255
  (add (scheduler p)
 
256
       p))
 
257
 
 
258
 
 
259
(defun number-of-threads-to-use ()
 
260
  (max (length (remove-if-not (lambda (e) 
 
261
                                (eq (car e) :processor))
 
262
                              (com.informatimago.clmisc.resource-utilization::cpu-info)))
 
263
       2)) ;; 2 because at least for testing, we want to have a little fun
 
264
 
 
265
           
 
266
 
 
267
(defclass scheduler ()
 
268
  ((processes :initform (make-instance 'fifo) :accessor processes)
 
269
   (idle-process :initform nil :accessor idle-process)
 
270
   (workers :accessor workers :initform nil)
 
271
   (worker-threads :accessor worker-threads :initform nil)
 
272
   (exit-when-no-processes :accessor exit-when-no-processes :initarg :exit-when-no-processes :initform t)
 
273
   (lock :initform (bt:make-recursive-lock) :accessor lock)))
 
274
 
 
275
 
 
276
(defmethod work ((s scheduler))
 
277
  "Implement me :-)"
 
278
  )
 
279
 
 
280
;; (defmethod initialize-instance :after ((s scheduler) &rest rest)
 
281
;;   (declare (ignore rest))
 
282
;;   (labels ((worker-kickstart ()
 
283
;;              (bt:with-recursive-lock-held ((lock s))
 
284
;;                (push (bt:current-thread)
 
285
;;                      (worker-threads s)))
 
286
;;              (work s)))
 
287
;;     (dotimes (i (number-of-threads-to-use))
 
288
;;       (bt:make-thread #'worker-kickstart :name (format nil "Worker-~a" i)))))
 
289
 
 
290
(defun make-idle-process (scheduler)
 
291
  "The idle process is running all the time and doing
 
292
some service work"
 
293
  (make-instance 'bt-process :name "*idle*" :scheduler scheduler
 
294
                 :state (ecp
 
295
                          (while t                         
 
296
                            (lisp (bt:with-recursive-lock-held ((lock (scheduler self)))
 
297
                                        ;(sleep 0.2)
 
298
                                        ;(format t "Idle: ~a~%" (number-of-elements (processes (scheduler self))))
 
299
                                    (setf (processes (scheduler self))
 
300
                                          (fifo-remove-if (lambda (p)
 
301
                                                            (let ((term (and (not (eql p self))
 
302
                                                                             (or (null (st-c (state p))) (st-termp (state p))))))
 
303
                                                              (when term
 
304
                                                                (format t "removing terminated process: ~a~%" p))
 
305
                                                              term))
 
306
                                                          (processes (scheduler self))))))))))
 
307
 
 
308
(defmethod initialize-instance :after ((s scheduler) &rest rest)
 
309
  (declare (ignore rest))
 
310
  (bt:with-recursive-lock-held ((lock s))
 
311
    (setf (idle-process s) (make-idle-process s))))
 
312
   
 
313
 
 
314
(defmethod get-process-to-process ((s scheduler))
 
315
  (bt:with-recursive-lock-held ((lock s))
 
316
    (cycle-next (processes s))))
 
317
 
 
318
(defmethod add ((s scheduler) (p bt-process))
 
319
  ;(when (not (null (cont p)))
 
320
  (bt:with-recursive-lock-held ((lock s))
 
321
    (add p (processes s))))
 
322
 
 
323
(defmethod run ((s scheduler))
 
324
  (loop while ;(not (emptyp (processes s)))
 
325
        (or (not (exit-when-no-processes s)) 
 
326
            (> (number-of-elements (processes s)) 1))
 
327
        do (let ((p (get-process-to-process s)))
 
328
             (when (st-c (state p))
 
329
               ;(format t "old state: ~a~%" (state p))
 
330
               (setf (state p) (funcall (st-c (state p)) p (st-v (state p))))
 
331
               ;(format t "new state: ~a~%" (state p))
 
332
               ))))
 
333
        
 
334
 
 
335
 
 
336
(defmacro ! (process message)
 
337
  `(progn
 
338
     ;(format t "sending message ~a to ~a~%" ,message ,process)
 
339
     (push ,message (messages (mailbox ,process)))))
 
340
 
 
341
 
 
342
 
 
343
(defmacro spawn (proc &key name scheduler)
 
344
  `(make-instance 'bt-process 
 
345
                  :state ,proc 
 
346
                  :name ,name 
 
347
                  :scheduler ,(if scheduler
 
348
                                  scheduler
 
349
                                  '(scheduler self))))
 
350
 
 
351
 
 
352
 
 
353
 
 
354
 
 
355
 
 
356
 
 
357