#
# Jafar Al-Gharaibeh
#
# June/15/2012
#
# updated with Don Ward's thread pool, December 2016
#
package threads
#
# create a communication channel with thread x
#
# Note : chTable is a table of tables of all private communication
# channels in the application that are waiting for the second
# end of the channel to respond and pick the call from the table
#
#
# A value bundled with a mutex.  The lock/trylock/unlock methods forward
# to the built-in functions of the same names, applied to the object itself.
#
class Shared_Variable(value)

   # Apply the built-in lock() to this object and return its result.
   # The built-in is fetched with proc(name, 0) because the method name
   # hides it inside the class.
   method lock()
      static lock_fn
      initial lock_fn := ::proc("lock", 0)
      return lock_fn(self)
   end

   # Apply the built-in trylock() to this object and return its result.
   method trylock()
      static trylock_fn
      initial trylock_fn := ::proc("trylock", 0)
      return trylock_fn(self)
   end

   # Apply the built-in unlock() to this object and return its result.
   method unlock()
      static unlock_fn
      initial unlock_fn := ::proc("unlock", 0)
      return unlock_fn(self)
   end

   # v   : the initial value to store
   # mtx : optional; when non-null it is passed to mutex() as the second
   #       argument, otherwise mutex() is called with this object alone
   initially(v, mtx)
      value := v
      if /mtx then
         ::mutex(self)
      else
         ::mutex(self, mtx)
end
#
# Return a communication channel: a condition variable created with
# condvar([]).  How the two ends are matched depends on the arguments:
#
#   x is string-convertible : channels are looked up by name in chTableN;
#                             every caller using the same name gets the
#                             same channel.
#   port is null            : channels between the calling thread and x are
#                             paired in the order the requests arrive
#                             (chTable holds a list per pair of threads).
#   otherwise               : channels between the calling thread and x are
#                             paired by the supplied port value (chTableP
#                             holds a table per pair of threads).
#
# In the last two modes the first caller creates the channel and leaves it
# in the table; the matching call from the other end removes and returns it.
#
procedure channel(x, port)
   local me, pending, chnl
   static chTable, chTableP, chTableN
   initial {
      chTable  := ::mutex(::table())
      chTableP := ::mutex(::table())
      chTableN := ::mutex(::table())
   }

   if ::string(x) then {            # the name of the channel is what matters
      critical chTableN: {
         if ::member(chTableN, x) then
            chnl := chTableN[x]
         else
            chTableN[x] := chnl := ::condvar([])
      }
      return chnl
   }

   me := &current

   if /port then {                  # connect channels based on order they were received
      critical chTable: {
         if not ::member(chTable, me) then chTable[me] := ::table()
         if not ::member(chTable, x) then chTable[x] := ::table()
         if \(pending := chTable[me][x]) & (chnl := ::get(pending)) then {
            # The other end already posted a channel for us: take the oldest
            # one and drop the list once it is empty.
            if *pending = 0 then ::delete(chTable[me], x)
         }
         else if \(pending := chTable[x][me]) then
            # We already have requests outstanding towards x: append another.
            ::put(pending, chnl := ::condvar([]))
         else {
            # First request from us towards x.
            pending := []
            ::put(pending, chnl := ::condvar([]))
            chTable[x][me] := pending
         }
      } # critical
      return chnl
   }

   critical chTableP: {             # connect channels based on the supplied port
      if not ::member(chTableP, me) then chTableP[me] := ::table()
      if not ::member(chTableP, x) then chTableP[x] := ::table()
      if \(pending := chTableP[me][x]) & \(chnl := pending[port]) then {
         # The other end already posted a channel on this port: claim it.
         ::delete(pending, port)
         if *pending = 0 then ::delete(chTableP[me], x)
      }
      else if \(pending := chTableP[x][me]) then {
         pending[port] := chnl := ::condvar([])
      }
      else {
         pending := ::table()
         pending[port] := chnl := ::condvar([])
         chTableP[x][me] := pending
      }
   } # critical
   return chnl
end
#
# A thread together with a piece of work and a condition variable.
# reset(), stop() and pause() are empty placeholders.
#
class Thread(
   t,       # the thread
   work,    # the value passed as the first constructor argument
   cv       # condition variable that thread_func() waits on
   )

   method reset()
   end

   method stop()
   end

   method pause()
   end

   # Signal the condition variable that thread_func() waits on.
   method run()
      ::signal(cv)
   end

   # Body of the thread created by the constructor: wait on cv.
   method thread_func()
      ::wait(cv)
   end

   # work : stored in the work field
   # t    : optional existing thread; when null a new thread running
   #        thread_func() is created
   #
   # The parameters shadow the fields of the same name, so the fields must
   # be assigned through self.  (The original assigned the newly created
   # thread to the parameter t, leaving the field null.)
   initially(work, t)
      self.work := work
      cv := ::condvar()
      if \t then
         self.t := t
      else
         self.t := thread thread_func()
