commit 814a8c57d82f68d29e3b3756fa315f7d843d990d Author: Sjoerd Simons Date: Sun Mar 30 16:17:03 2008 +0200 * glib/src/rbgutil_callback.c: Avoid using rb_ functions in non-ruby threads and trigger signal handling by writing to one side of an IO.pipe * gstreamer/test/test_thread_handling.rb: Add a test to show that signals from threads are handled when needed diff --git a/glib/src/rbgutil_callback.c b/glib/src/rbgutil_callback.c index b6c8548..a11dcfe 100644 --- a/glib/src/rbgutil_callback.c +++ b/glib/src/rbgutil_callback.c @@ -10,6 +10,7 @@ **********************************************************************/ +#include #include "rbgprivate.h" #ifndef HAVE_RB_ERRINFO @@ -51,6 +52,8 @@ typedef struct _CallbackRequest { static GMutex *callback_dispatch_thread_mutex = NULL; static GAsyncQueue *callback_request_queue = NULL; static ID id_callback_dispatch_thread; +static ID id_callback_dispatch_write_io; +static int callback_dispatch_write_fd = -1; static VALUE exec_callback(VALUE data) @@ -77,36 +80,41 @@ rbgutil_callback_dispatch_thread(void) } static VALUE -mainloop(void) +mainloop(VALUE read_io) { + int read_fd = NUM2INT(rb_funcall(read_io, rb_intern("fileno"), 0)); for (;;) { CallbackRequest *request; + char buf[1]; - if (g_async_queue_length(callback_request_queue) <= 0) - rb_thread_sleep_forever(); + rb_thread_wait_fd(read_fd); + /* Every request will write exactly one byte per item on the async + * queue */ + read(read_fd, buf, 1); request = g_async_queue_pop(callback_request_queue); if (!request) break; rb_thread_create(process_request, request); - rb_thread_schedule(); } + rb_funcall(read_io, rb_intern("close"), 0); return Qnil; } static VALUE invoke_callback_in_ruby_thread(VALUE (*func)(VALUE), VALUE arg) { - VALUE callback_dispatch_thread; + /* This runs in the context of a non-native ruby thread, no rb_ functions + * should be called here. */ CallbackRequest request; - callback_dispatch_thread = rbgutil_callback_dispatch_thread(); - if (NIL_P(callback_dispatch_thread)) - rb_raise(rbgutil_eGLibCallbackNotInitializedError, - "Please call rbgutil_start_callback_dispatch_thread() " - "to dispatch a callback from non-ruby thread before " - "callbacks are requested from non-ruby thread."); + g_mutex_lock(callback_dispatch_thread_mutex); + + if (callback_dispatch_write_fd == -1) + g_error("Please call rbgutil_start_callback_dispatch_thread() " + "to dispatch a callback from non-ruby thread before " + "callbacks are requested from non-ruby thread."); request.function = func; request.argument = arg; @@ -115,8 +123,13 @@ invoke_callback_in_ruby_thread(VALUE (*func)(VALUE), VALUE arg) request.done_cond = g_cond_new(); g_mutex_lock(request.done_mutex); - rb_thread_wakeup(callback_dispatch_thread); + + /* Post our request and unlock the dispatch thread for other signals. As + * nobody else knew about request.done_mutex yet this is safe */ + write (callback_dispatch_write_fd, "R", 1); g_async_queue_push(callback_request_queue, &request); + g_mutex_unlock(callback_dispatch_thread_mutex); + g_cond_wait(request.done_cond, request.done_mutex); g_mutex_unlock(request.done_mutex); @@ -152,9 +165,16 @@ rbgutil_start_callback_dispatch_thread(void) g_mutex_lock(callback_dispatch_thread_mutex); callback_dispatch_thread = rbgutil_callback_dispatch_thread(); if (NIL_P(callback_dispatch_thread)) { - callback_dispatch_thread = rb_thread_create(mainloop, NULL); + VALUE pipe = rb_funcall(rb_cIO, rb_intern("pipe"), 0); + + callback_dispatch_thread = + rb_thread_create(mainloop, (void *)rb_ary_entry(pipe, 0)); rb_ivar_set(mGLib, id_callback_dispatch_thread, callback_dispatch_thread); + rb_ivar_set(mGLib, id_callback_dispatch_write_io, + rb_ary_entry(pipe, 1)); + callback_dispatch_write_fd = + NUM2INT(rb_funcall(rb_ary_entry(pipe, 1), rb_intern("fileno"), 0)); } g_mutex_unlock(callback_dispatch_thread_mutex); #endif @@ -169,9 +189,15 @@ rbgutil_stop_callback_dispatch_thread(void) g_mutex_lock(callback_dispatch_thread_mutex); callback_dispatch_thread = rbgutil_callback_dispatch_thread(); if (!NIL_P(callback_dispatch_thread)) { - rb_thread_wakeup(callback_dispatch_thread); + write (callback_dispatch_write_fd, "D", 1); g_async_queue_push(callback_request_queue, NULL); + rb_ivar_set(mGLib, id_callback_dispatch_thread, Qnil); + + rb_funcall(rb_ivar_get(mGLib, id_callback_dispatch_write_io), + rb_intern("close"), 0); + rb_ivar_set(mGLib, id_callback_dispatch_write_io, Qnil); + callback_dispatch_write_fd = -1; } g_mutex_unlock(callback_dispatch_thread_mutex); #endif @@ -189,6 +215,7 @@ void Init_gutil_callback() g_thread_init(NULL); id_callback_dispatch_thread = rb_intern("callback_dispatch_thread"); + id_callback_dispatch_write_io = rb_intern("callback_dispatch_write_io"); rb_ivar_set(mGLib, id_callback_dispatch_thread, Qnil); callback_request_queue = g_async_queue_new(); diff --git a/gstreamer/test/test_thread_handling.rb b/gstreamer/test/test_thread_handling.rb new file mode 100644 index 0000000..f373ff9 --- /dev/null +++ b/gstreamer/test/test_thread_handling.rb @@ -0,0 +1,27 @@ +class TestThreadHandling < Test::Unit::TestCase + def test_scheduling + pipeline = Gst::Pipeline.new + buffers_seen = 0; + + src = Gst::ElementFactory.make("videotestsrc") + identity = Gst::ElementFactory.make("identity") + sink = Gst::ElementFactory.make("fakesink") + caps = Gst::Caps.parse("video/x-raw-yuv, framerate=\(fraction\)30/1") + + src.live = true + identity.signal_handoffs = true + + identity.signal_connect("handoff") { | element, buffer | + buffers_seen += 1 + } + + pipeline.add(src, identity, sink) + src.link_filtered(identity, caps); + identity >> sink + + pipeline.play + sleep 1 + assert(buffers_seen > 15, + "Signals not handled often enough only #{buffers_seen} buffers seen") + end +end