commit 8e9f213c887bf366efa7bba90da30bc8d683ccaf Author: Sjoerd Simons Date: Sat Mar 29 18:37:39 2008 +0100 * src/rbgst-element.c: Spawn an extra thread to perform the get_state and set_state functions so the ruby interpreter thread can continue during these functions so it can handle signals which might be needed to complete a state change. * src/rbgst-element.c: Remove stopped?,ready?, paused?, playing? and wait as these aren't thread-safe. The get_state functions should be used to get states and/or wait for state changes diff --git a/gstreamer/src/rbgst-element.c b/gstreamer/src/rbgst-element.c index 84375f7..ba2591c 100644 --- a/gstreamer/src/rbgst-element.c +++ b/gstreamer/src/rbgst-element.c @@ -2,6 +2,7 @@ /* * Copyright (C) 2003, 2004 Laurent Sansonetti * Copyright (C) 2007, 2008 Ruby-GNOME2 Project Team + * Copyright (C) 2006, 2008 Sjoerd Simons * * This file is part of Ruby/GStreamer. * @@ -20,6 +21,8 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ +#include + #include "rbgst.h" #include "rbgst-private.h" @@ -47,6 +50,117 @@ instance2robj(gpointer instance) return rbgst_object_instance2robj(instance); } +struct thread_state_data { + GstElement *element; + GstState state; + GstState pending; + GstState ret; + GstClockTime timeout; + int writefd; +}; + +static void +thread_set_state_function(gpointer data, gpointer user_data) { + struct thread_state_data *sdata = (struct thread_state_data *) data; + + sdata->ret = gst_element_set_state (sdata->element, sdata->state); + write(sdata->writefd, "d", 1); +} + +/* Threaded set_state function */ +static VALUE +rb_gst_element_threaded_set_state(VALUE element, int state) { + int comm[2]; + GstState ret; + struct thread_state_data *data; + GError *error = NULL; + static GThreadPool *pool = NULL; + + if (G_UNLIKELY(pool == NULL)) { + pool = g_thread_pool_new(thread_set_state_function, + NULL, -1, FALSE, &error); + if (pool == NULL) { + rb_bug("Couldn't create rbgst thread: %s", error->message); + } + } + + if (pipe(comm) != 0) { + rb_bug("Unable to create rbgst communication pipe"); + } + + data = g_slice_new(struct thread_state_data); + data->element = RGST_ELEMENT(element); + data->state = state; + data->writefd = comm[1]; + + g_thread_pool_push(pool, data, &error); + if (error != NULL) { + rb_bug("Couldn't create rbgst thread: %s", error->message); + } + rb_thread_wait_fd(comm[0]); + ret = data->ret; + + g_slice_free(struct thread_state_data, data); + close(comm[0]); + close(comm[1]); + + return GENUM2RVAL (ret, GST_TYPE_STATE_CHANGE_RETURN); +} + +static void +thread_get_state_function(gpointer data, gpointer user_data) { + struct thread_state_data *sdata = (struct thread_state_data *) data; + + sdata->ret = gst_element_get_state (sdata->element, + &(sdata->state), + &(sdata->pending), + sdata->timeout); + write(sdata->writefd, "d", 1); +} + +/* Threaded get_state function */ +static VALUE +rb_gst_element_threaded_get_state(VALUE element, GstState *state, + GstState *pending, GstClockTime timeout) { + int comm[2]; + struct thread_state_data *data; + GError *error = NULL; + static GThreadPool *pool = NULL; + GstState ret; + + if (G_UNLIKELY(pool == NULL)) { + pool = g_thread_pool_new(thread_get_state_function, + NULL, -1, FALSE, &error); + if (pool == NULL) { + rb_bug("Couldn't create rbgst thread: %s", error->message); + } + } + + if (pipe(comm) != 0) { + rb_bug("Unable to create rbgst communication pipe"); + } + + data = g_slice_new(struct thread_state_data); + data->element = RGST_ELEMENT(element); + data->timeout = timeout; + data->writefd = comm[1]; + + g_thread_pool_push(pool, data, &error); + if (error != NULL) { + rb_bug("Couldn't create rbgst thread: %s", error->message); + } + rb_thread_wait_fd(comm[0]); + + *state = data->state; + *pending = data->pending; + ret = data->ret; + + g_slice_free(struct thread_state_data, data); + close(comm[0]); + close(comm[1]); + + return GENUM2RVAL (ret, GST_TYPE_STATE_CHANGE_RETURN); +} /* Class: Gst::Element * Base class for all pipeline elements. @@ -56,7 +170,7 @@ instance2robj(gpointer instance) * Method: set_state(state) * state: the state you want to set (see Gst::Element::State). * - * Sets the state of the element. + * Sets the state of the element. * * This method will try to set the requested state by going through all * the intermediary states and calling the class's state change function @@ -65,20 +179,30 @@ instance2robj(gpointer instance) * Returns: a code (see Gst::Element::StateReturn). */ static VALUE -rb_gst_element_set_state(VALUE self, VALUE value) +rb_gst_element_set_state (VALUE self, VALUE value) { - return GENUM2RVAL(gst_element_set_state(SELF(self), - RVAL2GENUM(value, GST_TYPE_STATE)), - GST_TYPE_STATE_CHANGE_RETURN); + const int state = RVAL2GENUM (value, GST_TYPE_STATE); + return rb_gst_element_threaded_set_state(self, state); } -/* Method: state - * Returns: the state of the element (see Gst::Element::State). +/* Method: get_state([timeout]) + * Returns: [Gst::StateChangeReturn, current state, pending state] + * (see Gst::Element::State). */ static VALUE -rb_gst_element_get_state(VALUE self) +rb_gst_element_get_state (int argc, VALUE *argv, VALUE self) { - return GENUM2RVAL(GST_STATE(SELF(self)), GST_TYPE_STATE); + VALUE result, rstate, rpending, timeout; + GstState state; + GstState pending; + rb_scan_args(argc, argv, "01", &timeout); + + result = rb_gst_element_threaded_get_state(self, &state, &pending, + NIL_P(timeout) ? GST_CLOCK_TIME_NONE + : NUM2ULL(timeout)); + rstate = GENUM2RVAL(state, GST_TYPE_STATE); + rpending = GENUM2RVAL(state, GST_TYPE_STATE); + return rb_ary_new3(3, result, rstate, rpending); } /* @@ -91,8 +215,7 @@ rb_gst_element_get_state(VALUE self) static VALUE rb_gst_element_stop (VALUE self) { - return GENUM2RVAL(gst_element_set_state(SELF(self), GST_STATE_NULL), - GST_TYPE_STATE_CHANGE_RETURN); + return rb_gst_element_threaded_set_state(self, GST_STATE_NULL); } /* @@ -105,8 +228,7 @@ rb_gst_element_stop (VALUE self) static VALUE rb_gst_element_ready(VALUE self) { - return GENUM2RVAL(gst_element_set_state(SELF(self), GST_STATE_READY), - GST_TYPE_STATE_CHANGE_RETURN); + return rb_gst_element_threaded_set_state(self, GST_STATE_READY); } /* @@ -119,9 +241,7 @@ rb_gst_element_ready(VALUE self) static VALUE rb_gst_element_pause(VALUE self) { - return GENUM2RVAL(gst_element_set_state(SELF(self), - GST_STATE_PAUSED), - GST_TYPE_STATE_CHANGE_RETURN); + return rb_gst_element_threaded_set_state(self, GST_STATE_PAUSED); } /* @@ -134,63 +254,7 @@ rb_gst_element_pause(VALUE self) static VALUE rb_gst_element_play(VALUE self) { - return GENUM2RVAL(gst_element_set_state(SELF(self), - GST_STATE_PLAYING), - GST_TYPE_STATE_CHANGE_RETURN); -} - -/* Method: stopped? - * Returns: true if the current state is set to Gst::Element::STATE_NULL, - * false otherwise. - */ -static VALUE -rb_gst_element_is_stopped(VALUE self) -{ - return CBOOL2RVAL(GST_STATE(SELF(self)) == GST_STATE_NULL); -} - -/* Method: ready? - * Returns: true if the current state equals Gst::Element::STATE_READY, - * false otherwise. - */ -static VALUE -rb_gst_element_is_ready(VALUE self) -{ - return CBOOL2RVAL(GST_STATE(SELF(self)) == GST_STATE_READY); -} - -/* Method: paused? - * Returns: true if the current state equals Gst::Element::STATE_PAUSED, - * false otherwise. - */ -static VALUE -rb_gst_element_is_paused(VALUE self) -{ - return CBOOL2RVAL(GST_STATE(SELF(self)) == GST_STATE_PAUSED); -} - -/* Method: playing? - * Returns: true if the current state equals Gst::Element::STATE_PLAYING, - * false otherwise. - */ -static VALUE -rb_gst_element_is_playing(VALUE self) -{ - return CBOOL2RVAL(GST_STATE(SELF(self)) == GST_STATE_PLAYING); -} - -/* - * Method: wait - * - * Waits and blocks until the element changed its state. - * - * Returns: self. - */ -static VALUE -rb_gst_element_wait(VALUE self) -{ - GST_STATE_WAIT(SELF(self)); - return Qnil; + return rb_gst_element_threaded_set_state(self, GST_STATE_PLAYING); } /* @@ -776,16 +840,11 @@ Init_gst_element(void) rb_gst_element_each_pad_template, 0); rb_define_method(rb_cGstElement, "set_state", rb_gst_element_set_state, 1); - rb_define_method(rb_cGstElement, "state", rb_gst_element_get_state, 0); + rb_define_method(rb_cGstElement, "get_state", rb_gst_element_get_state, -1); rb_define_method(rb_cGstElement, "stop", rb_gst_element_stop, 0); rb_define_method(rb_cGstElement, "ready", rb_gst_element_ready, 0); rb_define_method(rb_cGstElement, "pause", rb_gst_element_pause, 0); rb_define_method(rb_cGstElement, "play", rb_gst_element_play, 0); - rb_define_method(rb_cGstElement, "stopped?", rb_gst_element_is_stopped, 0); - rb_define_method(rb_cGstElement, "ready?", rb_gst_element_is_ready, 0); - rb_define_method(rb_cGstElement, "paused?", rb_gst_element_is_paused, 0); - rb_define_method(rb_cGstElement, "playing?", rb_gst_element_is_playing, 0); - rb_define_method(rb_cGstElement, "wait", rb_gst_element_wait, 0); rb_define_method(rb_cGstElement, "link", rb_gst_element_link, 1); rb_define_alias(rb_cGstElement, ">>", "link"); rb_define_method(rb_cGstElement, "link_filtered", rb_gst_element_link_filtered, 2);