# Source file thread.icn
#
# Jafar Al-Gharaibeh
#
#  June/15/2012
#
# updated with Don Ward's thread pool, December 2016
#

package threads


#
# create a communication channel with thread x
#
# Note : chTable is a table of tables of all private communication
#        channels in the application that are waiting for the second
#        end of the channel to respond and pick up the call from the table
#

class Shared_Variable(value)
   #
   # Each wrapper below fetches the built-in of the same name once through
   # ::proc(name, 0), caches it in a static, and applies it to this object.
   # (Presumably the indirection is needed because the method names would
   # otherwise hide the built-ins inside the class.)
   #

   # Acquire the mutex attached to this object.
   method lock()
      static acquire
      initial acquire := ::proc("lock", 0)

      return acquire(self)
   end

   # Attempt to acquire the mutex attached to this object via built-in trylock().
   method trylock()
      static try_acquire
      initial try_acquire := ::proc("trylock", 0)

      return try_acquire(self)
   end

   # Release the mutex attached to this object.
   method unlock()
      static release
      initial release := ::proc("unlock", 0)

      return release(self)
   end

   #
   # v   : initial content of the value field
   # mtx : optional second argument for ::mutex(); when omitted,
   #       ::mutex() is called with this object alone
   #
   initially(v, mtx)

      value := v

      if /mtx then
         ::mutex(self)
      else
         ::mutex(self, mtx)
end


#
# channel(x, port) : return a communication channel, creating and
# registering one when no matching channel is waiting.  Every channel is
# the result of ::condvar([]).
#
#   x    : either a string (a channel name) or the peer to connect with
#   port : optional key used to pair the two ends explicitly
#
# Three modes, selected by the arguments:
#   1. x converts to a string -> named channel kept in chTableN; every call
#      with the same name returns the same channel.
#   2. port is null           -> channels between the caller (&current) and
#      x are paired in the order they were requested, using chTable.
#   3. otherwise              -> channels between the caller and x are
#      paired by port, using chTableP.
#
# In modes 2 and 3 the first caller stores the new channel under
# [x][caller]; the peer later finds it under [its own &current][x] and
# removes it from the table.
#
procedure channel(x, port)
   local ce, T, L, chnl, TP
   static chTable, chTableP, chTableN
   initial {
      # one table per mode, each passed through ::mutex()
      chTable  := ::mutex(::table())
      chTableP := ::mutex(::table())
      chTableN := ::mutex(::table())
      }

   if ::string(x) then { # the name of the channel is what matters
       # look up the name, or create and remember a new channel for it
       critical chTableN:
          if ::member(chTableN, x) then
             chnl :=  chTableN[x]
          else
	     chTableN[x] := chnl := ::condvar([])

       return chnl
      }
   else if /port then {   # connect channels based on order they were received
      ce := &current
      critical chTable: {
         # make sure both parties have a (possibly empty) inner table
         if not(::member(chTable, ce)) then chTable[ce] := ::table()
         if not(::member(chTable, x)) then chTable[x] := ::table()

         # a channel left for us by x: take the oldest one, and drop the
         # list from the table once it is empty
	 if \(L := chTable[ce][x]) & chnl := ::get(L) then {
	    if *L=0 then ::delete(chTable[ce], x)
	    }
         # we already have channels pending for x: queue one more
	 else if \(L := chTable[x][ce]) then
	       ::put(L, chnl := ::condvar([]))
         # first pending channel from us to x: start a new list
         else {
	    L := []
            ::put(L, chnl := ::condvar([]))
            chTable[x][ce] := L
	    }
         } # critical
      return chnl
      }
   else {
      ce := &current
      critical chTableP: {    # connect channels based on the supplied port
         # make sure both parties have a (possibly empty) inner table
         if not(::member(chTableP, ce)) then chTableP[ce] := ::table()
         if not(::member(chTableP, x)) then chTableP[x] := ::table()

         # x already opened this port towards us: take that channel and
         # clean up the port entry (and the port table when it is empty)
	 if \(TP := chTableP[ce][x]) & \(chnl := TP[port]) then {
	    ::delete(TP, port)
	    if *TP=0 then ::delete(chTableP[ce], x)
	    }
         # we already have a port table for x: register a new channel on
         # this port (an existing entry for the same port is replaced)
	 else if \(TP := chTableP[x][ce]) then {
	    TP[port] := chnl := ::condvar([])
            }
         # first port from us to x: start a new port table
         else {
	    TP := ::table()
            TP[port] := chnl := ::condvar([])
            chTableP[x][ce] := TP
	    }
         } # critical
      return chnl
     }
