mp.lsp
Module: MP
Author: Jeff Ober
Version: 1.1
Original location: http://static.artfulcode.net/newlisp/mp.lsp
Source: mp.lsp
Package definition: mp.qwerty
Classes for multi-processing and synchronization (requires newlisp 10)
Provides many classes for controlling access to resources as well as utilities for common multi-processing tasks. Requires newlisp 10 and the util module.
Version history
1.1 • MP:iter and MP:map both now check spawn returns for errors and re-throw them • MP:map and MP:iter now block in sleep, rather than sync, which uses *much* less cpu time
1.0 • initial release (replaces locks module)- § -
MP:get-pid
syntax: (MP:get-pid)
Returns the pid of the current process.
example:(MP:get-pid) => 16024- § -
MP:with-lock-held
syntax: (MP:with-lock-held lock expr [expr ...])
parameter: lock - an instance of a locking class with an :acquire and :release method
parameter: expr - one or more expressions to be evaluated
Evaluates one or more expressions with lock acquired. lock may be an instance of any class with an :acquire and :release method. MP:with-lock-held guarantees that the lock will be released, even if an error is thrown during evaluation of the body expressions. Errors thrown will be re-thrown after the lock is released. The value of the expression is the value of the last body form evaluated.
example:(setf lock (Lock)) (MP:with-lock-held lock (do stuff))- § -
MP:wait
syntax: (MP:wait fn-condition int-start int-max [int-timeout])
parameter: fn-condition - a predicate used to test state
parameter: int-start - the initial sleep time (milliseconds)
parameter: int-max - the maximum sleep time (milliseconds)
parameter: int-timeout - the maximum number of milliseconds to wait for fn-condition to return true
Blocks until fn-condition returns true. Wait will poll fn-condition every int-start ms, growing every ten polling cycles up to int-max ms, up to an optional int-timeout ms. Returns true when the polling loop returns normally, nil when int-timeout (if present) was reached. Note that int-timeout will be approximately observed; it is affected by the current interval. If int-start and int-max are equal, no change in the polling interval will take place.
example:; Blocks until 'some-flag is set to true. Polls initially every 50ms, ; increasing to 500ms. After 5 seconds (5000 ms), returns even if ; 'some-flag is nil. (MP:wait (fn () (true? some-flag)) 50 500 5000)- § -
MP:map
syntax: (MP:map fun seq [limit])
parameter: fun - a function to apply to each element of seq
parameter: seq - a list
parameter: limit - the max number of processes to start
parameter: timeout - the (approximate) max time to wait for the process to complete
Maps fun over seq, bounding the number of running processes to limit.
example:(MP:map pow (sequence 0 4) 4) => (0 1 4 9 16)- § -
MP:iter
syntax: (MP:iter fun seq [limit [timeout]])
parameter: fun - a function to apply to each element of seq
parameter: seq - a list
parameter: limit - the max number of processes to start
parameter: timeout - the (approximate) max time to wait for the process to complete
Iterates over seq, applying fun to each element. If limit is specified, will not start more than limit processes. Returns the value of the final iteration.
example:(MP:iter println (sequence 0 4) 4) 0 1 3 2 4- § -
Semaphore
syntax: (Semaphore int-initial)
parameter: int-initial - the initial value of the semaphore
Creates a synchronized counter that cannot drop below zero. Any attempt to do so will block until the counter has been incremented (released). A basic semaphore may count as high or low as desired; this is useful for protecting queues or stacks. By default, Semaphores are initialized with a value of 1.
example:(setf sem (Semaphore)) (setf queue '()) (dotimes (i 10) (push i queue) (:inc sem 1))- § -
:inc
syntax: (:inc inst [int-amount])
parameter: inst - an instance of Semaphore
parameter: int-amount - the amount to increment; default is 1
Increments (releases) the Semaphore by int-amount.
- § -
:dec
syntax: (:dec inst [int-amount])
parameter: inst - an instance of Semaphore
parameter: int-amount - the amount to decrement; default is 1
Decrements (acquires) the Semaphore by int-amount.
- § -
:count
syntax: (:count inst)
parameter: inst - an instance of Semaphore
Returns the current count of the Semaphore.
- § -
:acquire
syntax: (:acquire inst [blocking [int-amount]])
parameter: inst - an instance of Semaphore
parameter: blocking - if true (default is true) blocks if Semaphore is held
parameter: int-amount - the amount by which the Semaphore is to be decremented
Attempts to acquire the Semaphore. If blocking is true, :acquire will block until Semaphore becomes available. If blocking is nil, :acquire will attempt to acquire the Semaphore and return nil immediately if it is unavailable.
- § -
:release
syntax: (:release inst)
parameter: inst - an instance of Semaphore
Releases the Semaphore.
- § -
Share
syntax: (Share)
A Share wraps a single page in memory which may be used to store interger, float, or string values between processes. In order to store compound objects or lists, use source and/or pack to serialize the object first. Access to the Share between different processes must be protected with locking mechanisms.
- § -
:set
syntax: (:set inst value)
parameter: inst - an instance of Share
parameter: value - the new value for the Share
Sets the Share's value to value.
- § -
:get
syntax: (:get inst)
parameter: inst - an instance of Share
Gets the value of the Share.
- § -
Synchronized
syntax: (Synchronized [initial-value])
parameter: initial-value - the initial value, if any
Synchronized wraps a Share and protects access to it with a Semaphore.
- § -
:get
syntax: (:get inst)
parameter: inst - an instance of Synchronized
Gets the current value of the Synchronized instance. Will block if another process is currently getting or setting the value.
- § -
:set
syntax: (:set inst expr)
parameter: inst - an instance of Synchronized
parameter: expr - the new value
Sets the value of the Synchronized share. If the new value is an expression, it will be evaluated with the variable $0 set to the old value of the share. This is necessary to prevent a deadlock when dealing with self-referential values.
example:(setf mem (Synchronized)) (:set mem 10) => 10 (:set mem (+ 10 $0)) => 20- § -
Lock
syntax: (Lock)
A Lock is a binary semaphore (or mutual exclusion lock) that may be set to either 1 (released) or 0 (acquired). It is an error for a process to release a Lock it has not acquired.
- § -
:acquire
syntax: (:acquire inst [blocking])
parameter: inst - an instance of Lock
parameter: blocking - whether to block if the Lock is not available (default is true)
Attempts to acquire the Lock, blocking until it becomes available if blocking is true.
- § -
:release
syntax: (:release inst)
parameter: inst - an instance of Lock
Releases the Lock.
- § -
RLock
syntax: (RLock)
An RLock is a Lock that may be acquired multiple times by the same process. This is useful to lock various inter-dependent functions in the same process with a single lock. Observes the invariant # acquires >= # releases.
- § -
:acquire
syntax: (:acquire inst [blocking])
parameter: inst - an instance of RLock
parameter: blocking - whether to block if the RLock is not available (default is true)
Attempts to acquire the RLock, blocking until it becomes available if blocking is true.
- § -
:release
syntax: (:release inst)
parameter: inst - an instance of RLock
Releases the RLock.
- § -
Event
syntax: (Event)
An Event is a simple mechanism for synchronization. It allows multiple processes to block until a controlling process issues a signal to unblock.
- § -
:reset
syntax: (:reset inst)
parameter: inst - an instance of Event
Resets this Event. Does no checking to see if any processes are waiting on this event. Those processes will remain locked until this Event is signaled again.
- § -
:signaled?
syntax: (:signaled? inst)
parameter: inst - an instance of Event
Returns true if this Event has already been signaled.
- § -
:signal
syntax: (:signal inst)
parameter: inst - an instance of Event
Signals inst and unblocks any processes waiting on this Event.
- § -
:wait
syntax: (:wait inst [int-timeout])
parameter: inst - an instance of Event
parameter: int-timeout - the maximum number of milliseconds to wait
Blocks until inst is signaled or int-timeout, if present, expires. Returns true when exiting normally, nil if wait times out.
- § -
Pipe
syntax: (Pipe [in out])
parameter: in - an in-channel of an existing pipe
parameter: out - an out-channel of an existing pipe
A Pipe is a one-way communcations channel. If in and out are supplied, an existing pipe (such as the result of the pipe function) may be used.
- § -
:send
syntax: (:send inst msg)
parameter: inst - an instance of Pipe
parameter: msg - the message to send
Sends a message along the Pipe. Returns the number of bytes sent (including message encoding).
example:(setf p (Pipe)) (:send p "Hello world.")- § -
:peek
syntax: (:peek inst)
parameter: inst - an instance of Pipe
Returns the number of bytes ready for reading on Pipe. This does not correspond directly with the size of the message (extra data is send with the message).
- § -
:has-messages?
syntax: (:has-messages? inst)
parameter: inst - an instance of Pipe
Returns true if there is a message ready to be read from the Pipe.
- § -
:receive
syntax: (:receive inst [block])
parameter: inst - an instance of Pipe
parameter: block - when true, blocks until a message is available on the Pipe
Returns the next message on the Pipe. By default, blocks until the next message is available.
- § -
:close
syntax: (:close inst)
parameter: inst - an instance of Pipe
Closes the read and write handles for this Pipe.
- § -
Channel
syntax: (Channel)
Creates a two-way communcations channel using Pipes. Channels have two Pipes, a parent and a child, each of which may be given to separate processes to communicate back and forth using the standard Pipe syntax.
example:(setf ch (Channel)) (map set '(parent child) (:pipes ch)) ; in parent, send a message (:send parent "Hello child.") ; fork child, receive the message, and send a response (fork (begin (println "Child received: " (:receive child)) (:send child "Hello yourself!"))) ; in the parent process, block until a response becomes available (setf resp (:receive parent)) (println "Parent received: " resp)- § -
:pipes
syntax: (:pipes inst)
parameter: inst - an instance of Channel
Returns a list of the parent and child pipes. Equivalent to (list (:parent inst) (:child inst)).
- § -
:parent
syntax: (:parent inst)
parameter: inst - an instance of Channel
Returns the parent Pipe.
- § -
:child
syntax: (:child inst)
parameter: inst - an instance of Channel
Returns the child Pipe.
- § -
Queue
syntax: (Queue [size])
parameter: size - the maximum size for the queue (no max if nil)
A Queue is a synchronized first in, first out list of items that is safe for use in multiple processes. Object size is not restricted as when using a shared page of memory. A Queue must be closed when no longer needed using the :close method.
- § -
:count
syntax: (:count inst)
parameter: inst - an instance of Queue
Returns the number of items currently in the queue.
- § -
:put
syntax: (:put inst expr [block])
parameter: inst - an instance of Queue
parameter: expr - the object to be added
parameter: block - when true, blocks until space is available in the queue
Adds expr to the Queue, blocking by default. Returns true when the item was added. If block is nil, returns nil when the Queue is full.
example:(setf q (Queue 4)) (dotimes (i 5) (if (:put q i nil) (print i))) => 0123- § -
:get
syntax: (:get inst [block])
parameter: inst - an instance of Queue
parameter: block - when true, blocks until an item is available from the queue
Pulls the next item off of the Queue. If block is nil, returns nil when no item is available. Otherwise, blocks until one becomes available.
- § -
:close
syntax: (:close inst)
parameter: inst - an instance of Queue
Closes the Queue and removes its temporary files.
- ∂ -
Artful Code
generated with newLISP and newLISPdoc