end
#
# A unit of work for the Threads pool: a procedure, its argument list, a
# reduce procedure, and the bookkeeping needed to run it on several threads.
#
class Task(
   nthread,         # number of threads asked for
   active_threads,  # the threads assigned to this task
   func,            # proc, what I'm supposed to do
   reduce_func,     # proc, how do I combine results
   tttype,          # thread task type , repeat, divide, chunk
   chunk_size,      #
   caller,
   cv_caller,       #
   args,
   result_list,
   done             # set when done
   )

   # Default work routine, used by exec() when func is null.
   # Subclasses may override it.
   method do_work(args)
      return
   end

   # Run the task once on the calling thread, provided fewer than nthread
   # threads are already running it.  The result of func(args), or of
   # do_work(args) when func is null, is appended to result_list.
   # Succeeds if done is set or no thread remains active; fails otherwise,
   # and also fails when the thread limit has been reached.
   method exec()
      local rslt
      #write("args=",image(args))
      if runners() < nthread then {
         ::insert(active_threads, &current)
         if \func then {
            if rslt := func(args) then done := "Yes"
         }
         else if rslt := do_work(args) then done := "yes"
         ::delete(active_threads, &current)
         ::put(result_list, rslt)
         return \done | *active_threads=0
      }
   end

   # Take arguments from args, apply func to each, and return the list of
   # results.  Fails if the thread limit is reached, args is empty, or no
   # result was produced.
   #
   # NOTE(review): i is never advanced, so chunk_size does not bound the
   # loop; it runs until get(args) or func fails.  Left as is, because
   # Threads.thread_func only re-queues a task under conditions that
   # assume this behaviour.
   method exec_map()
      local rslt_lst := [], i := 0
      # if the task needs more threads and there is more work
      if runners() < nthread & *args > 0 then {
         # each thread does chunk_size work and put it in rslt_list
         while i < chunk_size & ::put(rslt_lst, func(::get(args)))
         if *rslt_lst > 0 then      # if we have anything to report
            return rslt_lst
      }
   end

   method exec_reduce()
   end

   # Signal cv_caller using the built-in signal(), which the method name
   # hides inside the class.
   method signal()
      static bsignal
      initial bsignal := ::proc("signal", 0)
      bsignal(cv_caller)
   end

   # Succeeds, returning done, once done has been set.
   method is_done()
      return \done
   end

   # Wait on cv_caller, inside its critical region, using the built-in wait().
   method wait()
      static bwait
      initial bwait := ::proc("wait", 0)
      critical cv_caller: bwait(cv_caller)
   end

   # Number of threads currently recorded in active_threads.
   method runners()
      return *active_threads
   end

   # Record the caller: c if supplied, otherwise the current thread.
   method set_caller(c)
      caller := ( \c | &current )
   end

   method set_args(args)
      self.args := args
   end

   # Apply reduce_func to rslt_lst; fails if reduce_func is null.
   method reduce(rslt_lst)
      return (\reduce_func)(rslt_lst)
   end

   # (Re)initialise the task.
   #   func      : procedure to apply
   #   r_func    : procedure used to combine results
   #   args      : list of arguments
   #   n         : number of threads wanted (default 1, raised to 1 if lower)
   #   chnk_size : stored in chunk_size (default 1)
   method init(func, r_func, args, n:1, chnk_size:1)
      self.func := func
      self.reduce_func := r_func
      self.args := args
      n := n<1     # n<1 yields 1 when n is below 1 and fails otherwise,
                   # so this raises n to at least 1
      nthread := n
      cv_caller := ::condvar()
      active_threads := ::mutex(::set())
      result_list := ::mutex([])
      chunk_size := chnk_size
   end

   initially(func, r_func, ar, n)
      init(func, r_func, ar, n)