end

#
# Thread : pairs a thread with a condition variable.  The thread is either
# supplied by the creator or spawned here running thread_func(), which
# blocks on the condition variable until run() signals it.
#
class Thread(
   t,    # the thread
   work, # value supplied by the creator; stored only, not used by the methods below
   cv    # condition variable: thread_func() waits on it, run() signals it
   )

  # reset(), stop() and pause() are empty placeholders.
  method reset()
  end
  method stop()
  end

  method pause()
  end

  # Signal the condition variable that thread_func() waits on.
  method run()
   ::signal(cv)
  end

  # Body of the spawned thread: block on the condition variable.
  # NOTE(review): other classes in this file call wait() inside
  # "critical <condvar>:"; confirm whether that is also required here.
  method thread_func()
    ::wait(cv)
  end

#
# work : stored in the work field
# t    : optional existing thread to adopt; when omitted a new thread
#        running thread_func() is created
#
initially(work, t)
   self.work := work
   cv := ::condvar()
   if \t then
      self.t := t
   else
      # The parameter t shadows the field of the same name, so the new
      # thread must be stored through self; a plain "t := ..." would only
      # set the parameter and leave the field null.
      self.t := thread thread_func()
end

#
# Task : a unit of work for the Threads pool - a procedure (func), the
# list of arguments it is applied to (args), a reduction procedure
# (reduce_func) and the bookkeeping needed to run it on several threads.
#
class Task(
  nthread, 		# number of threads asked for
  active_threads, 	# the threads assigned to this task
  func,    		# proc, what I'm supposed to do
  reduce_func,    		# proc, how do I combine results
  tttype,    		# thread task type , repeat, divide, chunk
  chunk_size, 		# intended number of items per exec_map() call (see note there)
  caller,               # set by set_caller(): the given value or &current
  cv_caller,		# condition variable used by wait() and signal()
  args,                 # list of arguments still to be processed
  result_list,          # list (passed through ::mutex) collecting results
  done 			# set when done
  )

  # Default work routine used by exec() when func is null; returns null.
  # Can be overridden by a subclass.
  method do_work(args)
     return
  end

  #
  # Run the task once on the calling thread, provided fewer than nthread
  # threads are currently registered.  The result of func(args) (or of
  # do_work(args) when func is null) is appended to result_list; if the
  # call failed, a null value is appended.
  # Returns done when it is non-null, otherwise 0 when no thread is
  # registered any more; fails otherwise, and fails when the thread limit
  # was already reached.
  #
  method exec()
    local rslt
    #write("args=",image(args))
    if runners()<nthread then {
       ::insert(active_threads, &current)
       if \func then {
          if rslt := func(args) then done := "Yes"
	}
       else if rslt := do_work(args) then done := "yes"
       ::delete(active_threads, &current)
       ::put(result_list, rslt)
       return \done | *active_threads=0;
       }
  end

  #
  # Apply func to items taken from the front of args and return the list
  # of results; fails when nothing was produced, when args is empty, or
  # when runners() is not below nthread.
  #
  # NOTE(review): i is never incremented, so the "i<chunk_size" bound has
  # no effect for chunk_size >= 1: the loop keeps going until get(args)
  # or func fails.  An item whose func call fails is consumed and lost.
  # Bounding the loop here would also require Threads.thread_func() to
  # re-queue the task reliably - confirm before changing.
  # NOTE(review): Threads.thread_func() registers the calling thread in
  # active_threads before calling this method, so with nthread = 1 the
  # "runners()<nthread" test cannot succeed from there - confirm intent.
  #
  method exec_map()
    local rslt, x, rslt_lst:=[], i:=0

    # if the task needs more threads and there is more work
    if runners()<nthread & *args>0 then {
       # each thread does chunk_size work and put it in rslt_list
       while i<chunk_size & ::put(rslt_lst, func(get(args)))
       if *rslt_lst>0 then # if we have anything to report
          return rslt_lst
       }
  end

  # Placeholder: no behaviour implemented.
  method exec_reduce()

  end

  # Signal cv_caller.  The built-in signal() is fetched through
  # ::proc("signal", 0) and cached in a static.
  method signal()
     static bsignal
     initial bsignal := ::proc("signal", 0)
     bsignal(cv_caller)
  end

  # Succeeds (returning done) once done has been set; fails otherwise.
  method is_done()
     return \done
  end

  # Block on cv_caller, inside a critical region on cv_caller.  The
  # built-in wait() is fetched through ::proc("wait", 0) and cached.
  # NOTE(review): there is no predicate loop around the wait, so a
  # signal() issued before this call is not observed - confirm callers.
  method wait()
     static bwait
     initial bwait := ::proc("wait", 0)
     critical cv_caller: bwait(cv_caller)
  end

  # Number of threads currently registered in active_threads.
  method runners()
     return *active_threads
  end

  # Record the caller: c when it is non-null, otherwise &current.
  method set_caller(c)
     caller := ( \c | &current )
  end

  # Replace the argument list.
  method set_args(args)
     self.args := args
  end

  # Apply reduce_func to rslt_lst; fails when reduce_func is null.
  method reduce(rslt_lst)
     return (\reduce_func)(rslt_lst)
  end

  #
  # (Re)initialise the task.
  #   func      : procedure applied to the arguments
  #   r_func    : reduction procedure
  #   args      : list of arguments
  #   n         : number of threads requested (default 1)
  #   chnk_size : stored in chunk_size (default 1)
  #
  method init(func, r_func, args, n:1, chnk_size:1)
     self.func := func
     self.reduce_func := r_func
     self.args := args
     # "n<1" yields 1 when it succeeds, so this raises n to 1 when it is
     # smaller; when the comparison fails n keeps its value.
     n := n<1
     nthread := n
     cv_caller := ::condvar()
     active_threads := ::mutex(::set())
     result_list := ::mutex([])
     chunk_size := chnk_size
   end

  # Constructor: forwards to init(); chunk_size takes init()'s default.
  initially(func, r_func, ar, n)

     init(func, r_func, ar, n)

