guile/module/ice-9/threads.scm

370 lines
12 KiB
Scheme
Raw Normal View History

;;;; Copyright (C) 1996, 1998, 2001, 2002, 2003, 2006, 2010, 2011,
;;;; 2012 Free Software Foundation, Inc.
;;;;
;;;; This library is free software; you can redistribute it and/or
;;;; modify it under the terms of the GNU Lesser General Public
;;;; License as published by the Free Software Foundation; either
;;;; version 3 of the License, or (at your option) any later version.
;;;;
;;;; This library is distributed in the hope that it will be useful,
;;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
;;;; Lesser General Public License for more details.
;;;;
;;;; You should have received a copy of the GNU Lesser General Public
;;;; License along with this library; if not, write to the Free Software
2005-05-23 19:57:22 +00:00
;;;; Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
;;;;
;;;; ----------------------------------------------------------------
;;;; threads.scm -- User-level interface to Guile's thread system
;;;; 4 March 1996, Anthony Green <green@cygnus.com>
;;;; Modified 5 October 1996, MDJ <djurfeldt@nada.kth.se>
;;;; Modified 6 April 2001, ttn
;;;; ----------------------------------------------------------------
;;;;
2001-04-28 18:54:16 +00:00
;;; Commentary:
;; This module is documented in the Guile Reference Manual.
;;; Code:
* lib.scm: Move module the system directives `export', `export-syntax', `re-export' and `re-export-syntax' into the `define-module' form. This is the recommended way of exporting bindings. * srfi-2.scm, srfi-4.scm, srfi-8.scm, srfi-9.scm, srfi-10.scm, srfi-11.scm, srfi-14.scm, srfi-16.scm: Move module the system directives `export', `export-syntax', `re-export' and `re-export-syntax' into the `define-module' form. This is the recommended way of exporting bindings. * goops.scm, goops/active-slot.scm, goops/compile.scm, goops/composite-slot.scm, goops/describe.scm, goops/dispatch.scm, goops/old-define-method.scm, goops/save.scm, goops/util.scm: Move module the system directives `export', `export-syntax', `re-export' and `re-export-syntax' into the `define-module' form. This is the recommended way of exporting bindings. * slib.scm (array-indexes): New procedure. (*features*): Extend. (Probably some of these options should be set elsewhere.) (Thanks to Aubrey Jaffer.) * and-let-star-compat.scm, and-let-star.scm, calling.scm, channel.scm, common-list.scm, debug.scm, debugger.scm, expect.scm, hcons.scm, lineio.scm, ls.scm, mapping.scm, null.scm, optargs.scm, poe.scm, popen.scm, pretty-print.scm, q.scm, r5rs.scm, rdelim.scm, regex.scm, runq.scm, safe-r5rs.scm, safe.scm, session.scm, slib.scm, streams.scm, string-fun.scm, syncase.scm, threads.scm: Move module the system directives `export', `export-syntax', `re-export' and `re-export-syntax' into the `define-module' form. This is the recommended way of exporting bindings.
2001-10-21 09:49:19 +00:00
(define-module (ice-9 threads)
#:use-module (ice-9 match)
#:use-module (ice-9 control)
Move thread bindings to (ice-9 threads) * libguile/init.c (scm_i_init_guile): Don't call scm_init_thread_procs. * libguile/threads.c (scm_init_ice_9_threads): Rename from scm_init_thread_procs, make static. (scm_init_threads): Register scm_init_thread_procs extension. * libguile/threads.h (scm_init_thread_procs): Remove decl. * module/ice-9/boot-9.scm: Load (ice-9 threads), so that related side effects occur early. * module/ice-9/deprecated.scm (define-deprecated): Fix to allow deprecated bindings to appear in operator position. Export deprecated bindings. (define-deprecated/threads, define-deprecated/threads*): Trampoline thread bindings to (ice-9 threads). * module/ice-9/futures.scm: Use ice-9 threads. * module/ice-9/threads.scm: Load scm_init_ice_9_threads extension. Reorder definitions and imports so that the module circularity with (ice-9 futures) continues to work. * module/language/cps/intmap.scm: * module/language/cps/intset.scm: * module/language/tree-il/primitives.scm: Use (ice-9 threads). * module/language/cps/reify-primitives.scm: Reify current-thread in (ice-9 threads) module. * module/srfi/srfi-18.scm: Use ice-9 threads with a module prefix, and adapt all users. Use proper keywords in module definition form. * test-suite/tests/filesys.test (test-suite): * test-suite/tests/fluids.test (test-suite): * test-suite/tests/srfi-18.test: Use ice-9 threads. * NEWS: Add entry. * doc/ref/api-scheduling.texi (Threads): Update. * doc/ref/posix.texi (Processes): Move current-processor-count and total-processor-count docs to Threads.
2016-10-23 20:28:48 +02:00
;; These bindings are marked as #:replace because when deprecated code
;; is enabled, (ice-9 deprecated) also exports these names.
;; (Referencing one of the deprecated names prints a warning directing
;; the user to these bindings.) Anyway once we can remove the
;; deprecated bindings, we should use #:export instead of #:replace
;; for these.
#:replace (call-with-new-thread
yield
cancel-thread
join-thread
thread?
make-mutex
make-recursive-mutex
lock-mutex
try-mutex
unlock-mutex
mutex?
mutex-owner
mutex-level
mutex-locked?
make-condition-variable
wait-condition-variable
signal-condition-variable
broadcast-condition-variable
condition-variable?
current-thread
all-threads
thread-exited?
total-processor-count
current-processor-count)
#:export (begin-thread
make-thread
with-mutex
monitor
Move thread bindings to (ice-9 threads) * libguile/init.c (scm_i_init_guile): Don't call scm_init_thread_procs. * libguile/threads.c (scm_init_ice_9_threads): Rename from scm_init_thread_procs, make static. (scm_init_threads): Register scm_init_thread_procs extension. * libguile/threads.h (scm_init_thread_procs): Remove decl. * module/ice-9/boot-9.scm: Load (ice-9 threads), so that related side effects occur early. * module/ice-9/deprecated.scm (define-deprecated): Fix to allow deprecated bindings to appear in operator position. Export deprecated bindings. (define-deprecated/threads, define-deprecated/threads*): Trampoline thread bindings to (ice-9 threads). * module/ice-9/futures.scm: Use ice-9 threads. * module/ice-9/threads.scm: Load scm_init_ice_9_threads extension. Reorder definitions and imports so that the module circularity with (ice-9 futures) continues to work. * module/language/cps/intmap.scm: * module/language/cps/intset.scm: * module/language/tree-il/primitives.scm: Use (ice-9 threads). * module/language/cps/reify-primitives.scm: Reify current-thread in (ice-9 threads) module. * module/srfi/srfi-18.scm: Use ice-9 threads with a module prefix, and adapt all users. Use proper keywords in module definition form. * test-suite/tests/filesys.test (test-suite): * test-suite/tests/fluids.test (test-suite): * test-suite/tests/srfi-18.test: Use ice-9 threads. * NEWS: Add entry. * doc/ref/api-scheduling.texi (Threads): Update. * doc/ref/posix.texi (Processes): Move current-processor-count and total-processor-count docs to Threads.
2016-10-23 20:28:48 +02:00
parallel
letpar
par-map
par-for-each
n-par-map
n-par-for-each
n-for-each-par-map
%thread-handler))
Move thread bindings to (ice-9 threads) * libguile/init.c (scm_i_init_guile): Don't call scm_init_thread_procs. * libguile/threads.c (scm_init_ice_9_threads): Rename from scm_init_thread_procs, make static. (scm_init_threads): Register scm_init_thread_procs extension. * libguile/threads.h (scm_init_thread_procs): Remove decl. * module/ice-9/boot-9.scm: Load (ice-9 threads), so that related side effects occur early. * module/ice-9/deprecated.scm (define-deprecated): Fix to allow deprecated bindings to appear in operator position. Export deprecated bindings. (define-deprecated/threads, define-deprecated/threads*): Trampoline thread bindings to (ice-9 threads). * module/ice-9/futures.scm: Use ice-9 threads. * module/ice-9/threads.scm: Load scm_init_ice_9_threads extension. Reorder definitions and imports so that the module circularity with (ice-9 futures) continues to work. * module/language/cps/intmap.scm: * module/language/cps/intset.scm: * module/language/tree-il/primitives.scm: Use (ice-9 threads). * module/language/cps/reify-primitives.scm: Reify current-thread in (ice-9 threads) module. * module/srfi/srfi-18.scm: Use ice-9 threads with a module prefix, and adapt all users. Use proper keywords in module definition form. * test-suite/tests/filesys.test (test-suite): * test-suite/tests/fluids.test (test-suite): * test-suite/tests/srfi-18.test: Use ice-9 threads. * NEWS: Add entry. * doc/ref/api-scheduling.texi (Threads): Update. * doc/ref/posix.texi (Processes): Move current-processor-count and total-processor-count docs to Threads.
2016-10-23 20:28:48 +02:00
;; Note that this extension also defines %make-transcoded-port, which is
;; not exported but is used by (rnrs io ports).
(eval-when (expand eval load)
(load-extension (string-append "libguile-" (effective-version))
"scm_init_ice_9_threads"))
(define-syntax-rule (with-mutex m e0 e1 ...)
(let ((x m))
(dynamic-wind
(lambda () (lock-mutex x))
(lambda () (begin e0 e1 ...))
(lambda () (unlock-mutex x)))))
(define cancel-tag (make-prompt-tag "cancel"))
(define (cancel-thread thread . values)
"Asynchronously interrupt the target @var{thread} and ask it to
terminate, returning the given @var{values}. @code{dynamic-wind} post
thunks will run, but throw handlers will not. If @var{thread} has
already terminated or been signaled to terminate, this function is a
no-op."
(system-async-mark
(lambda ()
(catch #t
(lambda ()
(apply abort-to-prompt cancel-tag values))
(lambda _
(error "thread cancellation failed, throwing error instead???"))))
thread))
(define thread-join-data (make-object-property))
(define %thread-results (make-object-property))
(define* (call-with-new-thread thunk #:optional handler)
"Call @code{thunk} in a new thread and with a new dynamic state,
returning a new thread object representing the thread. The procedure
@var{thunk} is called via @code{with-continuation-barrier}.
When @var{handler} is specified, then @var{thunk} is called from within
a @code{catch} with tag @code{#t} that has @var{handler} as its handler.
This catch is established inside the continuation barrier.
Once @var{thunk} or @var{handler} returns, the return value is made the
@emph{exit value} of the thread and the thread is terminated."
(let ((cv (make-condition-variable))
(mutex (make-mutex))
(thunk (if handler
(lambda () (catch #t thunk handler))
thunk))
(thread #f))
(with-mutex mutex
(%call-with-new-thread
(lambda ()
(call-with-values
(lambda ()
(with-continuation-barrier
(lambda ()
(call-with-prompt cancel-tag
(lambda ()
(lock-mutex mutex)
(set! thread (current-thread))
(set! (thread-join-data thread) (cons cv mutex))
(signal-condition-variable cv)
(unlock-mutex mutex)
(thunk))
(lambda (k . args)
(apply values args))))))
(lambda vals
(lock-mutex mutex)
;; Probably now you're wondering why we are going to use
;; the cond variable as the key into the thread results
;; object property. It's because there is a possibility
;; that the thread object itself ends up as part of the
;; result, and if that happens we create a cycle whereby
;; the strong reference to a thread in the value of the
;; weak-key hash table used by the object property prevents
;; the thread from ever being collected. So instead we use
;; the cv as the key. Weak-key hash tables, amirite?
(set! (%thread-results cv) vals)
(broadcast-condition-variable cv)
(unlock-mutex mutex)
(apply values vals)))))
(let lp ()
(unless thread
(wait-condition-variable cv mutex)
(lp))))
thread))
(define* (join-thread thread #:optional timeout timeoutval)
"Suspend execution of the calling thread until the target @var{thread}
terminates, unless the target @var{thread} has already terminated."
(match (thread-join-data thread)
(#f (error "foreign thread cannot be joined" thread))
((cv . mutex)
(lock-mutex mutex)
(let lp ()
(cond
((%thread-results cv)
=> (lambda (results)
(unlock-mutex mutex)
(apply values results)))
((if timeout
(wait-condition-variable cv mutex timeout)
(wait-condition-variable cv mutex))
(lp))
(else timeoutval))))))
(define* (try-mutex mutex)
"Try to lock @var{mutex}. If the mutex is already locked, return
@code{#f}. Otherwise lock the mutex and return @code{#t}."
(lock-mutex mutex 0))
;;; Macros first, so that the procedures expand correctly.
(define-syntax-rule (begin-thread e0 e1 ...)
(call-with-new-thread
(lambda () e0 e1 ...)
%thread-handler))
(define-syntax-rule (make-thread proc arg ...)
(call-with-new-thread
(lambda () (proc arg ...))
%thread-handler))
(define monitor-mutex-table (make-hash-table))
(define monitor-mutex-table-mutex (make-mutex))
(define (monitor-mutex-with-id id)
(with-mutex monitor-mutex-table-mutex
(or (hashq-ref monitor-mutex-table id)
(let ((mutex (make-mutex)))
(hashq-set! monitor-mutex-table id mutex)
mutex))))
(define-syntax monitor
(lambda (stx)
(syntax-case stx ()
((_ body body* ...)
(let ((id (datum->syntax #'body (gensym))))
#`(with-mutex (monitor-mutex-with-id '#,id)
body body* ...))))))
Move thread bindings to (ice-9 threads) * libguile/init.c (scm_i_init_guile): Don't call scm_init_thread_procs. * libguile/threads.c (scm_init_ice_9_threads): Rename from scm_init_thread_procs, make static. (scm_init_threads): Register scm_init_thread_procs extension. * libguile/threads.h (scm_init_thread_procs): Remove decl. * module/ice-9/boot-9.scm: Load (ice-9 threads), so that related side effects occur early. * module/ice-9/deprecated.scm (define-deprecated): Fix to allow deprecated bindings to appear in operator position. Export deprecated bindings. (define-deprecated/threads, define-deprecated/threads*): Trampoline thread bindings to (ice-9 threads). * module/ice-9/futures.scm: Use ice-9 threads. * module/ice-9/threads.scm: Load scm_init_ice_9_threads extension. Reorder definitions and imports so that the module circularity with (ice-9 futures) continues to work. * module/language/cps/intmap.scm: * module/language/cps/intset.scm: * module/language/tree-il/primitives.scm: Use (ice-9 threads). * module/language/cps/reify-primitives.scm: Reify current-thread in (ice-9 threads) module. * module/srfi/srfi-18.scm: Use ice-9 threads with a module prefix, and adapt all users. Use proper keywords in module definition form. * test-suite/tests/filesys.test (test-suite): * test-suite/tests/fluids.test (test-suite): * test-suite/tests/srfi-18.test: Use ice-9 threads. * NEWS: Add entry. * doc/ref/api-scheduling.texi (Threads): Update. * doc/ref/posix.texi (Processes): Move current-processor-count and total-processor-count docs to Threads.
2016-10-23 20:28:48 +02:00
(define (thread-handler tag . args)
(let ((n (length args))
(p (current-error-port)))
(display "In thread:" p)
(newline p)
(if (>= n 3)
(display-error #f
p
(car args)
(cadr args)
(caddr args)
(if (= n 4)
(cadddr args)
'()))
(begin
(display "uncaught throw to " p)
(display tag p)
(display ": " p)
(display args p)
(newline p)))
#f))
;;; Set system thread handler
(define %thread-handler thread-handler)
(use-modules (ice-9 futures))
(define-syntax parallel
(lambda (x)
(syntax-case x ()
((_ e0 ...)
(with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
#'(let ((tmp0 (future e0))
...)
(values (touch tmp0) ...)))))))
(define-syntax-rule (letpar ((v e) ...) b0 b1 ...)
(call-with-values
(lambda () (parallel e ...))
(lambda (v ...)
b0 b1 ...)))
(define (par-mapper mapper cons)
(lambda (proc . lists)
(let loop ((lists lists))
(match lists
(((heads tails ...) ...)
(let ((tail (future (loop tails)))
(head (apply proc heads)))
(cons head (touch tail))))
(_
'())))))
(define par-map (par-mapper map cons))
(define par-for-each (par-mapper for-each (const *unspecified*)))
(define (n-par-map n proc . arglists)
(let* ((m (make-mutex))
2006-01-29 19:44:45 +00:00
(threads '())
(results (make-list (length (car arglists))))
(result results))
(do ((i 0 (+ 1 i)))
((= i n)
(for-each join-thread threads)
results)
2006-01-29 19:44:45 +00:00
(set! threads
(cons (begin-thread
(let loop ()
(lock-mutex m)
(if (null? result)
(unlock-mutex m)
(let ((args (map car arglists))
(my-result result))
(set! arglists (map cdr arglists))
(set! result (cdr result))
(unlock-mutex m)
(set-car! my-result (apply proc args))
(loop)))))
2006-01-29 19:44:45 +00:00
threads)))))
(define (n-par-for-each n proc . arglists)
(let ((m (make-mutex))
2006-01-29 19:44:45 +00:00
(threads '()))
(do ((i 0 (+ 1 i)))
((= i n)
2006-06-17 22:53:04 +00:00
(for-each join-thread threads))
2006-01-29 19:44:45 +00:00
(set! threads
(cons (begin-thread
(let loop ()
(lock-mutex m)
(if (null? (car arglists))
(unlock-mutex m)
(let ((args (map car arglists)))
(set! arglists (map cdr arglists))
(unlock-mutex m)
(apply proc args)
(loop)))))
2006-01-29 19:44:45 +00:00
threads)))))
;;; The following procedure is motivated by the common and important
2003-04-25 19:48:33 +00:00
;;; case where a lot of work should be done, (not too much) in parallel,
;;; but the results need to be handled serially (for example when
;;; writing them to a file).
;;;
(define (n-for-each-par-map n s-proc p-proc . arglists)
"Using N parallel processes, apply S-PROC in serial order on the results
of applying P-PROC on ARGLISTS."
(let* ((m (make-mutex))
2006-01-29 19:44:45 +00:00
(threads '())
(no-result '(no-value))
(results (make-list (length (car arglists)) no-result))
(result results))
(do ((i 0 (+ 1 i)))
((= i n)
2006-06-17 22:53:04 +00:00
(for-each join-thread threads))
2006-01-29 19:44:45 +00:00
(set! threads
(cons (begin-thread
(let loop ()
(lock-mutex m)
(cond ((null? results)
(unlock-mutex m))
((not (eq? (car results) no-result))
(let ((arg (car results)))
;; stop others from choosing to process results
(set-car! results no-result)
(unlock-mutex m)
(s-proc arg)
(lock-mutex m)
(set! results (cdr results))
(unlock-mutex m)
(loop)))
((null? result)
(unlock-mutex m))
(else
(let ((args (map car arglists))
(my-result result))
(set! arglists (map cdr arglists))
(set! result (cdr result))
(unlock-mutex m)
(set-car! my-result (apply p-proc args))
(loop))))))
2006-01-29 19:44:45 +00:00
threads)))))
;;; threads.scm ends here