diff options
author | ko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2015-08-26 22:59:32 +0000 |
---|---|---|
committer | ko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2015-08-26 22:59:32 +0000 |
commit | fd7ac9f3c9e106a57869da762a383536636f0f3d (patch) | |
tree | 789de6699e492fc58b26531147b4c6065d4a7db5 /thread_tools.c | |
parent | b79d7910f5721b222443464ca576c5774c1a79ef (diff) | |
download | ruby-fd7ac9f3c9e106a57869da762a383536636f0f3d.tar.gz |
* thread_tools.c: add Queue#close(exception=false) and
SizedQueue#close(exception=false).
[Feature #10600]
Trying to deq from a closed empty queue return nil
if exception parameter equals to false (default).
If exception parameter is truthy, it raises
ClosedQueueError (< StopIteration).
ClosedQueueError inherits StopIteration so that you can write:
loop{ e = q.deq; (using e) }
Trying to close a closed queue raises ClosedQueueError.
Blocking threads to wait deq for Queue and SizedQueue will be
restarted immediately by returning nil (exception=false) or
raising a ClosedQueueError (exception=true).
Blocking threads to wait enq for SizedQueue will be
restarted by raising a ClosedQueueError immediately.
The above specification is not proposed specification, so that
we need to continue discussion to conclude specification this
method.
* test/thread/test_queue.rb: add tests originally written by
John Anderson and modify detailed behavior.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@51699 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread_tools.c')
-rw-r--r-- | thread_tools.c | 215 |
1 files changed, 190 insertions, 25 deletions
diff --git a/thread_tools.c b/thread_tools.c index eee185e6c3..06e73751bb 100644 --- a/thread_tools.c +++ b/thread_tools.c @@ -1,6 +1,7 @@ /* included by thraed.c */ VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; +VALUE rb_eClosedQueueError; /* Mutex */ @@ -521,6 +522,9 @@ enum { END_QUEUE }; +#define QUEUE_CLOSED FL_USER5 +#define QUEUE_CLOSE_EXCEPTION FL_USER6 + #define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) #define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) @@ -566,6 +570,77 @@ wakeup_all_threads(VALUE list) rb_ary_clear(list); } +static unsigned long +queue_length(VALUE self) +{ + VALUE que = GET_QUEUE_QUE(self); + return RARRAY_LEN(que); +} + +static unsigned long +queue_num_waiting(VALUE self) +{ + VALUE waiters = GET_QUEUE_WAITERS(self); + return RARRAY_LEN(waiters); +} + +static unsigned long +szqueue_num_waiting_producer(VALUE self) +{ + VALUE waiters = GET_SZQUEUE_WAITERS(self); + return RARRAY_LEN(waiters); +} + +static int +queue_closed_p(VALUE self) +{ + return FL_TEST_RAW(self, QUEUE_CLOSED) != 0; +} + +static void +raise_closed_queue_error(VALUE self) +{ + rb_raise(rb_eClosedQueueError, "queue closed"); +} + +static VALUE +queue_closed_result(VALUE self) +{ + assert(queue_length(self) == 0); + + if (FL_TEST(self, QUEUE_CLOSE_EXCEPTION)) { + raise_closed_queue_error(self); + } + return Qnil; +} + +static VALUE +queue_do_close(VALUE self, int argc, VALUE *argv, int is_szq) +{ + VALUE exception = Qfalse; + + if (queue_closed_p(self)) raise_closed_queue_error(self); + + rb_scan_args(argc, argv, "01", &exception); + FL_SET(self, QUEUE_CLOSED); + + if (RTEST(exception)) { + FL_SET(self, QUEUE_CLOSE_EXCEPTION); + } + + if (queue_num_waiting(self) > 0) { + VALUE waiters = GET_QUEUE_WAITERS(self); + wakeup_all_threads(waiters); + } + + if (is_szq && szqueue_num_waiting_producer(self) > 0) { + VALUE waiters = GET_SZQUEUE_WAITERS(self); + wakeup_all_threads(waiters); + } + + return self; +} + /* * Document-class: Queue * @@ -611,12 +686,75 @@ rb_queue_initialize(VALUE self) static VALUE queue_do_push(VALUE self, VALUE obj) { + if (queue_closed_p(self)) { + raise_closed_queue_error(self); + } rb_ary_push(GET_QUEUE_QUE(self), obj); wakeup_first_thread(GET_QUEUE_WAITERS(self)); return self; } /* + * Document-method: Queue#close + * call-seq: + * close(exception=false) + * + * Closes the queue. A closed queue cannot be re-opened. + * + * After the call to close completes, the following are true: + * + * - +closed?+ will return true + * + * - calling enq/push/<< will raise ClosedQueueError('queue closed') + * + * - when +empty?+ is false, calling deq/pop/shift will return an object + * from the queue as usual. + * + * - when +empty?+ is true, deq(non_block=false) will not suspend and + * will either return nil. If +exception+ parameter is true, raise ClosedQueueError error. + * deq(non_block=true) will ignore the parameter and raise a ThreadError('queue empty'). + * + * ClosedQueueError is inherited from StopIteration, so that you can break loop block. + * + * Example: + * + * q = Queue.new + * Thread.new{ + * while e = q.deq # wait for nil to break loop + * # ... + * end + * } + * q.close # equals to q.close(false) + * + * q = Queue.new + * Thread.new{ + * loop{ + * e = q.deq; ... # braek with ClosedQueueError + * } + * } + * q.close(true) + */ + +static VALUE +rb_queue_close(int argc, VALUE *argv, VALUE self) +{ + return queue_do_close(self, argc, argv, FALSE); +} + +/* + * Document-method: Queue#closed? + * call-seq: closed? + * + * Returns +true+ if the queue is closed. + */ + +static VALUE +rb_queue_closed_p(VALUE self) +{ + return queue_closed_p(self) ? Qtrue : Qfalse; +} + +/* * Document-method: Queue#push * call-seq: * push(object) @@ -632,20 +770,6 @@ rb_queue_push(VALUE self, VALUE obj) return queue_do_push(self, obj); } -static unsigned long -queue_length(VALUE self) -{ - VALUE que = GET_QUEUE_QUE(self); - return RARRAY_LEN(que); -} - -static unsigned long -queue_num_waiting(VALUE self) -{ - VALUE waiters = GET_QUEUE_WAITERS(self); - return RARRAY_LEN(waiters); -} - struct waiting_delete { VALUE waiting; VALUE th; @@ -676,8 +800,16 @@ queue_do_pop(VALUE self, int should_block) if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + else if (queue_closed_p(self)) { + return queue_closed_result(self); + } + else { + assert(queue_length(self) == 0); + assert(queue_closed_p(self) == 0); + + rb_ary_push(args.waiting, args.th); + rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } } return rb_ary_shift(GET_QUEUE_QUE(self)); @@ -805,6 +937,24 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) } /* + * Document-method: SizedQueue#close + * call-seq: + * close(exception=false) + * + * Similar to Queue#close. + * + * The difference is behavior with waiting enqueuing threads. + * + * If there are waiting enqueuing threads, they are interrupted by + * raising ClosedQueueError('queue closed'). + */ +static VALUE +rb_szqueue_close(int argc, VALUE *argv, VALUE self) +{ + return queue_do_close(self, argc, argv, TRUE); +} + +/* * Document-method: SizedQueue#max * * Returns the maximum size of the queue. @@ -879,9 +1029,20 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) if (!should_block) { rb_raise(rb_eThreadError, "queue full"); } - rb_ary_push(args.waiting, args.th); - rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + else if (queue_closed_p(self)) { + goto closed; + } + else { + rb_ary_push(args.waiting, args.th); + rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } + } + + if (queue_closed_p(self)) { + closed: + raise_closed_queue_error(self); } + return queue_do_push(self, argv[0]); } @@ -941,9 +1102,7 @@ rb_szqueue_clear(VALUE self) static VALUE rb_szqueue_num_waiting(VALUE self) { - long len = queue_num_waiting(self); - VALUE waiters = GET_SZQUEUE_WAITERS(self); - len += RARRAY_LEN(waiters); + long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self); return ULONG2NUM(len); } @@ -1106,14 +1265,14 @@ Init_thread_tools(void) rb_cThread, "Queue", rb_cObject, rb_struct_alloc_noinit, "que", "waiters", NULL); - rb_cSizedQueue = rb_struct_define_without_accessor_under( - rb_cThread, - "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + + rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration); rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); rb_undef_method(rb_cQueue, "initialize_copy"); rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); + rb_define_method(rb_cQueue, "close", rb_queue_close, -1); + rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); @@ -1127,7 +1286,13 @@ Init_thread_tools(void) rb_define_alias(rb_cQueue, "shift", "pop"); /* Alias for #pop. */ rb_define_alias(rb_cQueue, "size", "length"); /* Alias for #length. */ + rb_cSizedQueue = rb_struct_define_without_accessor_under( + rb_cThread, + "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, + "que", "waiters", "queue_waiters", "size", NULL); + rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); + rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1); rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1); |