end

#
# Threads : a pool of worker threads that execute Task objects, plus a
# master thread that periodically wakes idle workers when work is queued.
#
class Threads(
 master,       # "the house keeper thread"
 thread_ready, # the set of threads that are currently idle
 thread_active,# the set of threads that are currently executing a task
 task_ready,   # the list of tasks waiting to be processed
 task_active,  # the list of tasks being processed
 cv_work,      # condition variable that all threads on
               # thread_pool wait on when there is no
               # work to do.
 cv_master,
 actual_work,  # points to a procedure (work) that the
               # thread are supposed to do.
 done          # non-null once shutdown() has been requested
)

# The work that the thread has to do.
# This function can be implemented by the subclass
# or actual_work can be pointed to a procedure that
# does the work
method do_work()
   actual_work()
end

#
# Body of every worker thread: repeatedly take a task from task_ready,
# run one exec_map() pass on it, store the reduced result in the task's
# result_list, and sleep on cv_work while the queue is empty.  The loop
# ends once done is set.
#
method thread_func()
   local tsk, rslt_lst, cur_thread := &current
   while /done do {
      if tsk := ::pop(task_ready) then {
         ::insert(tsk.active_threads, &current)
         # leave the task queued for other workers while it still wants
         # more threads
         if tsk.runners()+1<tsk.nthread then
            ::push(task_ready, tsk)

         ::delete(thread_ready, cur_thread)
         ::insert(thread_active,cur_thread)

         #write("thread is execing task")
         if rslt_lst := tsk.exec_map() then
            ::put(tsk.result_list, tsk.reduce_func(rslt_lst))
         #write("thread done execing task")

         ::delete(thread_active, cur_thread)
         ::insert(thread_ready,  cur_thread)

         # re-queue the task if arguments remain and it is not already at
         # the head of the queue.
         # NOTE(review): task_ready[1] fails on an empty queue, so the
         # task is NOT re-queued in that case - confirm this is intended.
         critical task_ready: {
            if *tsk.args>0 & tsk~===task_ready[1] then
               ::push(task_ready, tsk)
            }

         ::delete(tsk.active_threads, &current)
         # wake the submitter once no thread is registered on the task
         # and no arguments are left (both counts are zero)
         if tsk.runners()=*tsk.args=0 then tsk.signal()
         }

      # Sleep until work arrives.  done is re-checked under the cv_work
      # lock so that shutdown() can release idle workers.
      if *task_ready=0 then
         critical cv_work: while /done & *task_ready=0 do
                              ::wait(cv_work)
      }
