class Backports::FilteredQueue

Constants

CONSUME_ON_ESCAPE

Attributes

num_waiting[R]

Like ::Queue, but with

  • filtering

  • timeout

  • raises on closed queues

Public Class Methods

new() click to toggle source

Timeout processing based on spin.atomicobject.com/2017/06/28/queue-pop-with-timeout-fixed/

# File lib/backports/tools/filtered_queue.rb, line 31
def initialize
  @mutex = ::Mutex.new
  @queue = []
  @closed = false
  @received = ::ConditionVariable.new
  @num_waiting = 0
end

Public Instance Methods

<<(x) click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 51
def <<(x)
  @mutex.synchronize do
    ensure_open
    @queue << Message.new(x)
    @received.signal
  end
  self
end
Also aliased as: push
clear() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 61
def clear
  @mutex.synchronize do
    @queue.clear
  end
  self
end
close() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 39
def close
  @mutex.synchronize do
    @closed = true
    @received.broadcast
  end
  self
end
closed?() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 47
def closed?
  @closed
end
empty?() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 82
def empty?
  avail = @mutex.synchronize do
    available!
  end

  !avail
end
pop(timeout: nil, &block) click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 68
def pop(timeout: nil, &block)
  msg = nil
  exclude = [] if block # exclusion list of messages rejected by this call
  timeout_time = timeout + Time.now.to_f if timeout
  while true do
    @mutex.synchronize do
      reenter if reentrant?
      msg = acquire!(timeout_time, exclude)
      return consume!(msg).value unless block
    end
    return msg.value if filter?(msg, &block)
  end
end
push(x)
Alias for: <<

Protected Instance Methods

closed_queue_value() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 94
          def closed_queue_value
  ensure_open
end
reenter() click to toggle source

@return if outer message should be consumed or not

# File lib/backports/tools/filtered_queue.rb, line 99
          def reenter
  true
end
timeout_value() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 90
          def timeout_value
  raise self.class::TimeoutError, "timeout elapsed"
end

Private Instance Methods

acquire!(timeout_time, exclude = nil) click to toggle source

private methods assume @mutex synchonized adds to exclude list

# File lib/backports/tools/filtered_queue.rb, line 169
        def acquire!(timeout_time, exclude = nil)
  while true do
    if (msg = available!(exclude))
      msg.reserved = true
      exclude << msg if exclude
      return msg
    end
    return closed_queue_value if @closed
    # wait for element or timeout
    if timeout_time
      remaining_time = timeout_time - ::Time.now.to_f
      return timeout_value if remaining_time <= 0
    end
    begin
      @num_waiting += 1
      @received.wait(@mutex, remaining_time)
    ensure
      @num_waiting -= 1
    end
  end
end
available!(exclude = nil) click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 191
        def available!(exclude = nil)
  @queue.find do |msg|
    next if exclude && exclude.include?(msg)
    !msg.reserved
  end
end
commit(msg, consume) click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 132
        def commit(msg, consume)
  @mutex.synchronize do
    if consume
      consume!(msg)
    else
      reject!(msg)
    end
  end
end
consume!(msg) click to toggle source

@returns msg

# File lib/backports/tools/filtered_queue.rb, line 123
        def consume!(msg)
  @queue.delete(msg)
end
consume_on_reentry(msg) { || ... } click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 142
        def consume_on_reentry(msg)
  q_map = current_filtered_queues
  if (outer_msg = q_map[self])
    commit(outer_msg, reenter)
  end
  q_map[self] = msg
  begin
    yield
  ensure
    reentered = !q_map.delete(self)
  end
  reentered
end
current_filtered_queues() click to toggle source

@returns Hash { FilteredQueue => Message }

# File lib/backports/tools/filtered_queue.rb, line 161
        def current_filtered_queues
  t = Thread.current
  t.thread_variable_get(:backports_currently_filtered_queues) or
    t.thread_variable_set(:backports_currently_filtered_queues, {}.compare_by_identity)
end
ensure_open() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 198
        def ensure_open
  raise self.class::ClosedQueueError, 'queue closed' if @closed
end
filter?(msg) { |value)| ... } click to toggle source

@returns:

  • true if message consumed (block result truthy or due to reentrant call)

  • false if rejected

# File lib/backports/tools/filtered_queue.rb, line 110
        def filter?(msg)
  consume = self.class::CONSUME_ON_ESCAPE
  begin
    reentered = consume_on_reentry(msg) do
      consume = !!(yield msg.value)
    end
    reentered ? reenter : consume
  ensure
    commit(msg, consume) unless reentered
  end
end
reentrant?() click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 156
        def reentrant?
  !!current_filtered_queues[self]
end
reject!(msg) click to toggle source
# File lib/backports/tools/filtered_queue.rb, line 127
        def reject!(msg)
  msg.reserved = false
  @received.broadcast
end