From 0098caf510e6241982fe3c04068d8fa58d839fec Mon Sep 17 00:00:00 2001 From: seki Date: Sun, 27 Jul 2008 00:04:38 +0000 Subject: merged from 1.8 git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@18228 b2dd03c8-39d4-4d8f-98ff-823fe69b080e --- lib/rinda/tuplespace.rb | 111 +++++++++++++++++++++++++++++++++++------------- 1 file changed, 82 insertions(+), 29 deletions(-) (limited to 'lib') diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb index 73e79bb401..b0409dde3b 100644 --- a/lib/rinda/tuplespace.rb +++ b/lib/rinda/tuplespace.rb @@ -2,6 +2,8 @@ require 'monitor' require 'thread' require 'drb/drb' require 'rinda/rinda' +require 'enumerator' +require 'forwardable' module Rinda @@ -286,45 +288,70 @@ module Rinda # of Tuplespace. class TupleBag + class TupleBin + extend Forwardable + def_delegators '@bin', :find_all, :delete_if, :each, :empty? + + def initialize + @bin = [] + end + + def add(tuple) + @bin.push(tuple) + end + + def delete(tuple) + idx = @bin.rindex(tuple) + @bin.delete_at(idx) if idx + end + + def find(&blk) + @bin.reverse_each do |x| + return x if yield(x) + end + nil + end + end def initialize # :nodoc: @hash = {} + @enum = Enumerable::Enumerator.new(self, :each_entry) end ## # +true+ if the TupleBag to see if it has any expired entries. def has_expires? - @hash.each do |k, v| - v.each do |tuple| - return true if tuple.expires - end + @enum.find do |tuple| + tuple.expires end - false end ## - # Add +ary+ to the TupleBag. + # Add +tuple+ to the TupleBag. - def push(ary) - size = ary.size - @hash[size] ||= [] - @hash[size].push(ary) + def push(tuple) + key = bin_key(tuple) + @hash[key] ||= TupleBin.new + @hash[key].add(tuple) end ## - # Removes +ary+ from the TupleBag. + # Removes +tuple+ from the TupleBag. - def delete(ary) - size = ary.size - @hash.fetch(size, []).delete(ary) + def delete(tuple) + key = bin_key(tuple) + bin = @hash[key] + return nil unless bin + bin.delete(tuple) + @hash.delete(key) if bin.empty? + tuple end ## # Finds all live tuples that match +template+. - def find_all(template) - @hash.fetch(template.size, []).find_all do |tuple| + bin_for_find(template).find_all do |tuple| tuple.alive? && template.match(tuple) end end @@ -333,7 +360,7 @@ module Rinda # Finds a live tuple that matches +template+. def find(template) - @hash.fetch(template.size, []).find do |tuple| + bin_for_find(template).find do |tuple| tuple.alive? && template.match(tuple) end end @@ -343,7 +370,7 @@ module Rinda # +tuple+ and are alive. def find_all_template(tuple) - @hash.fetch(tuple.size, []).find_all do |template| + @enum.find_all do |template| template.alive? && template.match(tuple) end end @@ -354,20 +381,39 @@ module Rinda def delete_unless_alive deleted = [] - @hash.keys.each do |size| - ary = [] - @hash[size].each do |tuple| + @hash.each do |key, bin| + bin.delete_if do |tuple| if tuple.alive? - ary.push(tuple) + false else deleted.push(tuple) + true end end - @hash[size] = ary end deleted end + private + def each_entry(&blk) + @hash.each do |k, v| + v.each(&blk) + end + end + + def bin_key(tuple) + head = tuple[0] + if head.class == Symbol + return head + else + false + end + end + + def bin_for_find(template) + key = bin_key(template) + key ? @hash.fetch(key, []) : @enum + end end ## @@ -403,8 +449,7 @@ module Rinda # Adds +tuple+ def write(tuple, sec=nil) - entry = TupleEntry.new(tuple, sec) - start_keeper + entry = create_entry(tuple, sec) synchronize do if entry.expired? @read_waiter.find_all_template(entry).each do |template| @@ -414,6 +459,7 @@ module Rinda notify_event('delete', entry.value) else @bag.push(entry) + start_keeper if entry.expires @read_waiter.find_all_template(entry).each do |template| template.read(tuple) end @@ -439,7 +485,6 @@ module Rinda def move(port, tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? - start_keeper synchronize do entry = @bag.find(template) if entry @@ -452,6 +497,7 @@ module Rinda begin @take_waiter.push(template) + start_keeper if template.expires while true raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? @@ -476,7 +522,6 @@ module Rinda def read(tuple, sec=nil) template = WaitTemplateEntry.new(self, tuple, sec) yield(template) if block_given? - start_keeper synchronize do entry = @bag.find(template) return entry.value if entry @@ -484,6 +529,7 @@ module Rinda begin @read_waiter.push(template) + start_keeper if template.expires template.wait raise RequestCanceledError if template.canceled? raise RequestExpiredError if template.expired? @@ -529,6 +575,10 @@ module Rinda private + def create_entry(tuple, sec) + TupleEntry.new(tuple, sec) + end + ## # Removes dead tuples. @@ -566,9 +616,12 @@ module Rinda def start_keeper return if @keeper && @keeper.alive? @keeper = Thread.new do - while need_keeper? - keep_clean + while true sleep(@period) + synchronize do + break unless need_keeper? + keep_clean + end end end end -- cgit v1.2.3