diff options
author | ocean <ocean@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2005-09-17 02:42:59 +0000 |
---|---|---|
committer | ocean <ocean@b2dd03c8-39d4-4d8f-98ff-823fe69b080e> | 2005-09-17 02:42:59 +0000 |
commit | 846b1c35e5c6d0ab787b9415ea586ccc843df562 (patch) | |
tree | 033393758050e8f54f1a965f566f7c3157cafd9d /win32/win32.c | |
parent | 762ed0d68c7cfb96c8e4c1a67030a01e7825c9a5 (diff) | |
download | ruby-846b1c35e5c6d0ab787b9415ea586ccc843df562.tar.gz |
* win32/win32.c (rb_w32_select): select for socket didn't work.
this caused deadlock in drb test. this happened because GetFileType
for socket handle returns FILE_TYPE_PIPE. Of course, it's not a
pipe. So socket handle didn't reach winsock's select function.
* win32/win32.c (rb_w32_select): read for pipe still kept brocking
even if writer handle was closed.
r,w = IO.pipe
Thread.new {
sleep 3; puts "------- 1"
w.puts("foo")
sleep 3; puts "------- 2"
w.puts("boo")
sleep 3; puts "------- 3"
w.close
}
until r.eof? # should break by w.close but didn't.
puts r.gets
end
* win32/win32.c (rb_w32_select): temprary reverted console support
but it'll be back soon.
git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@9192 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
Diffstat (limited to 'win32/win32.c')
-rw-r--r-- | win32/win32.c | 324 |
1 files changed, 101 insertions, 223 deletions
diff --git a/win32/win32.c b/win32/win32.c index 7518e1e726..02983e19e3 100644 --- a/win32/win32.c +++ b/win32/win32.c @@ -1906,226 +1906,130 @@ rb_w32_fdisset(int fd, fd_set *set) static int NtSocketsInitialized = 0; -static int -collect_file_fd(fd_set *set, fd_set *fileset) +static void +extract_fd(fd_set *dst, fd_set *src, int (*func)(SOCKET)) { - int idx; + int s = 0; + if (!src) return; - fileset->fd_count = 0; - if (!set) - return 0; - for (idx = 0; idx < set->fd_count; idx++) { - SOCKET fd = set->fd_array[idx]; + while (s < src->fd_count) { + SOCKET fd = src->fd_array[s]; - if (!is_socket(fd)) { - int i; + if (!func || (*func)(fd)) { /* move it to dst */ + int d; - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } + for (d = 0; d < dst->fd_count; d++) { + if (dst->fd_array[d] == fd) break; } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } + if (d == dst->fd_count && dst->fd_count < FD_SETSIZE) { + dst->fd_array[dst->fd_count++] = fd; } + memmove( + &src->fd_array[s], + &src->fd_array[s+1], + sizeof(src->fd_array[0]) * (--src->fd_count - s)); } + else s++; } - return fileset->fd_count; } static int -extract_pipe_fd(fd_set *set, fd_set *fileset) +is_not_socket(SOCKET sock) { - int idx; - - fileset->fd_count = 0; - if (!set) - return 0; - for (idx = 0; idx < set->fd_count; idx++) { - DWORD type; - int fd = set->fd_array[idx]; - RUBY_CRITICAL(type = GetFileType((HANDLE)fd)); - - if (type == FILE_TYPE_PIPE) { - int i; - - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } - } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } - } - - for (i = idx; i < set->fd_count - 1; i++) { - set->fd_array[i] = set->fd_array[i + 1]; - i++; - } - set->fd_count--; - idx--; - } - } - return fileset->fd_count; + return !is_socket(sock); } static int -peek_pipe(fd_set *set, fd_set *fileset) +is_pipe(SOCKET sock) /* DONT call this for SOCKET! it clains it is PIPE. */ { - int idx; - - fileset->fd_count = 0; - for (idx = 0; idx < set->fd_count; idx++) { - DWORD n; - int fd = set->fd_array[idx]; - if (PeekNamedPipe((HANDLE)fd, NULL, 0, NULL, &n, NULL) && n > 0) { - int i; + int ret; - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } - } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } - } - } - } + RUBY_CRITICAL( + ret = (GetFileType((HANDLE)sock) == FILE_TYPE_PIPE) + ); - return fileset->fd_count; + return ret; } static int -extract_console_fd(fd_set *set, fd_set *fileset) +is_readable_pipe(SOCKET sock) /* call this for pipe only */ { - int idx; - - fileset->fd_count = 0; - if (!set) - return 0; - for (idx = 0; idx < set->fd_count; idx++) { - BOOL ret; - static INPUT_RECORD ir; - DWORD n; - int fd = set->fd_array[idx]; - RUBY_CRITICAL(ret = PeekConsoleInput((HANDLE)fd, &ir, 1, &n)); - - if (ret) { - int i; - - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } - } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } - } + int ret; + DWORD n = 0; - for (i = idx; i < set->fd_count - 1; i++) { - set->fd_array[i] = set->fd_array[i + 1]; - i++; - } - set->fd_count--; - idx--; + RUBY_CRITICAL( + if (PeekNamedPipe((HANDLE)sock, NULL, 0, NULL, &n, NULL)) { + ret = (n > 0); } - } - return fileset->fd_count; + else { + ret = (GetLastError() == ERROR_BROKEN_PIPE); /* pipe was closed */ + } + ); + + return ret; } -static int -peek_console(fd_set *set, fd_set *fileset) -{ - int idx; - INPUT_RECORD ir; - - fileset->fd_count = 0; - for (idx = 0; idx < set->fd_count; idx++) { - DWORD n; - int fd = set->fd_array[idx]; - if (PeekConsoleInput((HANDLE)fd, &ir, 1, &n) && n > 0) { - if (ir.EventType == KEY_EVENT && ir.Event.KeyEvent.bKeyDown && - ir.Event.KeyEvent.uChar.AsciiChar) { - int i; +static long +do_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex, + struct timeval *timeout) +{ + long r = 0; - for (i = 0; i < fileset->fd_count; i++) { - if (fileset->fd_array[i] == fd) { - break; - } - } - if (i == fileset->fd_count) { - if (fileset->fd_count < FD_SETSIZE) { - fileset->fd_array[i] = fd; - fileset->fd_count++; - } - } - } - else { - ReadConsoleInput((HANDLE)fd, &ir, 1, &n); - } + if (nfds == 0 && timeout) { + Sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); + } + else { + r = select(nfds, rd, wr, ex, timeout); + if (r == SOCKET_ERROR) { + errno = map_errno(WSAGetLastError()); + r = -1; } } - return fileset->fd_count; + return r; } -#define PIPE_PEEK_INTERVAL (10 * 1000) /* usec */ - long rb_w32_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex, struct timeval *timeout) { long r; - fd_set file_rd; - fd_set file_wr; fd_set pipe_rd; - fd_set cons_rd; + fd_set unknown_rd; + fd_set unknown_wr; #ifdef USE_INTERRUPT_WINSOCK fd_set trap; #endif /* USE_INTERRUPT_WINSOCK */ - int file_nfds; - int pipe_nfds; - int cons_nfds; - struct timeval remainder; - struct timeval wait; + if (nfds < 0 || (timeout && (timeout->tv_sec < 0 || timeout->tv_usec < 0))) { + errno = EINVAL; + return -1; + } if (!NtSocketsInitialized) { StartSockets(); } + + pipe_rd.fd_count = 0; + unknown_rd.fd_count = 0; + unknown_wr.fd_count = 0; + + extract_fd(&unknown_rd, rd, is_not_socket); /* must exclude socket first! */ + extract_fd(&unknown_wr, wr, is_not_socket); /* must exclude socket first! */ + extract_fd(&pipe_rd, &unknown_rd, is_pipe); /* don't call is_pipe for socket! */ + + if (unknown_rd.fd_count + unknown_wr.fd_count) { + // assume unknown handles are always readable/writable + // fake read/write fd_set and return value + if (rd) *rd = unknown_rd; + if (wr) *wr = unknown_wr; + return unknown_rd.fd_count + unknown_wr.fd_count; + } + r = 0; if (rd && rd->fd_count > r) r = rd->fd_count; if (wr && wr->fd_count > r) r = wr->fd_count; if (ex && ex->fd_count > r) r = ex->fd_count; if (nfds > r) nfds = r; - if (nfds == 0 && timeout) { - Sleep(timeout->tv_sec * 1000 + timeout->tv_usec / 1000); - return 0; - } - pipe_nfds = extract_pipe_fd(rd, &pipe_rd); - cons_nfds = extract_console_fd(rd, &cons_rd); - nfds -= pipe_nfds; - file_nfds = collect_file_fd(rd, &file_rd); - file_nfds += collect_file_fd(wr, &file_wr); - if (file_nfds) { - // assume normal files are always readable/writable, - // and pipes are always writable. - // fake read/write fd_set and return value - if (rd) *rd = file_rd; - if (wr) *wr = file_wr; - return file_nfds; - } #if USE_INTERRUPT_WINSOCK if (ex) @@ -2138,60 +2042,34 @@ rb_w32_select(int nfds, fd_set *rd, fd_set *wr, fd_set *ex, ex = &trap; #endif /* USE_INTERRUPT_WINSOCK */ - if (timeout) - remainder = *timeout; - wait.tv_sec = 0; - RUBY_CRITICAL(do{ - if (pipe_nfds) { - r = peek_pipe(&pipe_rd, &file_rd); - if (r > 0) { - if (rd) *rd = file_rd; - if (wr) wr->fd_count = 0; - break; - } - } - if (cons_nfds) { - r = peek_console(&cons_rd, &file_rd); - if (r > 0) { - if (rd) *rd = file_rd; - if (wr) wr->fd_count = 0; - break; - } - } - if (timeout && remainder.tv_sec == 0 && - remainder.tv_usec < PIPE_PEEK_INTERVAL) { - wait.tv_usec = remainder.tv_usec; - remainder.tv_usec = 0; - } - else if (timeout && remainder.tv_usec < PIPE_PEEK_INTERVAL) { - remainder.tv_sec--; - remainder.tv_usec += 1000 * 1000 * 1000L- PIPE_PEEK_INTERVAL; - wait.tv_usec = PIPE_PEEK_INTERVAL; - } - else { - if (timeout) - remainder.tv_usec -= PIPE_PEEK_INTERVAL; - wait.tv_usec = PIPE_PEEK_INTERVAL; - } - if (nfds == 0) { - Sleep(wait.tv_sec * 1000 + wait.tv_usec / 1000); - } - else { - r = select(nfds, rd, wr, ex, &wait); - if (r == SOCKET_ERROR) { - errno = map_errno(WSAGetLastError()); - r = -1; - break; - } - else if (r > 0) { - break; - } - else if (timeout && - remainder.tv_sec == 0 && remainder.tv_usec == 0) { - break; - } + RUBY_CRITICAL( + if (pipe_rd.fd_count) { + long sec = timeout ? timeout->tv_sec + 1 : 0/*dummy*/; + while (!timeout || sec--) { + fd_set buf; buf.fd_count = 0; + // pipe + extract_fd(&buf, &pipe_rd, is_readable_pipe); + // socket + if (buf.fd_count) { + struct timeval val; val.tv_sec = 0; val.tv_usec = 0; + r = do_select(nfds, rd, wr, ex, &val); + if (r >= 0) { + r += buf.fd_count; + extract_fd(rd, &buf, NULL); // move all `buf' contents into `rd'. + } + // XXX: should I ignore socket error and returns only readable pipe? + break; + } + else { + struct timeval val; val.tv_sec = 1; val.tv_usec = 0; + r = do_select(nfds, rd, wr, ex, &val); + if (r) break; // readable or error (not pending) + } + } } - } while (0)); + else + r = do_select(nfds, rd, wr, ex, timeout); + ); return r; } |