diff options
Diffstat (limited to 'lib/rinda')
-rw-r--r-- | lib/rinda/rinda.rb | 129 | ||||
-rw-r--r-- | lib/rinda/ring.rb | 164 | ||||
-rw-r--r-- | lib/rinda/tuplespace.rb | 356 |
3 files changed, 649 insertions, 0 deletions
diff --git a/lib/rinda/rinda.rb b/lib/rinda/rinda.rb new file mode 100644 index 0000000000..72b0e45cdf --- /dev/null +++ b/lib/rinda/rinda.rb @@ -0,0 +1,129 @@ +require 'thread' + +module Rinda + class RequestCanceledError < ThreadError; end + class RequestExpiredError < ThreadError; end + + class Tuple + def initialize(ary_or_hash) + if Hash === ary_or_hash + init_with_hash(ary_or_hash) + else + init_with_ary(ary_or_hash) + end + end + + def size + @tuple.size + end + + def [](k) + @tuple[k] + end + + def each # FIXME + if Hash === @tuple + @tuple.each { |k, v| yield(k, v) } + else + @tuple.each_with_index { |v, k| yield(k, v) } + end + end + + def value + @tuple + end + + private + def init_with_ary(ary) + @tuple_size = ary.size + @tuple = Array.new(@tuple_size) + @tuple.size.times do |i| + @tuple[i] = ary[i] + end + end + + def init_with_hash(hash) + @tuple_size = hash[:size] + @tuple = Hash.new + hash.each do |k, v| + next unless String === k + @tuple[k] = v + end + end + end + + class Template < Tuple + def match(tuple) + return false unless tuple.respond_to?(:size) + return false unless tuple.respond_to?(:[]) + return false if @tuple_size && (@tuple_size != tuple.size) + each do |k, v| + next if v.nil? + return false unless (v === tuple[k] rescue false) + end + return true + end + + def ===(tuple) + match(tuple) + end + end + + class DRbObjectTemplate + def initialize(uri=nil, ref=nil) + @drb_uri = uri + @drb_ref = ref + end + + def ===(ro) + return true if super(ro) + unless @drb_uri.nil? + return false unless (@drb_uri === ro.__drburi rescue false) + end + unless @drb_ref.nil? + return false unless (@drb_ref === ro.__drbref rescue false) + end + true + end + end + + class TupleSpaceProxy + def initialize(ts) + @ts = ts + end + + def write(tuple, sec=nil) + @ts.write(tuple, sec) + end + + def take(tuple, sec=nil, &block) + port = [] + @ts.move(DRbObject.new(port), tuple, sec, &block) + port[0] + end + + def read(tuple, sec=nil) + @ts.read(tuple, sec) + end + + def read_all(tuple) + @ts.read_all + end + + def notify(ev, tuple, sec=nil) + @ts.notify(ev, tuple, sec) + end + end + + class SimpleRenewer + include DRbUndumped + def initialize(sec=180) + @sec = sec + end + + def renew + @sec + end + end +end + diff --git a/lib/rinda/ring.rb b/lib/rinda/ring.rb new file mode 100644 index 0000000000..7152dfb72e --- /dev/null +++ b/lib/rinda/ring.rb @@ -0,0 +1,164 @@ +# +# Note: Rinda::Ring API is unstable. +# +require 'drb/drb' +require 'rinda/rinda' +require 'thread' + +module Rinda + Ring_PORT = 7647 + class RingServer + include DRbUndumped + + def initialize(ts, port=Ring_PORT) + @ts = ts + @soc = UDPSocket.open + @soc.bind('', port) + @w_service = write_service + @r_service = reply_service + end + + def write_service + Thread.new do + loop do + msg, addr = @soc.recvfrom(1024) + do_write(msg) + end + end + end + + def do_write(msg) + Thread.new do + begin + tuple, sec = Marshal.load(msg) + @ts.write(tuple, sec) + rescue + end + end + end + + def reply_service + Thread.new do + loop do + do_reply + end + end + end + + def do_reply + tuple = @ts.take([:lookup_ring, DRbObject]) + Thread.new { tuple[1].call(@ts) rescue nil} + rescue + end + end + + class RingFinger + @@finger = nil + def self.finger + unless @@finger + @@finger = self.new + @@finger.lookup_ring_any + end + @@finger + end + + def self.primary + finger.primary + end + + def self.to_a + finger.to_a + end + + @@broadcast_list = ['<broadcast>', 'localhost'] + def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT) + @broadcast_list = broadcast_list || ['localhost'] + @port = port + @primary = nil + @rings = [] + end + attr_accessor :broadcast_list, :port, :primary + + def to_a + @rings + end + + def each + lookup_ring_any unless @primary + return unless @primary + yield(@primary) + @rings.each { |x| yield(x) } + end + + def lookup_ring(timeout=5, &block) + return lookup_ring_any(timeout) unless block_given? + + msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) + @broadcast_list.each do |it| + soc = UDPSocket.open + begin + soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) + soc.send(msg, 0, it, @port) + rescue + nil + ensure + soc.close + end + end + sleep(timeout) + end + + def lookup_ring_any(timeout=5) + queue = Queue.new + + th = Thread.new do + self.lookup_ring(timeout) do |ts| + queue.push(ts) + end + queue.push(nil) + while it = queue.pop + @rings.push(it) + end + end + + @primary = queue.pop + raise('RingNotFound') if @primary.nil? + @primary + end + end + + class RingProvider + def initialize(klass, front, desc, renewer = nil) + @tuple = [:name, klass, front, desc] + @renewer = renewer || Rinda::SimpleRenewer.new + end + + def provide + ts = Rinda::RingFinger.primary + ts.write(@tuple, @renewer) + end + end +end + +if __FILE__ == $0 + DRb.start_service + case ARGV.shift + when 's' + require 'rinda/tuplespace' + ts = Rinda::TupleSpace.new + place = Rinda::RingServer.new(ts) + $stdin.gets + when 'w' + finger = Rinda::RingFinger.new(nil) + finger.lookup_ring do |ts| + p ts + ts.write([:hello, :world]) + end + when 'r' + finger = Rinda::RingFinger.new(nil) + finger.lookup_ring do |ts| + p ts + p ts.take([nil, nil]) + end + end +end diff --git a/lib/rinda/tuplespace.rb b/lib/rinda/tuplespace.rb new file mode 100644 index 0000000000..4bd4a42af5 --- /dev/null +++ b/lib/rinda/tuplespace.rb @@ -0,0 +1,356 @@ +require 'monitor' +require 'thread' +require 'drb/drb' +require 'rinda/rinda' + +module Rinda + class TupleEntry + include DRbUndumped + + def initialize(ary, sec=nil) + @cancel = false + @ary = make_tuple(ary) + @renewer = nil + renew(sec) + end + attr_accessor :expires + + def cancel + @cancel = true + end + + def alive? + !canceled? && !expired? + end + + def value; @ary.value; end + def canceled?; @cancel; end + def expired? + return true unless @expires + return false if @expires > Time.now + return true if @renewer.nil? + renew(@renewer) + return true unless @expires + return @expires < Time.now + end + + def renew(sec_or_renewer) + sec, @renewer = get_renewer(sec_or_renewer) + @expires = make_expires(sec) + end + + def make_expires(sec=nil) + case sec + when Numeric + Time.now + sec + when true + Time.at(1) + when nil + Time.at(2**31-1) + end + end + + def [](key) + @ary[key] + end + + def size + @ary.size + end + + def make_tuple(ary) + Rinda::Tuple.new(ary) + end + + private + def get_renewer(it) + case it + when Numeric, true, nil + return it, nil + else + begin + return it.renew, it + rescue Exception + return it, nil + end + end + end + end + + class TemplateEntry < TupleEntry + def initialize(ary, expires=nil) + super(ary, expires) + @template = Rinda::Template.new(ary) + end + + def match(tuple) + @template.match(tuple) + end + + def ===(tuple) + match(tuple) + end + + def make_tuple(ary) + Rinda::Template.new(ary) + end + end + + class WaitTemplateEntry < TemplateEntry + def initialize(place, ary, expires=nil) + super(ary, expires) + @place = place + @cond = place.new_cond + @found = nil + end + attr_reader :found + + def cancel + super + signal + end + + def wait + @cond.wait + end + + def read(tuple) + @found = tuple + signal + end + + def signal + @place.synchronize do + @cond.signal + end + end + end + + class NotifyTemplateEntry < TemplateEntry + def initialize(place, event, tuple, expires=nil) + ary = [event, Rinda::Template.new(tuple)] + super(ary, expires) + @queue = Queue.new + @done = false + end + + def notify(ev) + @queue.push(ev) + end + + def pop + raise RequestExpiredError if @done + it = @queue.pop + @done = true if it[0] == 'close' + return it + end + + def each + while !@done + it = pop + yield(it) + end + rescue + ensure + cancel + end + end + + class TupleBag + def initialize + @hash = {} + end + + def push(ary) + size = ary.size + @hash[size] ||= [] + @hash[size].push(ary) + end + + def delete(ary) + size = ary.size + @hash.fetch(size, []).delete(ary) + end + + def find_all(template) + @hash.fetch(template.size, []).find_all do |tuple| + tuple.alive? && template.match(tuple) + end + end + + def find(template) + @hash.fetch(template.size, []).find do |tuple| + tuple.alive? && template.match(tuple) + end + end + + def find_all_template(tuple) + @hash.fetch(tuple.size, []).find_all do |template| + template.alive? && template.match(tuple) + end + end + + def delete_unless_alive + deleted = [] + @hash.keys.each do |size| + ary = [] + @hash[size].each do |tuple| + if tuple.alive? + ary.push(tuple) + else + deleted.push(tuple) + end + end + @hash[size] = ary + end + deleted + end + end + + class TupleSpace + include DRbUndumped + include MonitorMixin + def initialize(timeout=60) + super() + @bag = TupleBag.new + @read_waiter = TupleBag.new + @take_waiter = TupleBag.new + @notify_waiter = TupleBag.new + @timeout = timeout + @period = timeout * 2 + @keeper = keeper + end + + def write(tuple, sec=nil) + entry = TupleEntry.new(tuple, sec) + synchronize do + if entry.expired? + @read_waiter.find_all_template(entry).each do |template| + template.read(tuple) + end + notify_event('write', entry.value) + notify_event('delete', entry.value) + else + @bag.push(entry) + @read_waiter.find_all_template(entry).each do |template| + template.read(tuple) + end + @take_waiter.find_all_template(entry).each do |template| + template.signal + end + notify_event('write', entry.value) + end + end + entry + end + + def take(tuple, sec=nil, &block) + move(nil, tuple, sec, &block) + end + + def move(port, tuple, sec=nil) + template = WaitTemplateEntry.new(self, tuple, sec) + yield(template) if block_given? + synchronize do + entry = @bag.find(template) + if entry + port.push(entry.value) if port + @bag.delete(entry) + notify_event('take', entry.value) + return entry.value + end + return nil if template.expired? + + begin + @take_waiter.push(template) + while true + raise RequestCanceledError if template.canceled? + raise RequestExpiredError if template.expired? + entry = @bag.find(template) + if entry + port.push(entry.value) if port + @bag.delete(entry) + notify_event('take', entry.value) + return entry.value + end + template.wait + end + ensure + @take_waiter.delete(template) + end + end + end + + def read(tuple, sec=nil) + template = WaitTemplateEntry.new(self, tuple, sec) + yield(template) if block_given? + synchronize do + entry = @bag.find(template) + return entry.value if entry + return nil if template.expired? + + begin + @read_waiter.push(template) + template.wait + raise RequestCanceledError if template.canceled? + raise RequestExpiredError if template.expired? + return template.found + ensure + @read_waiter.delete(template) + end + end + end + + def read_all(tuple) + template = WaitTemplateEntry.new(self, tuple, nil) + synchronize do + entry = @bag.find_all(template) + entry.collect do |e| + e.value + end + end + end + + def notify(event, tuple, sec=nil) + template = NotifyTemplateEntry.new(self, event, tuple, sec) + synchronize do + @notify_waiter.push(template) + end + template + end + + private + def keep_clean + synchronize do + @read_waiter.delete_unless_alive.each do |e| + e.signal + end + @take_waiter.delete_unless_alive.each do |e| + e.signal + end + @notify_waiter.delete_unless_alive.each do |e| + e.notify(['close']) + end + @bag.delete_unless_alive.each do |e| + notify_event('delete', e.value) + end + end + end + + def notify_event(event, tuple) + ev = [event, tuple] + @notify_waiter.find_all_template(ev).each do |template| + template.notify(ev) + end + end + + def keeper + Thread.new do + loop do + sleep(@period) + keep_clean + end + end + end + end +end |