end
#
# A pool of worker threads that execute Task objects, plus a master thread
# that periodically signals the workers when tasks are waiting.
#
class Threads(
   master,        # "the house keeper thread"
   thread_ready,  # the set of threads ready for work
   thread_active, # the set of threads currently executing a task
   task_ready,    # the list of tasks waiting to be processed
   task_active,   # the list of tasks being processed
   cv_work,       # condition variable that all threads on
                  # thread_pool wait on when there is no
                  # work to do.
   cv_master,
   actual_work,   # points to a procedure (work) that the
                  # thread are supposed to do.
   done
   )

   # The work that the thread has to do.
   # This function can be implemented by the subclass
   # or actual_work can be pointed to a procedure that
   # does the work
   method do_work()
      actual_work()
   end

   # Body of every worker thread.  Until done is set: take a task from
   # task_ready, run its exec_map(), append the reduced chunk to the task's
   # result_list, and signal the task's caller when it has neither runners
   # nor arguments left.  When task_ready is empty, wait on cv_work.
   method thread_func()
      local tsk, rslt, rslt_lst, cur_thread := &current
      while /done do {
         if tsk := ::pop(task_ready) then {
            ::insert(tsk.active_threads, &current)
            # Put the task back so that other workers can join it while it
            # still wants more threads.
            if tsk.runners()+1 < tsk.nthread then
               ::push(task_ready, tsk)
            ::delete(thread_ready, cur_thread)
            ::insert(thread_active, cur_thread)
            #write("thread is execing task")
            if rslt_lst := tsk.exec_map() then
               ::put(tsk.result_list, tsk.reduce_func(rslt_lst))
            #write("thread done execing task")
            ::delete(thread_active, cur_thread)
            ::insert(thread_ready, cur_thread)
            critical task_ready: {
               # Re-queue the task if it still has arguments and is not
               # already at the head of the queue.
               # NOTE(review): when task_ready is empty, task_ready[1]
               # fails, so the task is not re-queued in that case.
               if *tsk.args>0 & tsk~===task_ready[1] then
                  ::push(task_ready, tsk)
            }
            ::delete(tsk.active_threads, &current)
            if tsk.runners()=*tsk.args=0 then tsk.signal()
         }
         if *task_ready=0 then
            critical cv_work: while *task_ready=0 do
               ::wait(cv_work)
      }
   end

   # The master thread routine.
   # Until done is set: signal cv_work when both a task and a thread are
   # ready, otherwise call delay(1000).
   method master_func()
      while /done do {
         if *task_ready>0 & *thread_ready>0 then
            ::signal(cv_work)
         else
            ::delay(1000)
         #write("master")
      }
   end

   method house_keeping()
   end

   # Queue a task and return without waiting for it.
   #   tsk    : a procedure (wrapped in a new Task) or an existing Task
   #   r_func : reduce procedure, used only when tsk is a procedure
   #   args   : list of arguments for the task
   #   n      : number of threads requested
   method submit_async(tsk, r_func, args, n)
      if (::type(tsk)=="procedure") then {
         tsk := Task(tsk, r_func, args, n)
      }
      else {
         tsk.set_args(args)
         tsk.nthread := n
      }
      tsk.set_caller()
      critical task_ready: ::put(task_ready, tsk)
      ::signal(cv_work, tsk.nthread)
   end

   # Queue a task, wait for it to signal completion, and return the
   # reduction of its result list.  Parameters as for submit_async().
   method submit_sync(tsk, r_func, args, n)
      if (::type(tsk)=="procedure") then {
         tsk := Task(tsk, r_func, args, n)
      }
      else {
         tsk.set_args(args)
         tsk.nthread := n
      }
      tsk.set_caller()
      critical task_ready: ::put(task_ready, tsk)
      ::signal(cv_work, tsk.nthread)
      #write("waiting to finish...")
      tsk.wait()
      return tsk.reduce(tsk.result_list)
   end

   # Same steps as submit_sync(): queue the task, wait for it, and return
   # the reduction of its result list.
   method map_reduce(tsk, r_func, args, n)
      if (::type(tsk)=="procedure") then {
         tsk := Task(tsk, r_func, args, n)
      }
      else {
         tsk.set_args(args)
         tsk.nthread := n
      }
      tsk.set_caller()
      critical task_ready: ::put(task_ready, tsk)
      ::signal(cv_work, tsk.nthread)
      #write("waiting to finish...")
      tsk.wait()
      return tsk.reduce(tsk.result_list)
   end

   # Set done, which ends the while /done loops of the worker and master
   # routines the next time they test it.
   method shutdown()
      done := "force"
   end

   # n : number of worker threads to create (8 when null or below 1)
   initially(n)
      cv_work := ::condvar()
      cv_master := ::condvar()
      if /n | n<1 then n := 8
      thread_ready := ::mutex(::set())
      thread_active := ::mutex(::set())
      task_ready := ::mutex([ ])
      task_active := ::mutex([ ])
      every 1 to n do ::insert(thread_ready, thread thread_func())
      master := thread master_func()
      #Threads := self # singleton class ?
