Writing pipe on Windows problem

Hi, all

I tested pipe with this code:

# Create an anonymous pipe; the reader thread blocks in readline until a
# complete line has been written to the other end.
readPipe, writePipe = IO.pipe
t = Thread.new { puts "got #{readPipe.readline.length} bytes" }
sleep 1
puts "hello from main"
# Write one 2048-character line (puts appends the newline) for the reader.
writePipe.puts "a"*2048
t.join

It works on Linux, but on Windows it blocks and fails to respond to Ctrl-C.
How can I avoid the freeze on Windows?

Regards,
Park H.

Hi,

Heesob P. wrote:

Hi, all

I tested pipe with this code:

# Create an anonymous pipe; the reader thread blocks in readline until a
# complete line has been written to the other end.
readPipe, writePipe = IO.pipe
t = Thread.new { puts "got #{readPipe.readline.length} bytes" }
sleep 1
puts "hello from main"
# Write one 2048-character line (puts appends the newline) for the reader.
writePipe.puts "a"*2048
t.join

It works on Linux, but on Windows it blocks and fails to respond to Ctrl-C.
How can I avoid the freeze on Windows?

It seems nobody cares about pipe blocking on Windows. :(
I made my own patch for win32.c of ruby 1.8.6-p114.
It is not complete yet, but it will solve the blocking problem for pipes and
console input on Windows.
I hope this gives some inspiration to the win32 Ruby maintainers.

Regards,
Park H.

--- win32.c.org 2008-04-29 14:44:41.000000000 +0900
+++ win32.c 2008-05-02 14:08:10.000000000 +0900
@@ -2046,6 +2050,114 @@
return fileset->fd_count;
}

+static HANDLE main_select_event=NULL;
+static HANDLE *select_thread_list=NULL;
+
+static DWORD WINAPI
+select_read_thread(PVOID argp)
+{

  • HANDLE fd_handle = (HANDLE)argp;
  • DWORD ret = 1;
  • DWORD mode = 0;
  • GetConsoleMode(fd_handle,&mode);
  • if(mode) {
  •    /* Console Input */
    
  •    DWORD num_events,num_events_read,pre_num=0;
    
  •    INPUT_RECORD *input_record;
    
  •    int i;
    
  •    while(1) {
    
  •        if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) 
    

break;

  •        GetNumberOfConsoleInputEvents(fd_handle, &num_events);
    
  •        if(pre_num != num_events) {
    
  •            input_record = 
    

(INPUT_RECORD*)malloc(sizeof(INPUT_RECORD)*num_events);

  •            PeekConsoleInput(fd_handle, input_record, 
    

num_events,&num_events_read);

  •            for(i=0;i<num_events;i++) {
    

+if(input_record[i].Event.KeyEvent.uChar.AsciiChar) return ret;

  •                if ((input_record[i].EventType == KEY_EVENT) && 
    

input_record[i].Event.KeyEvent.bKeyDown) {
+if(input_record[i].Event.KeyEvent.uChar.AsciiChar==13) { /* carriage
return */

  •                        free(input_record);
    
  •                        return ret;
    
  •                    }
    
  •                    else 
    

if(input_record[i].Event.KeyEvent.uChar.AsciiChar==26) { /* ^Z - end of
file */

  •                        free(input_record);
    
  •                        return ret;
    
  •                    }
    
  •                }
    
  •            }
    
  •            free(input_record);
    
  •            pre_num = num_events;
    
  •       }
    
  •    }
    
  •    return ret;
    
  • } else {
  •    DWORD state;
    
  •    ret = 
    

GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);

  •    if(ret) {
    
  •        /* Pipe Input */
    
  •        int bytes_avail=0;
    
  •        while(1) {
    

+if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;

  •            ret = 
    

PeekNamedPipe(fd_handle,NULL,0,NULL,&bytes_avail,NULL);

  •            if(bytes_avail>0) {
    
  •                return bytes_avail;
    
  •          }
    
  •        }
    
  •    }
    
  •    return ret;
    
  • }
  • return ret;
    +}

+typedef DWORD (WINAPI *PNtQueryInformationFile)(HANDLE, PVOID,
PVOID,DWORD, DWORD );
+#define FilePipeLocalInformation 24
+typedef struct _FILE_PIPE_LOCAL_INFORMATION {

  • ULONG NamedPipeType;
  • ULONG NamedPipeConfiguration;
  • ULONG MaximumInstances;
  • ULONG CurrentInstances;
  • ULONG InboundQuota;
  • ULONG ReadDataAvailable;
  • ULONG OutboundQuota;
  • ULONG WriteQuotaAvailable;
  • ULONG NamedPipeState;
  • ULONG NamedPipeEnd;
    +} FILE_PIPE_LOCAL_INFORMATION, *PFILE_PIPE_LOCAL_INFORMATION;
    +static PNtQueryInformationFile NtQueryInformationFile=NULL;