end

# The master thread routine: while the pool is running, signal cv_work
# whenever both a task and an idle thread exist, otherwise sleep 1000 ms.
method master_func()
  while /done do {
      if *task_ready>0 & *thread_ready>0 then
         ::signal(cv_work)
      else
         ::delay(1000)
    #write("master")
   }
end

# Placeholder: no behaviour implemented.
method house_keeping()
end

#
# Shared front end of the submit methods: turn the arguments into a Task,
# record the caller, append the task to task_ready and signal cv_work
# once per requested thread.  Returns the queued Task.
#
#   tsk    : a procedure (wrapped in a new Task) or an existing Task
#   r_func : reduction procedure (used only when a new Task is created)
#   args   : list of arguments for the task
#   n      : number of threads requested; when omitted for an existing
#            Task, the task keeps its current nthread
#
method post_task(tsk, r_func, args, n)
   if ::type(tsk) == "procedure" then
      tsk := Task(tsk, r_func, args, n)
   else {
      tsk.set_args(args)
      # only override the thread count when one was actually supplied;
      # assigning null would break the numeric comparisons in thread_func()
      tsk.nthread := \n
      }

   tsk.set_caller()
   critical task_ready: ::put(task_ready, tsk)
   ::signal(cv_work, tsk.nthread)
   return tsk
end

# Queue a task and return to the caller immediately (the call fails, as
# it produces no result).  Parameters as for post_task().
method submit_async(tsk, r_func, args, n)
   post_task(tsk, r_func, args, n)
end

# Queue a task, block until the workers signal its completion, and return
# the reduction of its result list.  Parameters as for post_task().
method submit_sync(tsk, r_func, args, n)
   tsk := post_task(tsk, r_func, args, n)
   #write("waiting to finish...")
   tsk.wait()
   return tsk.reduce(tsk.result_list)
end


# Same behaviour as submit_sync(): queue, wait, reduce.
method map_reduce(tsk, r_func, args, n)
   tsk := post_task(tsk, r_func, args, n)
   #write("waiting to finish...")
   tsk.wait()
   return tsk.reduce(tsk.result_list)
end

# Ask all pool threads to finish.  The flag is set and cv_work is
# broadcast (signal count 0) while holding the cv_work lock, so a worker
# that is about to sleep cannot miss the request; the master thread sees
# the flag on its next loop iteration.
method shutdown()
   critical cv_work: {
      done := "force"
      ::signal(cv_work, 0)
      }
end

#
# n : number of worker threads to start (default 8 when omitted or < 1)
#
initially(n)
   cv_work := ::condvar()
   cv_master := ::condvar()
   if /n | n<1 then n := 8
   thread_ready := ::mutex(::set())
   thread_active := ::mutex(::set())
   task_ready := ::mutex([ ])
   task_active := ::mutex([ ])
   every 1 to n do ::insert(thread_ready, thread thread_func())

   master := thread master_func()
   #Threads := self   # singleton class ?
end


#################################################################################
#
#  A simple pool of worker threads
#
#       Don Ward
#       March 2015
#
#--------------------------------------------------------------------------------
#  Typical usage
#     MakePool(n)                Create a pool of n worker threads (default cores+2).
#     Dispatch(proc, params ...) Post a task to be executed in parallel; a worker thread calls proc(params ...)
#     IsIdle()                   Succeeds if no workers are active and no tasks are waiting.
#     ClosePool()                Shuts down the pool after any remaining tasks have been performed.
#
#--------------------------------------------------------------------------------

global ToDo     # A List of tasks to execute. Each task is itself a list where
                # the first element is a procedure to call and the rest are its parameters.
global Idlers   # A List of waiting worker threads
global Work     # A mutex protecting the above

global Workforce    # The collection of all worker threads.

