aboutsummaryrefslogtreecommitdiffstats
path: root/thread_tools.c
diff options
context:
space:
mode:
authorko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2015-08-26 22:59:32 +0000
committerko1 <ko1@b2dd03c8-39d4-4d8f-98ff-823fe69b080e>2015-08-26 22:59:32 +0000
commitfd7ac9f3c9e106a57869da762a383536636f0f3d (patch)
tree789de6699e492fc58b26531147b4c6065d4a7db5 /thread_tools.c
parentb79d7910f5721b222443464ca576c5774c1a79ef (diff)
downloadruby-fd7ac9f3c9e106a57869da762a383536636f0f3d.tar.gz
* thread_tools.c: add Queue#close(exception=false) and
SizedQueue#close(exception=false). [Feature #10600] Trying to deq from a closed empty queue return nil if exception parameter equals to false (default). If exception parameter is truthy, it raises ClosedQueueError (< StopIteration). ClosedQueueError inherits StopIteration so that you can write: loop{ e = q.deq; (using e) } Trying to close a closed queue raises ClosedQueueError. Blocking threads to wait deq for Queue and SizedQueue will be restarted immediately by returning nil (exception=false) or raising a ClosedQueueError (exception=true). Blocking threads to wait enq for SizedQueue will be restarted by raising a ClosedQueueError immediately. The above specification is not proposed specification, so that we need to continue discussion to conclude specification this method. * test/thread/test_queue.rb: add tests originally written by John Anderson and modify detailed behavior. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@51699 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'thread_tools.c')
-rw-r--r--thread_tools.c215
1 files changed, 190 insertions, 25 deletions
diff --git a/thread_tools.c b/thread_tools.c
index eee185e6c3..06e73751bb 100644
--- a/thread_tools.c
+++ b/thread_tools.c
@@ -1,6 +1,7 @@
/* included by thraed.c */
VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
+VALUE rb_eClosedQueueError;
/* Mutex */
@@ -521,6 +522,9 @@ enum {
END_QUEUE
};
+#define QUEUE_CLOSED FL_USER5
+#define QUEUE_CLOSE_EXCEPTION FL_USER6
+
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
#define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS)
@@ -566,6 +570,77 @@ wakeup_all_threads(VALUE list)
rb_ary_clear(list);
}
+static unsigned long
+queue_length(VALUE self)
+{
+ VALUE que = GET_QUEUE_QUE(self);
+ return RARRAY_LEN(que);
+}
+
+static unsigned long
+queue_num_waiting(VALUE self)
+{
+ VALUE waiters = GET_QUEUE_WAITERS(self);
+ return RARRAY_LEN(waiters);
+}
+
+static unsigned long
+szqueue_num_waiting_producer(VALUE self)
+{
+ VALUE waiters = GET_SZQUEUE_WAITERS(self);
+ return RARRAY_LEN(waiters);
+}
+
+static int
+queue_closed_p(VALUE self)
+{
+ return FL_TEST_RAW(self, QUEUE_CLOSED) != 0;
+}
+
+static void
+raise_closed_queue_error(VALUE self)
+{
+ rb_raise(rb_eClosedQueueError, "queue closed");
+}
+
+static VALUE
+queue_closed_result(VALUE self)
+{
+ assert(queue_length(self) == 0);
+
+ if (FL_TEST(self, QUEUE_CLOSE_EXCEPTION)) {
+ raise_closed_queue_error(self);
+ }
+ return Qnil;
+}
+
+static VALUE
+queue_do_close(VALUE self, int argc, VALUE *argv, int is_szq)
+{
+ VALUE exception = Qfalse;
+
+ if (queue_closed_p(self)) raise_closed_queue_error(self);
+
+ rb_scan_args(argc, argv, "01", &exception);
+ FL_SET(self, QUEUE_CLOSED);
+
+ if (RTEST(exception)) {
+ FL_SET(self, QUEUE_CLOSE_EXCEPTION);
+ }
+
+ if (queue_num_waiting(self) > 0) {
+ VALUE waiters = GET_QUEUE_WAITERS(self);
+ wakeup_all_threads(waiters);
+ }
+
+ if (is_szq && szqueue_num_waiting_producer(self) > 0) {
+ VALUE waiters = GET_SZQUEUE_WAITERS(self);
+ wakeup_all_threads(waiters);
+ }
+
+ return self;
+}
+
/*
* Document-class: Queue
*
@@ -611,12 +686,75 @@ rb_queue_initialize(VALUE self)
static VALUE
queue_do_push(VALUE self, VALUE obj)
{
+ if (queue_closed_p(self)) {
+ raise_closed_queue_error(self);
+ }
rb_ary_push(GET_QUEUE_QUE(self), obj);
wakeup_first_thread(GET_QUEUE_WAITERS(self));
return self;
}
/*
+ * Document-method: Queue#close
+ * call-seq:
+ * close(exception=false)
+ *
+ * Closes the queue. A closed queue cannot be re-opened.
+ *
+ * After the call to close completes, the following are true:
+ *
+ * - +closed?+ will return true
+ *
+ * - calling enq/push/<< will raise ClosedQueueError('queue closed')
+ *
+ * - when +empty?+ is false, calling deq/pop/shift will return an object
+ * from the queue as usual.
+ *
+ * - when +empty?+ is true, deq(non_block=false) will not suspend and
+ * will either return nil. If +exception+ parameter is true, raise ClosedQueueError error.
+ * deq(non_block=true) will ignore the parameter and raise a ThreadError('queue empty').
+ *
+ * ClosedQueueError is inherited from StopIteration, so that you can break loop block.
+ *
+ * Example:
+ *
+ * q = Queue.new
+ * Thread.new{
+ * while e = q.deq # wait for nil to break loop
+ * # ...
+ * end
+ * }
+ * q.close # equals to q.close(false)
+ *
+ * q = Queue.new
+ * Thread.new{
+ * loop{
+ * e = q.deq; ... # braek with ClosedQueueError
+ * }
+ * }
+ * q.close(true)
+ */
+
+static VALUE
+rb_queue_close(int argc, VALUE *argv, VALUE self)
+{
+ return queue_do_close(self, argc, argv, FALSE);
+}
+
+/*
+ * Document-method: Queue#closed?
+ * call-seq: closed?
+ *
+ * Returns +true+ if the queue is closed.
+ */
+
+static VALUE
+rb_queue_closed_p(VALUE self)
+{
+ return queue_closed_p(self) ? Qtrue : Qfalse;
+}
+
+/*
* Document-method: Queue#push
* call-seq:
* push(object)
@@ -632,20 +770,6 @@ rb_queue_push(VALUE self, VALUE obj)
return queue_do_push(self, obj);
}
-static unsigned long
-queue_length(VALUE self)
-{
- VALUE que = GET_QUEUE_QUE(self);
- return RARRAY_LEN(que);
-}
-
-static unsigned long
-queue_num_waiting(VALUE self)
-{
- VALUE waiters = GET_QUEUE_WAITERS(self);
- return RARRAY_LEN(waiters);
-}
-
struct waiting_delete {
VALUE waiting;
VALUE th;
@@ -676,8 +800,16 @@ queue_do_pop(VALUE self, int should_block)
if (!should_block) {
rb_raise(rb_eThreadError, "queue empty");
}
- rb_ary_push(args.waiting, args.th);
- rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+ else if (queue_closed_p(self)) {
+ return queue_closed_result(self);
+ }
+ else {
+ assert(queue_length(self) == 0);
+ assert(queue_closed_p(self) == 0);
+
+ rb_ary_push(args.waiting, args.th);
+ rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+ }
}
return rb_ary_shift(GET_QUEUE_QUE(self));
@@ -805,6 +937,24 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
}
/*
+ * Document-method: SizedQueue#close
+ * call-seq:
+ * close(exception=false)
+ *
+ * Similar to Queue#close.
+ *
+ * The difference is behavior with waiting enqueuing threads.
+ *
+ * If there are waiting enqueuing threads, they are interrupted by
+ * raising ClosedQueueError('queue closed').
+ */
+static VALUE
+rb_szqueue_close(int argc, VALUE *argv, VALUE self)
+{
+ return queue_do_close(self, argc, argv, TRUE);
+}
+
+/*
* Document-method: SizedQueue#max
*
* Returns the maximum size of the queue.
@@ -879,9 +1029,20 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
- rb_ary_push(args.waiting, args.th);
- rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+ else if (queue_closed_p(self)) {
+ goto closed;
+ }
+ else {
+ rb_ary_push(args.waiting, args.th);
+ rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+ }
+ }
+
+ if (queue_closed_p(self)) {
+ closed:
+ raise_closed_queue_error(self);
}
+
return queue_do_push(self, argv[0]);
}
@@ -941,9 +1102,7 @@ rb_szqueue_clear(VALUE self)
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
- long len = queue_num_waiting(self);
- VALUE waiters = GET_SZQUEUE_WAITERS(self);
- len += RARRAY_LEN(waiters);
+ long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self);
return ULONG2NUM(len);
}
@@ -1106,14 +1265,14 @@ Init_thread_tools(void)
rb_cThread,
"Queue", rb_cObject, rb_struct_alloc_noinit,
"que", "waiters", NULL);
- rb_cSizedQueue = rb_struct_define_without_accessor_under(
- rb_cThread,
- "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
- "que", "waiters", "queue_waiters", "size", NULL);
+
+ rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration);
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
rb_undef_method(rb_cQueue, "initialize_copy");
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
+ rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
+ rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
@@ -1127,7 +1286,13 @@ Init_thread_tools(void)
rb_define_alias(rb_cQueue, "shift", "pop"); /* Alias for #pop. */
rb_define_alias(rb_cQueue, "size", "length"); /* Alias for #length. */
+ rb_cSizedQueue = rb_struct_define_without_accessor_under(
+ rb_cThread,
+ "SizedQueue", rb_cQueue, rb_struct_alloc_noinit,
+ "que", "waiters", "queue_waiters", "size", NULL);
+
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
+ rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);