end
#################################################################################
#
# A simple pool of worker threads
#
# Don Ward
# March 2015
#
#--------------------------------------------------------------------------------
# Typical usage
# MakePool(n) Create a pool of n worker threads (default cores+2).
# Dispatch(proc, params ...) Post a task to be executed in parallel; a worker thread calls proc(params ...)
# IsIdle() Succeeds if no workers are active and no tasks are waiting.
# ClosePool() Shuts down the pool after any remaining tasks have been performed.
#
#--------------------------------------------------------------------------------
# Shared state of the worker pool:
#   ToDo      - a list of tasks to execute.  Each task is itself a list where
#               the first element is a procedure to call and the rest are
#               its parameters.
#   Idlers    - a list of waiting worker threads.
#   Work      - a mutex protecting ToDo and Idlers.
#   Workforce - the collection of all worker threads.
global ToDo, Idlers, Work, Workforce
#--------------------------------------------------------------------------------
# Construct a pool of n worker threads.
#   n : number of workers to create; when n <= 0 the default of
#       2 + number of CPU cores (as reported by &features) is used.
# Always succeeds.  The shared pool state is created on the first call only.
procedure MakePool(n: integer: 0)
   local id
   initial {
      Work      := ::mutex()
      ToDo      := ::mutex([], Work)
      Idlers    := ::mutex([], Work)
      Workforce := []
   }

   if n <= 0 then {      # Use default no of threads, which is 2 + number of cores
      &features ? { ="CPU cores " & n := 2 + ::tab(0) }
      # If &features has no "CPU cores" entry, n is still <= 0 here and the
      # pool would be created without any worker; assume a single core.
      if n <= 0 then n := 2 + 1
   }

   # Create the requested number of workers and tell each one their thread id.
   # Each worker takes its argument from its inbox (<<@); the id is sent to
   # it right after creation (id @>> id).
   while 0 <= (n -:= 1) do {
      ::put(Workforce, id := ( thread worker(<<@) ))
      id @>> id
   }
   return #success
end
#--------------------------------------------------------------------------------
# Marker procedure: a task whose procedure is stopWork asks the receiving
# worker thread to terminate (worker() compares against it and never calls it).
# NB. Calling it directly stops the program with an "Emergency Exit" message.
procedure stopWork(reason: string: "")
   ::stop("Emergency Exit: " || reason)
end
#--------------------------------------------------------------------------------
# Body of a pool thread.  Loops forever: obtains a task, which is a list
# [procedure, param1, param2 ...], and applies the procedure to the
# parameters.  Returns when the procedure is stopWork.
#   MyId : this thread's own id; anything that is not a thread triggers
#          stopWork("Invalid thread Id").
procedure worker(MyId)
   local job, action

   if ::type(MyId) ~== "thread" then stopWork("Invalid thread Id")

   repeat {
      ::lock(Work)
      if *ToDo > 0 then {
         # Work is queued: take the oldest task.
         job := ::get(ToDo)
         ::unlock(Work)
      }
      else {
         # Queue is empty: register as idle, then block on the inbox until
         # Dispatch() sends a task.
         ::push(Idlers, MyId)
         ::unlock(Work)
         job := <<@
      }

      # Dispatch() put the procedure at the front of the list.
      action := ::pop(job)
      if action === stopWork then return
      action ! job
   }
end
#--------------------------------------------------------------------------------
# Hand a task to the pool.
#   p    : the procedure to call
#   args : the parameters to call it with
# If a worker is idle the task is sent straight to it; otherwise the task is
# appended to ToDo.  Fails (doing nothing) if p is not a procedure.
procedure Dispatch(p, args[])
   local idle

   if ::type(p) ~== "procedure" then fail

   # A task is the argument list with the procedure at the front.
   ::push(args, p)

   ::lock(Work)
   if *Idlers = 0 then {
      # No worker available; queue task for later.
      ::put(ToDo, args)
      ::unlock(Work)
   }
   else {
      # A worker thread is available: send the task to it.
      idle := ::pop(Idlers)
      ::unlock(Work)
      args @>> idle
   }
   return # success
end
#--------------------------------------------------------------------------------
# Succeeds only when every worker is in the Idlers list and ToDo is empty;
# fails if work is in progress or waiting.  Both are checked while holding Work.
procedure IsIdle()
   local busy

   ::lock(Work)
   if (*ToDo > 0) | (*Idlers ~= *Workforce) then busy := 1
   ::unlock(Work)

   if \busy then fail
   return # success
end
#--------------------------------------------------------------------------------
# Close the pool.  One stopWork task per worker is dispatched, so tasks that
# were queued earlier come first; then every worker thread is waited on.
# Stops the program if a worker is still listed in Idlers afterwards.
# Fails, after printing a warning and discarding the queue, if ToDo still
# holds tasks; succeeds otherwise.
procedure ClosePool()
   every 1 to *Workforce do Dispatch(stopWork)
   every ::wait(!Workforce)

   # At this point, Idlers must be empty.
   if *Idlers > 0 then ::stop("(ClosePool) Idlers queue not empty")
   Workforce := []

   # There ought to be no work to do, unless Dispatch() has been called
   # in parallel with ClosePool()
   if *ToDo = 0 then return # success

   ::write(&errout, "Warning (ClosePool): Some work is left in the queue")
   ToDo := []
   fail
end
# This page produced by UniDoc on 2021/04/15 @ 23:59:43.