#--------------------------------------------------------------------------------
# Build the pool: start n worker threads and tell each one its own thread id.
# A non-positive n (the default) selects 2 + the core count taken from the
# "CPU cores " entry of &features.
procedure MakePool(n: integer: 0)
    local w
    initial {
        # one shared mutex guards both queues
        Work := ::mutex()
        ToDo := ::mutex([], Work)
        Idlers := ::mutex([], Work)
        Workforce := []
    }

    if not (n > 0) then {
        &features ? { ="CPU cores " & n := 2 + ::tab(0) }
    }

    # Each worker starts by receiving a message; that message is its own id.
    every 1 to n do {
        w := ( thread worker(<<@) )
        ::put(Workforce, w)
        w @>> w
    }

    return #success
end

#--------------------------------------------------------------------------------
# Sentinel task: ClosePool() dispatches this procedure and worker() returns
# when it receives it, without ever calling it.
# NB. Calling it directly stops the whole program ("emergency stop").
procedure stopWork(reason: string: "")
    ::stop("Emergency Exit: " || reason)
end

#--------------------------------------------------------------------------------
# work thread: repeatedly get a task, which is a list [proc, param1, param2 ....]
#              and call the procedure with the supplied parameters.
# MyId is the worker's own thread id, as sent by MakePool(); any other kind
# of value triggers stopWork().  The loop ends (the thread returns) when the
# procedure in a task is stopWork itself.
procedure worker(MyId)
    local task, proc
    if ::type(MyId) ~== "thread" then stopWork("Invalid thread Id")

    repeat {
        # Work guards both ToDo and Idlers
        ::lock(Work)
        if 0 = *ToDo then {     # Nothing in queue, wait for work to arrive
            ::push(Idlers, MyId)  # Indicate availability for work
            ::unlock(Work)        # release before blocking on the inbox
            task := <<@         # Wait for work to arrive (sent by Dispatch())
        } else {
            task := ::get(ToDo)   # Remove the next task from the queue
            ::unlock(Work)
        }
        proc := ::pop(task)       # Recover the procedure placed by Dispatch()
        # stopWork is only a marker: return instead of calling it;
        # otherwise apply the procedure to the remaining list elements
        if (proc === stopWork) then { return } else { proc ! task }
    }
end

#--------------------------------------------------------------------------------
# Post a task - procedure p applied to args - to the pool.  The task is
# handed directly to an idle worker when one is registered in Idlers,
# otherwise it is appended to the ToDo queue.
# Fails (and posts nothing) when p is not a procedure.
procedure Dispatch(p, args[])
    local idle
    if ::type(p) ~== "procedure" then fail

    ::push(args, p)             # the task list is [p, arg1, arg2, ...]
    ::lock(Work)
    if *Idlers > 0 then {
        idle := ::pop(Idlers);
        ::unlock(Work)
        args @>> idle           # deliver the task to that worker's inbox
    }
    else {
        ::put(ToDo, args)       # nobody is idle: queue it for later
        ::unlock(Work)
    }

    return # success
end

#--------------------------------------------------------------------------------
# Succeeds only when every worker is registered in Idlers and the ToDo
# queue is empty; fails otherwise.  Both sizes are read while holding Work.
procedure IsIdle()
    local busy

    ::lock(Work)
    if (*Idlers ~= *Workforce) | (*ToDo > 0) then busy := 1
    ::unlock(Work)

    if \busy then fail
    return   # success
end

#--------------------------------------------------------------------------------
# Close the pool: dispatch one stopWork task per worker, then wait for
# every worker thread.  Tasks queued before the call are executed first.
# Fails, after printing a warning and discarding the queue, if tasks are
# still queued afterwards.
procedure ClosePool()
    local w

    every 1 to *Workforce do Dispatch(stopWork)
    every w := !Workforce do ::wait(w)

    # Every worker has returned, so none may still be registered as idle.
    if *Idlers > 0 then ::stop("(ClosePool) Idlers queue not empty")

    Workforce := []

    if *ToDo = 0 then return #success

    # Work can only remain if Dispatch() was called in parallel with ClosePool()
    ::write(&errout, "Warning (ClosePool): Some work is left in the queue")
    ToDo := []
    fail
end

# This page produced by UniDoc on 2021/04/15 @ 23:59:43.