+static DWORD WINAPI
+select_write_thread(PVOID argp)
+{

  • HANDLE fd_handle = (HANDLE)argp;
  • DWORD ret = 1;
  • DWORD mode = 0;
  • GetConsoleMode(fd_handle,&mode);
  • if(mode) {
  •    /* Console Output */
    
  •    return ret;
    
  • } else {
  •    DWORD state;
    
  •    ret = 
    

GetNamedPipeHandleState(fd_handle,&state,NULL,NULL,NULL,NULL,0);

  •    if(ret) {
    
  •        /* Pipe output */
    
  •        int bytes_written=0;
    
  •        int bytes_avail=0;
    
  •        DWORD iob[2];
    
  •        FILE_PIPE_LOCAL_INFORMATION fpli = {0};
    
  •        if(NtQueryInformationFile) {
    
  •            while(1) {
    
  • if(WaitForSingleObject(main_select_event,0)!=WAIT_TIMEOUT) break;
    
  • ret = 
    

NtQueryInformationFile(fd_handle,iob,&fpli,sizeof(fpli),FilePipeLocalInformation);

  •         if(fpli.OutboundQuota==fpli.WriteQuotaAvailable) return 
    

ret;

  • }
  • }
    
  •    }
    
  • }
  • return ret;
    +}

long
rb_w32_select (int nfds, fd_set *rd, fd_set *wr, fd_set *ex,
struct timeval *timeout)
@@ -2074,6 +2187,86 @@
file_nfds += extract_file_fd(wr, &file_wr);
if (file_nfds)
{

  •    DWORD val,exit_code;
    
  •    int i;
    
  •    if(!NtQueryInformationFile)
    
  • NtQueryInformationFile = (PNtQueryInformationFile)
    +GetProcAddress(GetModuleHandle(“ntdll.dll”),“NtQueryInformationFile”);
  •    main_select_event = CreateEvent(NULL,TRUE,FALSE,NULL);
    
  •    if(main_select_event == NULL)
    
  •    {
    
  •      printf("CreateEvent failed (%d)\n", GetLastError());
    
  •      return -1;
    
  •    }
    
  •    select_thread_list = malloc(sizeof(HANDLE)*(file_nfds+1));
    
  •    for(i=0; i<file_nfds; i++)
    
  •    {
    
  •      if(i<file_rd.fd_count) {
    
  •        select_thread_list[i] = 
    

CreateThread(NULL,0,select_read_thread,

  •            (PVOID)file_rd.fd_array[i],0,&val);
    
  •      }
    
  •      else {
    
  •        select_thread_list[i] = 
    

CreateThread(NULL,0,select_write_thread,

  •            (PVOID)file_wr.fd_array[i-file_rd.fd_count],0,&val);
    
  •      }
    
  •      if (select_thread_list[i] == NULL)
    
  •      {
    
  •        printf("CreateThread failed (%d)\n", GetLastError());
    
  •        return -1;
    
  •      }
    
  •    }
    
  •    select_thread_list[file_nfds] = interrupted_event;
    
  •    if(timeout)
    
  •        r = 
    

WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,

  •            timeout->tv_sec * 1000 + timeout->tv_usec / 1000);
    
  •    else
    
  •        r = 
    

WaitForMultipleObjects(file_nfds+1,select_thread_list,FALSE,

  •            INFINITE);
    
  •    if (!SetEvent(main_select_event) )
    
  •    {
    
  •      printf("SetEvent failed (%d)\n", GetLastError());
    
  •      return -1;
    
  •    }
    
  •    /* thread cleanup */
    
  •    for(i=0;i<file_nfds;i++) {
    
  •        while(1) {
    
  •              exit_code = 0;
    
  •              GetExitCodeThread(select_thread_list[i],&exit_code);
    
  •              if(exit_code!=STILL_ACTIVE) {
    
  •                  CloseHandle(select_thread_list[i]);
    
  •                  break;
    
  •              }
    
  •        }
    
  •    }
    
  •    free(select_thread_list);
    
  •    CloseHandle(main_select_event);
    
  •    if(r==WAIT_TIMEOUT) { /* timeout */
    
  •            FD_ZERO(&file_rd);
    
  •            FD_ZERO(&file_wr);
    
  •            if(rd) *rd = file_rd;
    
  •            if(wr) *wr = file_wr;
    
  •            return 0;
    
  •    } else if((r -= WAIT_OBJECT_0) == file_nfds) {  /* interrupt */
    
  •            FD_ZERO(&file_rd);
    
  •            FD_ZERO(&file_wr);
    
  •            if(rd) *rd = file_rd;
    
  •            if(wr) *wr = file_wr;
    
  •            return -1;
    
  • } else if(r<file_rd.fd_count) { /* read ready */
  •            FD_ZERO(&file_wr);
    
  •            if(rd) *rd = file_rd;
    
  •            if(wr) *wr = file_wr;
    
  •            return 1;
    
  •    } else { /* write ready */
    
  •            FD_ZERO(&file_rd);
    
  •            if(rd) *rd = file_rd;
    
  •            if(wr) *wr = file_wr;
    
  •            return 1;
    
  •    }
    
  • // assume normal files are always readable/writable
    // fake read/write fd_set and return value
    if (rd) *rd = file_rd;

This forum is not affiliated to the Ruby language, Ruby on Rails framework, nor any Ruby applications discussed here.

| Privacy Policy | Terms of Service | Remote Ruby Jobs