class Thimble::ThimbleQueue

Attributes

size[R]

Public Class Methods

new(size, name) click to toggle source
# File lib/ThimbleQueue.rb, line 9
# Builds an empty, open queue that holds at most +size+ items at a time.
#
# @param size [Numeric] maximum number of queued items; must be at least 1
# @param name [Object] label interpolated into this queue's log messages
# @raise [ArgumentError] if size is not a number or is smaller than 1
def initialize(size, name)
  # Reject non-numeric sizes up front so callers get the documented
  # ArgumentError instead of a NoMethodError from the comparison.
  unless size.is_a?(Numeric) && size >= 1
    raise ArgumentError, "make sure there is a size for the queue of at least 1! size received #{size.inspect}"
  end

  # Opaque identifier derived from a random number and the current time.
  @id = Digest::SHA256.digest(rand(10**100).to_s + Time.now.to_i.to_s)
  @name = name
  @size = size
  @mutex = Mutex.new
  @queue = []
  @closed = false
  @close_now = false
  @empty = ConditionVariable.new # consumers wait on this while the queue is empty
  @full = ConditionVariable.new  # producers wait on this while the queue is full
  @logger = Logger.new(STDOUT)
  # Start silent: only UNKNOWN-level messages pass until setLogger changes it.
  @logger.sev_threshold = Logger::UNKNOWN
end

Public Instance Methods

+(other) click to toggle source

Will concatenate an enumerable to the ThimbleQueue @param [Enumerable] @return [ThimbleQueue]

# File lib/ThimbleQueue.rb, line 43
# Builds a new ThimbleQueue containing this queue's items followed by the
# items of +other+. This queue is read through #each, which dequeues as it
# iterates.
#
# @param other [Enumerable] collection to append; must also respond to #length
# @return [ThimbleQueue] new queue sized to length + other.length, sharing @name
# @raise [ArgumentError] if other's class does not include Enumerable
def +(other)
  raise ArgumentError, "+ requires another Enumerable!" unless other.class < Enumerable

  combined = ThimbleQueue.new(length + other.length, @name)
  [self, other].each do |source|
    source.each { |element| combined.push(element) }
  end
  combined
end
close(now = false) click to toggle source

Closes the ThimbleQueue. @param [TrueClass, FalseClass] @return [nil]

# File lib/ThimbleQueue.rb, line 110
# Marks the queue as closed and wakes every thread waiting on it.
#
# @param now [Boolean] when true, also sets the close-now flag, which makes
#   #next stop handing out items
# @return [Object] whatever the final logger call returns
# @raise [ArgumentError] if now is neither true nor false
def close(now = false)
  raise ArgumentError, "now must be true or false" if now != true && now != false

  @logger.debug("#{@name} is closing")
  @mutex.synchronize do
    @closed = true
    @close_now = true if now
    # Wake all waiters on both condition variables so they re-check the flags.
    @full.broadcast
    @empty.broadcast
  end
  @logger.debug("#{@name} is closed: #{@closed} now: #{@close_now}")
end
closed?() click to toggle source

Checks whether the ThimbleQueue is closed. @return [TrueClass, FalseClass]

# File lib/ThimbleQueue.rb, line 134
# Reports the queue's closed flag, which #close sets to true.
# Reads the flag directly without taking the mutex.
#
# @return [Boolean] current value of the closed flag
def closed?
  @closed
end
each() { |item| ... } click to toggle source
# File lib/ThimbleQueue.rb, line 28
# Yields each queued value in order, dequeuing entries through #next as it
# goes. Iteration stops when #next returns nil.
#
# @yield [item] the unwrapped value of each dequeued entry
# @return [Enumerator] when called without a block
# @return [nil] once iteration has finished
def each
  # Without a block there is nothing to yield to; hand back an Enumerator
  # instead of raising LocalJumpError on the first item.
  return to_enum(:each) unless block_given?

  while (entry = self.next)
    yield entry.item
  end
end
length() click to toggle source

Returns the size of the ThimbleQueue @return [Fixnum]

# File lib/ThimbleQueue.rb, line 36
# Returns the value of the +size+ reader.
# NOTE(review): presumably this is the capacity stored by the constructor,
# not the number of items currently waiting — confirm against callers.
#
# @return [Numeric]
def length
  size
end
next() click to toggle source

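Removes and returns the first item in the queue, waiting while the queue is empty and still open. Returns nil once the queue is closed and empty, or as soon as it has been closed with now = true. @return [Object]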

# File lib/ThimbleQueue.rb, line 53
# Dequeues the entry at the head of the queue.
#
# Waits on the "empty" condition while there is nothing to take and the queue
# is still open. Gives up with nil when the queue is closed and empty, or
# whenever the close-now flag is set.
#
# @return [Object, nil] the dequeued entry, or nil when nothing more will come
def next
  @mutex.synchronize do
    until @close_now
      head = @queue.shift
      @logger.debug("#{@name}'s queue shifted to: #{head}")
      unless head.nil?
        # An entry left the queue: wake waiters on both condition variables.
        @full.broadcast
        @empty.broadcast
        return head
      end

      @logger.debug("#{@name}'s queue is currently closed?: #{closed?}")
      return nil if closed?

      @empty.wait(@mutex)
    end
  end
end
push(x) click to toggle source

This will push whatever it is handed to the queue @param [Object]

# File lib/ThimbleQueue.rb, line 73
# Adds +x+ to the queue, waiting on the "full" condition while there is no
# room for it.
#
# @param x [Object] value to enqueue
# @return [Object] whatever the final logger call returns
# @raise [RuntimeError] if the queue is already closed, or if it is closed
#   with now = true while this call is still waiting for room
def push(x)
  raise RuntimeError.new("Queue is closed!") if @closed
  @logger.debug("Pushing into #{@name} values: #{x}")
  @mutex.synchronize do
    until offer(x)
      # After an immediate close #next no longer removes items, so room will
      # never appear; fail instead of waiting forever.
      raise RuntimeError.new("Queue is closed!") if @close_now

      @full.wait(@mutex)
      @logger.debug("#{@name} is waiting on full")
    end
    @empty.broadcast
  end
  @logger.debug("Finished pushing int #{@name}: #{x}")
end
push_flat(x) click to toggle source

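If given an object that responds to each, this will push its elements onto the queue one at a time (one level only; nested arrays inside it are pushed as single items). Anything else is pushed as a single item. @param [Object, Enumerable] @return [nil]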

# File lib/ThimbleQueue.rb, line 90
# Pushes +x+ onto the queue, spreading it out one level if it can be iterated.
#
# An object that responds to #each has each of its elements pushed
# individually; elements that are themselves collections are pushed as-is.
# Anything else is pushed as a single item.
#
# @param x [Object, Enumerable] value or collection of values to enqueue
# @return [Object] whatever the final logger call returns
# @raise [RuntimeError] if the queue is closed
def push_flat(x)
  raise RuntimeError.new("Queue is closed!") if @closed
  @logger.debug("Pushing flat into #{@name} values: #{x}")
  if x.respond_to?(:each)
    x.each { |item| push(item) }
  else
    # Single values take the same path as #push so the blocking behaviour
    # lives in one place.
    push(x)
  end
  @logger.debug("Finished pushing flat into #{@name} values: #{x}")
end
setLogger(level) click to toggle source
# File lib/ThimbleQueue.rb, line 24
# Changes the severity threshold of this queue's logger.
#
# @param level [Integer] a Logger severity such as Logger::DEBUG
# @return [Object] the level that was passed in
def setLogger(level)
  @logger.level = level
end
to_a() click to toggle source

Will force the ThimbleQueue into an array @return [Array]

# File lib/ThimbleQueue.rb, line 124
# Drains the queue into an Array by calling #next until it returns nothing,
# unwrapping each dequeued entry along the way.
#
# @return [Array] the unwrapped values in dequeue order
def to_a
  drained = []
  while (entry = self.next)
    drained.push(entry.item)
  end
  drained
end

Private Instance Methods

offer(x) click to toggle source
# File lib/ThimbleQueue.rb, line 139
# Tries to enqueue +x+ without waiting.
#
# The value is wrapped in a QueueItem before being stored. Does no locking of
# its own; the visible callers invoke it while holding @mutex.
#
# @param x [Object] value to enqueue
# @return [Boolean] true if the value was stored, false if the queue is full
def offer(x)
  return false unless @queue.size < @size

  @queue << QueueItem.new(x)
  @empty.broadcast
  true
end