PHP 8.2.31
Preview: thread_pool.rb Size: 4.64 KB
//proc/thread-self/root/opt/alt/ruby24/lib64/ruby/gems/2.4.0/gems/rake-12.0.0/lib/rake/thread_pool.rb

require "set"

require "rake/promise"

module Rake

  class ThreadPool # :nodoc: all

    # Creates a ThreadPool object.  The +thread_count+ parameter is the size
    # of the pool.
    def initialize(thread_count)
      @max_active_threads = [thread_count, 0].max
      @threads = Set.new
      @threads_mon = Monitor.new
      @queue = Queue.new
      @join_cond = @threads_mon.new_cond

      @history_start_time = nil
      @history = []
      @history_mon = Monitor.new
      @total_threads_in_play = 0
    end

    # Creates a future executed by the +ThreadPool+.
    #
    # The args are passed to the block when executing (similarly to
    # Thread#new) The return value is an object representing
    # a future which has been created and added to the queue in the
    # pool. Sending #value to the object will sleep the
    # current thread until the future is finished and will return the
    # result (or raise an exception thrown from the future)
    def future(*args, &block)
      promise = Promise.new(args, &block)
      promise.recorder = lambda { |*stats| stat(*stats) }

      @queue.enq promise
      stat :queued, item_id: promise.object_id
      start_thread
      promise
    end

    # Waits until the queue of futures is empty and all threads have exited.
    def join
      @threads_mon.synchronize do
        begin
          stat :joining
          @join_cond.wait unless @threads.empty?
          stat :joined
        rescue Exception => e
          stat :joined
          $stderr.puts e
          $stderr.print "Queue contains #{@queue.size} items. " +
            "Thread pool contains #{@threads.count} threads\n"
          $stderr.print "Current Thread #{Thread.current} status = " +
            "#{Thread.current.status}\n"
          $stderr.puts e.backtrace.join("\n")
          @threads.each do |t|
            $stderr.print "Thread #{t} status = #{t.status}\n"
            $stderr.puts t.backtrace.join("\n")
          end
          raise e
        end
      end
    end

    # Enable the gathering of history events.
    def gather_history          #:nodoc:
      @history_start_time = Time.now if @history_start_time.nil?
    end

    # Return a array of history events for the thread pool.
    #
    # History gathering must be enabled to be able to see the events
    # (see #gather_history). Best to call this when the job is
    # complete (i.e. after ThreadPool#join is called).
    def history                 # :nodoc:
      @history_mon.synchronize { @history.dup }.
        sort_by { |i| i[:time] }.
        each { |i| i[:time] -= @history_start_time }
    end

    # Return a hash of always collected statistics for the thread pool.
    def statistics              #  :nodoc:
      {
        total_threads_in_play: @total_threads_in_play,
        max_active_threads: @max_active_threads,
      }
    end

    private

    # processes one item on the queue. Returns true if there was an
    # item to process, false if there was no item
    def process_queue_item      #:nodoc:
      return false if @queue.empty?

      # Even though we just asked if the queue was empty, it
      # still could have had an item which by this statement
      # is now gone. For this reason we pass true to Queue#deq
      # because we will sleep indefinitely if it is empty.
      promise = @queue.deq(true)
      stat :dequeued, item_id: promise.object_id
      promise.work
      return true

    rescue ThreadError # this means the queue is empty
      false
    end

    def safe_thread_count
      @threads_mon.synchronize do
        @threads.count
      end
    end

    def start_thread # :nodoc:
      @threads_mon.synchronize do
        next unless @threads.count < @max_active_threads

        t = Thread.new do
          begin
            while safe_thread_count <= @max_active_threads
              break unless process_queue_item
            end
          ensure
            @threads_mon.synchronize do
              @threads.delete Thread.current
              stat :ended, thread_count: @threads.count
              @join_cond.broadcast if @threads.empty?
            end
          end
        end

        @threads << t
        stat(
          :spawned,
          new_thread: t.object_id,
          thread_count: @threads.count)
        @total_threads_in_play = @threads.count if
          @threads.count > @total_threads_in_play
      end
    end

    def stat(event, data=nil) # :nodoc:
      return if @history_start_time.nil?
      info = {
        event: event,
        data: data,
        time: Time.now,
        thread: Thread.current.object_id,
      }
      @history_mon.synchronize { @history << info }
    end

    # for testing only

    def __queue__ # :nodoc:
      @queue
    end
  end

end

Directory Contents

Dirs: 2 × Files: 39

Name Size Perms Modified Actions
ext DIR
- drwxr-xr-x 2024-03-03 22:50:40
Edit Download
loaders DIR
- drwxr-xr-x 2024-03-03 22:50:40
Edit Download
23.07 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
866 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
2.00 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
418 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
2.29 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
235 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
5.45 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
340 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
670 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
12.41 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
1.26 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
3.73 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
4.05 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
1.16 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
431 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
265 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
2.74 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
1.49 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
672 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
5.54 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
351 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
364 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
2.25 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
375 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
770 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
340 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
352 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
869 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
11.38 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
135 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
2.38 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
119 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
8.73 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
5.95 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
1.10 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
4.64 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download
545 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
178 B lrw-r--r-- 2020-03-31 11:42:20
Edit Download
1.49 KB lrw-r--r-- 2020-03-31 11:42:20
Edit Download

If ZipArchive is unavailable, a .tar will be created (no compression).