;; @module MP
;; @author Jeff Ober, Kanen Flowers
;; @version 1.1
;; @location http://static.artfulcode.net/newlisp/mp.lsp
;; @package http://static.artfulcode.net/newlisp/mp.qwerty
;; @description Classes for multi-processing and synchronization (requires newlisp 10)
;; Provides many classes for controlling access to resources as well as
;; utilities for common multi-processing tasks. Requires newlisp 10 and the
;; util module.
;;

;; Version history

;; 1.1
;; • MP:iter and MP:map both now check spawn returns for errors and re-throw them
;; • MP:map and MP:iter now block in sleep, rather than sync, which uses *much* less cpu time
;;
;; 1.0
;; • initial release (replaces locks module)

;;;=============================================================================
;;; MP: multi-processing utilities for newlisp.
;;;=============================================================================

; Switch to (creating if necessary) the MP context; the definitions that
; follow are created inside it until the next context switch.
(context 'MP)

;; @syntax (MP:get-pid)
;;

;; Returns the pid of the current process.

;; @example
;; (MP:get-pid) => 16024

; Returns the process id of the running newLISP process.
;
; The pid is read with a negative offset into the sys-info list. The fixed
; offset 6 used previously only addresses the pid on older sys-info layouts;
; on layouts that insert the parent pid before it, offset 6 yields the
; parent's pid instead, which makes every child of one parent look like the
; same owner to the locking classes. Counting from the end (-1 OS flags,
; -2 version, -3 pid) addresses the pid on both layouts.
(define (get-pid)
  (sys-info -3))

;; @syntax (MP:with-lock-held <lock> <expr-1> [<expr-2> ...])
;; @param <lock> an instance of a locking class with an :acquire and :release method
;; @param <expr-1> one or more expressions to be evaluated
;;

;; Evaluates one or more expressions with <lock> acquired. <lock> may be an
;; instance of any class with an :acquire and :release method. MP:with-lock-held
;; guarantees that the lock will be released, even if an error is thrown during
;; evaluation of the body expressions. Errors thrown will be re-thrown after the
;; lock is released. The value of the expression is the value of the last body
;; form evaluated.

;; @example
;; (setf lock (Lock))
;; (MP:with-lock-held lock
;;   (do stuff))

; Evaluates the body forms between (:acquire lock) and (:release lock).
;
; The first argument is the (unevaluated) lock expression; the remaining
; arguments are wrapped in a single begin form. letex substitutes both into
; the template together with two fresh symbols from gensym, so the temporary
; variables cannot collide with symbols used by the body.
; NOTE(review): gensym is not defined in this file; presumably it comes from
; the util module named in the file header.
;
; catch with a result symbol returns true when the body finished normally and
; nil when it raised an error; in both cases the lock is released before the
; result is inspected. On success the body's value is returned, on failure
; the captured error text is re-thrown.
(define-macro (with-lock-held)
  (letex ((_inst (args 0))
          (_body (cons begin (rest (args))))
          (_err (gensym))
          (_res (gensym)))
    (local (_err _res)
      (:acquire _inst)
      (setf _err (catch _body '_res))
      (:release _inst)
      ; Plain `if` with swapped branches replaces `if-not`, which is not
      ; available in later newLISP releases; the behaviour is identical.
      (if _err
        _res
        (throw-error _res)))))

;; @syntax (MP:wait <predicate> <interval> <max-interval> [<timeout>])
;; @param <predicate> a predicate used to test state
;; @param <interval> the initial sleep time (milliseconds)
;; @param <max-interval> the maximum sleep time (milliseconds)
;; @param <timeout> the maximum number of milliseconds to wait for <predicate> to return true
;;

;; Blocks until <predicate> returns true. Wait will poll <predicate>
;; every <interval> ms, growing every ten polling cycles up to <max-interval> ms,
;; up to an optional <timeout> ms. Returns true when the polling loop
;; returns normally, nil when <timeout> (if present) was reached. Note that
;; <timeout> will be approximately observed; it is affected by the current
;; interval. If <interval> and <max-interval> are equal, no change in the polling
;; interval will take place.

;; @example
;; ; Blocks until 'some-flag is set to true. Polls initially every 50ms,
;; ; increasing to 500ms. After 5 seconds (5000 ms), returns even if
;; ; 'some-flag is nil.
;;
;; (MP:wait (fn () (true? some-flag)) 50 500 5000)

; Polls `condition` until it returns a true value or `timeout` ms of sleep
; time have accumulated.
;
;   condition       function of no arguments; its value ends the loop when true
;   start-interval  initial sleep between polls (ms)
;   max-interval    upper bound for the sleep between polls (ms)
;   timeout         optional; total sleep budget (ms). nil means wait forever
;
; Returns the true value produced by `condition`, or nil on timeout.
(define (wait condition start-interval max-interval timeout , waited result increase current)
  ; Float division: with integer `/` the quotient was truncated before `ceil`
  ; saw it, so a gap smaller than 10 ms produced a step of 0 and the interval
  ; never grew.
  (setf increase (ceil (div (sub max-interval start-interval) 10)))
  (setf current start-interval)
  (setf waited 0)
  (until (or (and timeout (>= waited timeout)) ; if timeout param present, check for timeout
             (setf result (condition)))        ; gives us our return value
    ; Grow the interval after every ten completed polls (not on the very first
    ; one) and never beyond max-interval.
    (when (and (> $idx 0) (zero? (mod $idx 10)))
      (setf current (min max-interval (add current increase))))
    (sleep current)
    ; Account for the requested sleep explicitly instead of relying on the
    ; value returned by `sleep`.
    (inc waited current))
  result)

;; @syntax (MP:map <fun> <seq> [<limit> [<timeout>]])
;; @param <fun> a function to apply to each element of <seq>
;; @param <seq> a list
;; @param <limit> the max number of processes to start
;; @param <timeout> the (approximate) max time to wait for the process to complete
;;

;; Maps <fun> over <seq>, bounding the number of running processes to
;; <limit>.

;; @example
;; (MP:map pow (sequence 0 4) 4)
;; => (0 1 4 9 16)

; Applies `fun` to every element of `seq` in spawned child processes and
; returns the list of results in input order.
;
;   fun      function applied to each element
;   seq      list of inputs
;   limit    optional; while at least this many children are pending, no new
;            child is spawned
;   timeout  accepted but not used by this implementation
;
; A result that is a string starting with "ERR:" is treated as an error raised
; in a child and is re-thrown in the caller.
; NOTE(review): make-array, array-iter and gensym are not defined in this
; file; presumably they come from the util module named in the file header.
(define (MP:map fun seq limit timeout , result mem (nap 1) (max-wait 500) (increment 2))
  ; One fresh symbol per element receives that child's result.
  (setf mem (make-array (length seq) gensym))
  (dolist (elt seq)
    (if limit
      ; Too many children pending: back off with a doubling sleep (until the
      ; sleep time passes max-wait), then let sync collect finished children.
      (while (>= (length (sync)) limit)
        (if (< nap max-wait)
          (setf nap (* increment nap)))
        (sleep nap)
        (sync 50)))
    ; Start the child for this element.
    (spawn (mem $idx) (fun elt)))
  ; Collect whatever is still running.
  (sync -1)
  (setf result (MAIN:map eval (array-list mem)))
  ; Surface child errors in the parent.
  (dolist (res result)
    (if (and (string? res) (starts-with res "ERR:"))
      (throw-error (replace {(ERR: user error : )+} res "" 0))))
  ; Drop the temporary result symbols.
  (array-iter delete mem)
  result)

;; @syntax (MP:iter <fun> <seq> [<limit>])
;; @param <fun> a function to apply to each element of <seq>
;; @param <seq> a list
;; @param <limit> the max number of processes to start
;;

;; Iterates over <seq>, applying <fun> to each element. If <limit> is
;; specified, will not start more than <limit> processes. Returns the value of
;; the final iteration.

;; @example
;; (MP:iter println (sequence 0 4) 4)
;; 0
;; 1
;; 3
;; 2
;; 4

; Applies `fun` to every element of `seq` in spawned child processes and
; returns the result produced for the last element.
;
;   fun    function applied to each element
;   seq    list of inputs
;   limit  optional; while at least this many children are pending, no new
;          child is spawned
;
; Any result that is a string starting with "ERR:" is re-thrown as an error.
; NOTE(review): make-array, array-iter and gensym are not defined in this
; file; presumably they come from the util module named in the file header.
(define (iter fun seq limit , mem check result (nap 1) (max-wait 500) (increment 2))
  ; One fresh symbol per element receives that child's result.
  (setf mem (make-array (length seq) gensym))
  (dolist (elt seq)
    (if limit
      ; Too many children pending: back off with a doubling sleep (until the
      ; sleep time passes max-wait), then let sync collect finished children.
      (while (>= (length (sync)) limit)
        (if (< nap max-wait)
          (setf nap (* increment nap)))
        (sleep nap)
        (sync 50)))
    ; Start the child for this element.
    (spawn (mem $idx) (fun elt)))
  ; Collect whatever is still running.
  (sync -1)
  ; Surface child errors in the parent.
  (dotimes (i (length seq))
    (setf check (eval (nth i mem)))
    (if (and (string? check) (starts-with check "ERR:"))
      (throw-error (replace {(ERR: user error : )+} check "" 0))))
  ; Keep the last result, then drop the temporary result symbols.
  (setf result (eval (last mem)))
  (array-iter delete mem)
  result)

; Leave the MP context; the classes below are defined from MAIN.
(context 'MAIN)

;;;=============================================================================
;;; Semaphore: a synchronized counter that blocks when attempting to decrement
;;; below zero. Also known as a counting semaphore. By default, newly created
;;; Semaphores are initialized with a count of 1.
;;;=============================================================================

;; @syntax (Semaphore [<initial-value>])
;; @param <initial-value> the initial value of the semaphore
;;

;; Creates a synchronized counter that cannot drop below zero. Any attempt to
;; do so will block until the counter has been incremented (released). A basic
;; semaphore may count as high or low as desired; this is useful for protecting
;; queues or stacks. By default, Semaphores are initialized with a value of 1.

;; @example
;; (setf sem (Semaphore))
;; (setf queue '())
;; (dotimes (i 10)
;;   (push i queue)
;;   (:inc sem 1))

; Creates a Semaphore instance: a list of this context and the id of a newly
; created system semaphore.
;
;   initial-value  starting count; default is 1
;
; The count is established by signalling the new semaphore `initial-value`
; times. The signal is skipped unless the value is positive: passing 0 to the
; `semaphore` primitive releases the semaphore instead of leaving it at zero,
; and a negative value would be a wait on a semaphore nobody can signal yet.
(define (Semaphore:Semaphore (initial-value 1) , sem)
  (setf sem (semaphore))
  (when (> initial-value 0)
    (semaphore sem initial-value))
  (list (context) sem))

;; @syntax (:inc <inst> [<n>])
;; @param <inst> an instance of Semaphore
;; @param <n> the amount to increment; default is 1
;;

;; Increments (releases) the Semaphore by <n>.

; Signals the underlying system semaphore `n` times (default 1), raising its
; count. The semaphore id is the second element of the instance list.
(define (Semaphore:inc inst (n 1))
  (let (sem-id (inst 1))
    (semaphore sem-id n)))

;; @syntax (:dec <inst> [<n>])
;; @param <inst> an instance of Semaphore
;; @param <n> the amount to decrement; default is 1
;;

;; Decrements (acquires) the Semaphore by <n>.

; Waits on the underlying system semaphore for `n` units (default 1) by
; passing the negated amount to the `semaphore` primitive.
(define (Semaphore:dec inst (n 1))
  (let (sem-id (inst 1) amount (- n))
    (semaphore sem-id amount)))

;; @syntax (:count <inst>)
;; @param <inst> an instance of Semaphore
;;

;; Returns the current count of the Semaphore.

; Queries the underlying system semaphore for its current value.
(define (Semaphore:count inst)
  (let (sem-id (inst 1))
    (semaphore sem-id)))

;; @syntax (:acquire <inst> [<blocking> [<n>]])
;; @param <inst> an instance of Semaphore
;; @param <blocking> if true (default is true) blocks if Semaphore is held
;; @param <n> the amount by which the Semaphore is to be decremented
;;

;; Attempts to acquire the Semaphore. If <blocking> is true, :acquire will
;; block until Semaphore becomes available. If <blocking> is nil, :acquire will
;; attempt to acquire the Semaphore and return nil immediately if it is
;; unavailable.

; Acquires `n` units (default 1) of the Semaphore.
;
;   inst      an instance of Semaphore
;   blocking  when true (default) waits until the units are available
;   n         number of units to take
;
; Returns the result of the wait on the underlying semaphore, or nil when
; `blocking` is nil and fewer than `n` units are currently available.
;
; The availability test is "count >= n". The previous test had the operands
; reversed, so a non-blocking acquire blocked on an exhausted semaphore and
; refused whenever more than `n` units were free.
; The count and the wait use the `semaphore` primitive directly on the id
; stored in the instance, exactly what :count and :dec do.
; NOTE: the check and the wait are two separate operations, so another
; process may take the units in between; a non-blocking acquire can therefore
; still block in that window.
(define (Semaphore:acquire inst (blocking true) (n 1))
  (when (or blocking (>= (semaphore (inst 1)) n))
    (semaphore (inst 1) (- n))))

;; @syntax (:release <inst>)
;; @param <inst> an instance of Semaphore
;;

;; Releases the Semaphore.

; :release is an alias for :inc, so it takes the same arguments as
; Semaphore:inc (instance and optional amount, default 1).
(setf Semaphore:release Semaphore:inc)

;;;=============================================================================
;;; Share: a shared page in memory. Only integers, floats, or strings may be
;;; stored. To store complex objects, use source and pack to first convert the
;;; object to a string.
;;;=============================================================================

;; @syntax (Share)
;;

;; A Share wraps a single page in memory which may be used to store integer,
;; float, or string values between processes. In order to store compound objects
;; or lists, use source and/or pack to serialize the object first. Access to the
;; Share between different processes must be protected with locking
;; mechanisms.

; Creates a Share instance: a list of this context and the address returned
; by the `share` primitive for a newly allocated shared page.
(define (Share:Share)
  (let (page (share))
    (list (context) page)))

;; @syntax (:set <inst> <value>)
;; @param <inst> an instance of Share
;; @param <value> the new value for the Share
;;

;; Sets the Share's value to <value>.

; Writes `value` into the shared page whose address is stored in the
; instance, and returns what the `share` primitive returns.
(define (Share:set inst value)
  (let (page (inst 1))
    (share page value)))

;; @syntax (:get <inst>)
;; @param <inst> an instance of Share
;;

;; Gets the value of the Share.

; Reads the current content of the shared page whose address is stored in
; the instance.
(define (Share:get inst)
  (let (page (inst 1))
    (share page)))

;;;=============================================================================
;;; Synchronized: a Share that has access protected with a semaphore.
;;;=============================================================================

;; @syntax (Synchronized [<initial-value>])
;; @param <initial-value> the initial value, if any
;;

;; Synchronized wraps a Share and protects access to it with a Semaphore.

; Creates a Synchronized instance: a list of this context, a Share holding
; the value, and a Semaphore with a count of 1 that guards it.
;
;   initial-value  optional; stored in the Share when given
;
; `initial-value` is declared as a parameter and `mem` as a local. With the
; empty parameter list used before, the argument passed by callers such as
; (Synchronized 0) was ignored and `mem` was assigned as a global symbol.
(define (Synchronized:Synchronized initial-value , mem)
  (setf mem (Share))
  ; Only nil and the empty list are false in newLISP, so an initial value of
  ; 0 is stored as well.
  (when initial-value
    (:set mem initial-value))
  (list (context) mem (Semaphore 1)))

;; @syntax (:get <inst>)
;; @param <inst> an instance of Synchronized
;;

;; Gets the current value of the Synchronized instance. Will block if another
;; process is currently getting or setting the value.

; Reads the wrapped Share (element 1) while holding the guarding Semaphore
; (element 2); MP:with-lock-held acquires and releases it around the read.
(define (Synchronized:get inst)
  (let (store (inst 1))
    (MP:with-lock-held (inst 2)
      (:get store))))

;; @syntax (:set <inst> <expr>)
;; @param <inst> an instance of Synchronized
;; @param <expr> the new value
;;

;; Sets the value of the Synchronized share. If the new value is an
;; expression, it will be evaluated with the variable $0 set to the old value
;; of the share. This is necessary to prevent a deadlock when dealing with
;; self-referential values.

;; @example
;; (setf mem (Synchronized))
;; (:set mem 10)
;; => 10
;; (:set mem (+ 10 $0))
;; => 20

; Stores a new value in a Synchronized instance while holding its Semaphore.
;
; Implemented as a macro so that the value expression arrives unevaluated:
; letex substitutes the instance expression (first argument) and the value
; expression (second argument) into the body. With the lock held, the current
; value of the Share is first stored in $0, then the value expression is
; evaluated (so it may refer to $0) and written to the Share.
; NOTE(review): the substituted expression is evaluated as the argument of
; `eval` and its result is then evaluated once more by `eval`; this is
; harmless for numbers and strings, which evaluate to themselves.
(define-macro (Synchronized:set)
  (letex ((_sync_inst (args 0)) (_sync_expr (args 1)))
    (MP:with-lock-held (_sync_inst 2)
      (setf $0 (:get (_sync_inst 1)))
      (:set (_sync_inst 1) (eval _sync_expr)))))

;;;=============================================================================
;;; Lock: a binary semaphore. It is an error for a different process than that
;;; which acquires the lock to release the lock.
;;;=============================================================================

;; @syntax (Lock)
;;

;; A Lock is a binary semaphore (or mutual exclusion lock) that may be set to
;; either 1 (released) or 0 (acquired). It is an error for a process to release
;; a Lock it has not acquired.

; Creates a Lock instance: a list of this context, a Semaphore created with
; its default count, and a Synchronized cell that records the owner's pid.
(define (Lock:Lock)
  (let (sem (Semaphore))
    (list (context) sem (Synchronized))))

;; @syntax (:acquire <inst> [<blocking>])
;; @param <inst> an instance of Lock
;; @param <blocking> whether to block if the Lock is not available (default is true)
;;

;; Attempts to acquire the Lock, blocking until it becomes available if
;; <blocking> is true.

; Acquires the Lock's Semaphore (element 1); on success records the pid of
; the acquiring process in the owner cell (element 2) and returns true.
; Returns nil when the Semaphore could not be acquired.
(define (Lock:acquire inst (blocking true))
  (if (:acquire (inst 1) blocking)
    (begin
      (:set (inst 2) (MP:get-pid))
      true)))

;; @syntax (:release <inst>)
;; @param <inst> an instance of Lock
;;

;; Releases the Lock.

; Releases the Lock's Semaphore, but only when the calling process is the one
; recorded as owner; otherwise an error is thrown.
(define (Lock:release inst)
  (if (!= (MP:get-pid) (:get (inst 2)))
    (throw-error "unlocking process does not match owner")
    (:release (inst 1))))

;;;=============================================================================
;;; RLock: identical to a Lock except that the locking process may acquire the
;;; lock multiple times. The number of acquires must be >= the number of
;;; releases.
;;;=============================================================================

;; @syntax (RLock)
;;

;; An RLock is a Lock that may be acquired multiple times by the same
;; process. This is useful to lock various inter-dependent functions in the same
;; process with a single lock. Observes the invariant # acquires >= # releases.

; Creates an RLock instance: this context, a Semaphore, a Synchronized cell
; for the owner's pid and a Synchronized cell for the acquire counter.
(define (RLock:RLock)
  (let (sem (Semaphore))
    (list (context) sem (Synchronized) (Synchronized))))

; The Synchronized cell holding the owner's pid (element 2).
(define (RLock:owner inst)
  (nth 2 inst))

; The Synchronized cell holding the acquire counter (element 3).
(define (RLock:counter inst)
  (nth 3 inst))

; True when the Semaphore's count is zero, i.e. the lock is taken.
(define (RLock:held? inst)
  (zero? (:count (inst 1))))

; True when the recorded owner pid equals the calling process's pid.
(define (RLock:process-is-owner? inst)
  (= (:get (:owner inst)) (MP:get-pid)))

; Adds one to the acquire counter; $0 is the counter's previous value.
(define (RLock:inc inst)
  (:set (:counter inst) (+ 1 $0)))

; Subtracts one from the acquire counter; $0 is the counter's previous value.
(define (RLock:dec inst)
  (:set (:counter inst) (- $0 1)))

;; @syntax (:acquire <inst> [<blocking>])
;; @param <inst> an instance of RLock
;; @param <blocking> whether to block if the RLock is not available (default is true)
;;

;; Attempts to acquire the RLock, blocking until it becomes available if
;; <blocking> is true.

; Acquires the RLock.
;
;   inst      an instance of RLock
;   blocking  whether to wait for the lock (default true)
;
; When the lock is already held by the calling process, only the acquire
; counter is incremented. Otherwise the Semaphore is acquired, the counter is
; set to 1 and the calling process is recorded as owner. Returns nil when the
; Semaphore could not be acquired.
;
; `blocking` is now forwarded to the Semaphore; previously the argument was
; accepted but ignored, so every acquire was blocking.
(define (RLock:acquire inst (blocking true))
  (if (and (:held? inst) (:process-is-owner? inst))
    (:inc inst)
    (when (:acquire (inst 1) blocking)
      (:set (:counter inst) 1)
      (:set (:owner inst) (MP:get-pid)))))

;; @syntax (:release <inst>)
;; @param <inst> an instance of RLock
;;

;; Releases the RLock.

; Undoes one acquire of the RLock.
;
; Throws an error when the lock is not held, or when the calling process is
; not the recorded owner. Otherwise the acquire counter is decremented and,
; once it reaches zero, the underlying Semaphore is released.
(define (RLock:release inst)
  (cond
    ((not (:held? inst))
      (throw-error "lock is not held"))
    ((not (:process-is-owner? inst))
      (throw-error "owner and releasing process do not match"))
    (true
      (:dec inst)
      (when (zero? (:get (:counter inst)))
        (:release (inst 1))))))

;;;=============================================================================
;;; Event: a simple mechanism to signal one or more waiting processes that some
;;; condition has been met.
;;;=============================================================================

;; @syntax (Event)
;;

;; An Event is a simple mechanism for synchronization. It allows multiple
;; processes to block until a controlling process issues a signal to
;; unblock.

; Creates an Event instance: a list of this context and a Synchronized cell
; created with the value 0 (not signaled). The unused local `mem` that the
; definition used to declare has been dropped.
(define (Event:Event)
  (list (context) (Synchronized 0)))

;; @syntax (:reset <inst>)
;; @param <inst> an instance of Event
;;

;; Resets this Event. Does no checking to see if any processes are
;; waiting on this event. Those processes will remain locked until this
;; Event is signaled again.

; Writes 0 into the Event's Synchronized cell (element 1), so that
; Event:signaled?, which compares the cell against 1, reports nil again.
(define (Event:reset inst)
  (:set (inst 1) 0))

;; @syntax (:signaled? <inst>)
;; @param <inst> an instance of Event
;;

;; Returns true if this Event has already been signaled.

; The Event counts as signaled when its Synchronized cell (element 1)
; currently holds the value 1.
(define (Event:signaled? inst)
  (let (flag (:get (inst 1)))
    (= 1 flag)))

;; @syntax (:signal <inst>)
;; @param <inst> an instance of Event
;;

;; Signals and unblocks any processes waiting on this Event.

; Writes 1 into the Event's Synchronized cell (element 1); Event:signaled?
; reports true from then on, which ends the polling loop in Event:wait.
(define (Event:signal inst)
  (:set (inst 1) 1))

;; @syntax (:wait <inst> [<timeout>])
;; @param <inst> an instance of Event
;; @param <timeout> the maximum number of milliseconds to wait
;;

;; Blocks until <inst> is signaled or <timeout>, if present, expires.
;; Returns true when exiting normally, nil if wait times out.

; Blocks until the Event is signaled or `timeout` ms have passed.
;
;   inst     an instance of Event
;   timeout  optional; maximum number of milliseconds to wait
;
; Returns true when the Event is (or becomes) signaled and nil on timeout.
; An already signaled Event returns true immediately: `or` yields the result
; of the first check, so the return value no longer depends on what `unless`
; returns when its body is skipped. Otherwise MP:wait polls :signaled?
; starting at 50 ms and growing towards 500 ms between polls.
(define (Event:wait inst timeout)
  (or (:signaled? inst)
      (MP:wait (fn () (:signaled? inst)) 50 500 timeout)))

;;;=============================================================================
;;; Pipe: a one-way communications pipe.
;;;=============================================================================

;; @syntax (Pipe [<in> <out>])
;; @param <in> an in-channel of an existing pipe
;; @param <out> an out-channel of an existing pipe
;;

;; A Pipe is a one-way communications channel. If <in> and <out> are supplied,
;; an existing pipe (such as the result of the pipe function) may be used.

; Message framing used by :send and :receive. Every message is the `source`
; text of the value wrapped in these two delimiters. Both methods take the
; delimiters (and their lengths) from here, so writer and reader cannot
; disagree. Previously :send wrapped the text in empty strings while :receive
; waited for a newline and then cut 5 leading and 6 trailing characters,
; which corrupted every message.
(define Pipe:msg-open "<msg>")
(define Pipe:msg-close "</msg>")

; Creates a Pipe instance: a list of this context, a read handle and a write
; handle. Unless both handles are supplied, a new pipe is created with the
; `pipe` primitive and its two handles are used.
(define (Pipe:Pipe in out)
  (unless (and in out)
    (map set '(in out) (pipe)))
  (list (context) in out))

; The read handle (element 1).
(define (Pipe:in inst)
  (inst 1))

; The write handle (element 2).
(define (Pipe:out inst)
  (inst 2))

;; @syntax (:send <inst> <msg>)
;; @param <inst> an instance of Pipe
;; @param <msg> the message to send
;;
;; Sends a message along the Pipe. Returns the number of bytes sent
;; (including message encoding).
;;
;; @example
;; (setf p (Pipe))
;; (:send p "Hello world.")

; The value is bound to the local `expr` and serialized with `source`, which
; produces an expression that assigns the value to `expr` again when it is
; evaluated by :receive. The serialized text is written between the framing
; delimiters.
; NOTE: a value whose serialized text itself contains the closing delimiter
; cannot be framed correctly.
(define (Pipe:send inst msg , expr)
  (setf expr msg)
  (write-buffer (:out inst)
                (string Pipe:msg-open (source 'expr) Pipe:msg-close)))

;; @syntax (:peek <inst>)
;; @param <inst> an instance of Pipe
;;
;; Returns the number of bytes ready for reading on Pipe. This does not
;; correspond directly with the size of the message (extra data is sent with
;; the message).

(define (Pipe:peek inst)
  (peek (:in inst)))

;; @syntax (:has-messages? <inst>)
;; @param <inst> an instance of Pipe
;;
;; Returns true if there is a message ready to be read from the Pipe.

(define (Pipe:has-messages? inst)
  (not (zero? (:peek inst))))

;; @syntax (:receive <inst> [<block>])
;; @param <inst> an instance of Pipe
;; @param <block> when true, blocks until a message is available on the Pipe
;;
;; Returns the next message on the Pipe. By default, blocks until the next
;; message is available.

; Reads one framed message and returns the value it carries. Returns nil
; without reading when `block` is nil and no bytes are waiting.
;
; SECURITY NOTE: the message text is passed to eval-string, so whatever a
; writer puts on the pipe is executed in this process. Only use a Pipe with
; trusted writers.
(define (Pipe:receive inst (block true) , msg buf expr)
  (when (or block (:has-messages? inst))
    (setf msg "")
    ; Read up to and including the closing delimiter; a message longer than
    ; one 4096-byte read is accumulated over several reads.
    (until (ends-with msg Pipe:msg-close)
      (read-buffer (:in inst) buf 4096 Pipe:msg-close)
      (write-buffer msg buf))
    ; Strip the opening and closing delimiters.
    (setf msg (slice msg
                     (length Pipe:msg-open)
                     (- (length msg) (length Pipe:msg-open) (length Pipe:msg-close))))
    ; eval msg in 'MAIN to work-around source/eval-string bug with FOOP contexts
    (context 'MAIN)
    (eval-string msg)
    (context 'Pipe)
    ; Evaluating the message assigned the transmitted value to `expr`.
    expr))

;; @syntax (:close <inst>)
;; @param <inst> an instance of Pipe
;;

;; Closes the read and write handles for this Pipe.

; Closes the read handle (element 1) and then the write handle (element 2);
; returns the result of the second close.
(define (Pipe:close inst)
  (let (reader (inst 1) writer (inst 2))
    (close reader)
    (close writer)))

;;;=============================================================================
;;; Channel: a two-way communications channel using Pipes.
;;;=============================================================================

;; @syntax (Channel)
;;

;; Creates a two-way communications channel using Pipes. Channels have two
;; Pipes, a parent and a child, each of which may be given to separate processes
;; to communicate back and forth using the standard Pipe syntax.

;; @example
;; (setf ch (Channel))
;; (map set '(parent child) (:pipes ch))
;;
;; ; in parent, send a message
;; (:send parent "Hello child.")
;;
;; ; fork child, receive the message, and send a response
;; (fork
;;   (begin
;;     (println "Child received: " (:receive child))
;;     (:send child "Hello yourself!")))
;;
;; ; in the parent process, block until a response becomes available
;; (setf resp (:receive parent))
;; (println "Parent received: " resp)

; Creates a Channel instance: this context, a parent Pipe and a child Pipe.
; Two system pipes are created and their ends are crossed: the parent Pipe
; reads from the first pipe and writes to the second, the child Pipe reads
; from the second and writes to the first.
(define (Channel:Channel , first-pair second-pair)
  (setf first-pair (pipe))
  (setf second-pair (pipe))
  (let (parent-end (Pipe (first-pair 0) (second-pair 1)))
    (list (context) parent-end (Pipe (second-pair 0) (first-pair 1)))))

;; @syntax (:pipes <inst>)
;; @param <inst> an instance of Channel
;;

;; Returns a list of the parent and child pipes. Equivalent to
;; (list (:parent inst) (:child inst)).

; Returns the parent Pipe (element 1) and the child Pipe (element 2) as a
; two-element list.
(define (Channel:pipes inst)
  (list (inst 1) (inst 2)))

;; @syntax (:parent <inst>)
;; @param <inst> an instance of Channel
;;

;; Returns the parent Pipe.

; The parent Pipe is stored as element 1 of the instance.
(define (Channel:parent inst)
  (nth 1 inst))

;; @syntax (:child <inst>)
;; @param <inst> an instance of Channel
;;

;; Returns the child Pipe.

; The child Pipe is stored as element 2 of the instance.
(define (Channel:child inst)
  (nth 2 inst))

;;;=============================================================================
;;; Queue: a synchronized FIFO queue. Objects may be of any size (share is not
;;; used).
;;;=============================================================================

;; @syntax (Queue [<size>])
;; @param <size> the maximum size for the queue (no max if nil)
;;

;; A Queue is a synchronized first in, first out list of items that is safe
;; for use in multiple processes. Object size is not restricted as when using
;; a shared page of memory. A Queue must be closed when no longer needed using
;; the :close method.

; Creates a Queue instance: this context, a Pipe that carries the items, a
; Semaphore used as the read lock and, when `size` is given, a Semaphore
; initialized to `size` (nil otherwise).
(define (Queue:Queue size)
  (let (channel (Pipe) guard (Semaphore))
    (list (context) channel guard (if size (Semaphore size)))))

; The Pipe that carries the queued items (element 1).
(define (Queue:comm inst)
  (nth 1 inst))

; The Semaphore that serializes readers (element 2).
(define (Queue:lock inst)
  (nth 2 inst))

; The capacity Semaphore, or nil for an unbounded Queue (element 3).
(define (Queue:counter inst)
  (nth 3 inst))

;; @syntax (:count <inst>)
;; @param <inst> an instance of Queue
;;

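;; Returns the current count of the Queue's capacity Semaphore. That Semaphore
;; starts at the Queue's maximum size, is decremented by each :put and
;; incremented by each :get, so the value is the number of free slots
;; remaining rather than the number of queued items.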

; Returns the current count of the capacity Semaphore, which starts at the
; Queue's maximum size, goes down on every put and up on every get.
; Returns nil for a Queue created without a size: such a Queue has no
; capacity Semaphore, and asking nil for its count used to raise an error.
(define (Queue:count inst)
  (when (:counter inst)
    (:count (:counter inst))))

(define (Queue:inc inst)
  "Records an increase in the size of the queue."
  ; One more item means one free slot less; this blocks while no slot is free.
  (when (:counter inst)
    (:dec (:counter inst))))

(define (Queue:dec inst)
  "Records a decrease in the size of the queue."
  ; One item less means one more free slot.
  (when (:counter inst)
    (:inc (:counter inst))))

;; @syntax (:put <inst> <expr> [<block>])
;; @param <inst> an instance of Queue
;; @param <expr> the object to be added
;; @param <block> when true, blocks until space is available in the queue
;;

;; Adds <expr> to the Queue, blocking by default. Returns true when the item
;; was added. If <block> is nil, returns nil when the Queue is full.

;; @example
;; (setf q (Queue 4))
;; (dotimes (i 5)
;;   (if (:put q i nil) (print i)))
;; => 0123

; Adds `expr` to the Queue and returns true; returns nil when `block` is nil
; and the Queue has no free slot.
;
;   inst   an instance of Queue
;   expr   the object to add
;   block  when true (default), waits for a free slot
;
; A Queue created without a size has no capacity Semaphore and is never full,
; so the free-slot check is skipped for it; previously a non-blocking put on
; such a Queue asked nil for its count and failed.
(define (Queue:put inst expr (block true))
  (when (or block
            (nil? (:counter inst))
            (not (zero? (:count inst))))
    ; Reserve a slot (waits here when blocking and the Queue is full), then
    ; hand the item to the Pipe.
    (:inc inst)
    (:send (:comm inst) expr)
    true))

;; @syntax (:get <inst> [<block>])
;; @param <inst> an instance of Queue
;; @param <block> when true, blocks until an item is available from the queue
;;

;; Pulls the next item off of the Queue. If <block> is nil, returns nil when
;; no item is available. Otherwise, blocks until one becomes available.

; Takes the next item from the Queue's Pipe while holding the read lock.
; When an item was received, a slot is given back to the capacity Semaphore
; and the item is returned; otherwise the result is nil.
(define (Queue:get inst (block true) , msg)
  (MP:with-lock-held (:lock inst)
    (setf msg (:receive (:comm inst) block)))
  (if msg
    (begin
      (:dec inst)
      msg)))

;; @syntax (:close <inst>)
;; @param <inst> an instance of Queue
;;

;; Closes the Queue by closing the read and write handles of its underlying
;; Pipe.

; Closes the Pipe that carries the Queue's items.
(define (Queue:close inst)
  (let (channel (:comm inst))
    (:close channel)))