Forum: GNU Radio Python block composed of other blocks

Posted by maiconkist (Guest)
on 2013-01-30 14:27
(Received via mailing list)
Hi list,

I'm trying to write a block in python.

What I want to do is create a block that passes the signal through some 
well
know blocks (such as FFT), do its processing and output the signal.

Basically, what I did so far is:

#####
class energy_detector(gr.hier_block2):
  class energy_calculator(gr.sync_block):
    """ Nested class
    """
    def __init__(self, fft_size):
      gr.sync_block.__init__(
          self,
          name = "energy_detector",
          in_sig =  [np.dtype((np.float32, fft_size))],
          out_sig = [np.float32]
          )

      def work(self, input_items, output_items):
        """ Work here
                                """

        return len(out)

    """
    Main block
    """
  def __init__(self, fft_size):
    """
    Constructor
    """

    """ Initialize our energy detector block
      in_sig:  vector of fft_size elements
      out_sig: one float for each input vector
    """
    gr.hier_block2.__init__(
        self,
        name = "energy_detector",
        in_sig =  [np.dtype((np.float32, fft_size))],
        out_sig = [np.float32]
        )

    #
    # Blocks
    #
    self.fft  =  fft. fft_vcc(fft_size, True, [], False)
    self.v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_size)
    self.c2f = gr.complex_to_float()
    self.s2v = gr.stream_to_vector(gr.sizeof_float, fft_size)
    self.ec  = self.energy_calculator(fft_size)

    # Flow graph
    self.connect(self, self.fft, self.v2s, self.c2f, self.s2v, self.ec, 
self)
########

But it does not work. What can I do to achieve the result I need ?

Thanks



--
View this message in context: 
http://gnuradio.4.n7.nabble.com/Python-block-compo...
Sent from the GnuRadio mailing list archive at Nabble.com.
Posted by Tom Rondeau (Guest)
on 2013-01-30 14:38
(Received via mailing list)
On Wed, Jan 30, 2013 at 8:26 AM, maiconkist <maiconkist@gmail.com> 
wrote:

> #####
>                                         out_sig = [np.float32]
>                 """
>                                 self,
>                 self.v2s = gr.vector_to_stream(gr.sizeof_gr_complex,
> But it does not work. What can I do to achieve the result I need ?
>
> Thanks
>

Sorry, but "it does not work" is not a very useful way to get us to help
you. What about it does not work? What are the error messages, if any, 
that
you are seeing. What steps have you taken after writing the code to test
it? What version of GNU Radio are you using?

You need to help us help you.

Tom
Posted by maiconkist (Guest)
on 2013-01-30 15:20
(Received via mailing list)
Hi Tom,

I simply try to instantiate a object of class energy_detector and got 
the
following error:

TypeError: __init__() got an unexpected keyword argument 'in_sig'

I imagine that a gr.hier_block2 doesn't have such input. I dont know 
what
can be the superclass of my "top block".



--
View this message in context: 
http://gnuradio.4.n7.nabble.com/Python-block-compo...
Sent from the GnuRadio mailing list archive at Nabble.com.
Posted by maiconkist (Guest)
on 2013-01-30 21:32
(Received via mailing list)
I fixed the problem I had by changing in_sing for "input_signature".

But now my testbench never finish executing and print a value that is 
the 2x
the correct value.

My energy_detector code is as follows:

#####
import numpy as np

from gnuradio import gr, blks2

from gnuradio import fft
from gnuradio import uhd

class energy_detector(gr.hier_block2):
  class energy_calculator(gr.sync_block):
    """ Nested class
    """
    def __init__(self, fft_size):
      gr.sync_block.__init__(
          self,
          name = "energy_detector",
          in_sig =  [np.dtype((np.float32, fft_size))],
          out_sig = [np.float32]
          )

    def work(self, input_items, output_items):
      """
      Energy calculator main function
      """
      in0 = input_items[0][:]
      out = output_items[0][:]

      out[:] = np.sum( np.square(in0) )
      print "energy = %f" % out[0]

      self.consume_each(len(in0))

      return len(out)


  def __init__(self, fft_size):
    """
    Constructor
    """

    """ Initialize our energy detector block
      in_sig:  vector of fft_size elements
      out_sig: one float for each input vector
    """
    gr.hier_block2.__init__(
        self,
        name = "energy_detector",
        input_signature =  gr.io_signature(1, 1, fft_size * 
gr.sizeof_float),
        output_signature = gr.io_signature(1, 1, gr.sizeof_float)
        #in_sig =  [np.dtype((np.float32, fft_size))],
        #out_sig = [np.float32]
        )

    #
    # Blocks
    #
    self.fft = fft.fft_vfc(fft_size, True, [], False)
    self.v2s = gr.vector_to_stream(gr.sizeof_gr_complex, fft_size)
    self.c2f = gr.complex_to_float()
    self.s2v = gr.stream_to_vector(gr.sizeof_float, fft_size)
    self.ec  = self.energy_calculator(fft_size)

    # Flow graph
    self.connect(self, self.fft, self.v2s, self.c2f, self.s2v, self.ec, 
self)

#####


And my testbench do this:

####
    src_data = (1, 1)
    expected_result = np.sum(np.square(src_data))

    # blocks
    src = gr.vector_source_f (src_data)
    s2v = gr.stream_to_vector(gr.sizeof_float, len(src_data))
    ed = energy_detector(len(src_data))
    v2s = gr.vector_to_stream(gr.sizeof_float, 1)
    dst = gr.vector_sink_f ()

    # flowgraph
    self.tb.connect (src, s2v, ed, v2s, dst)
    self.tb.run ()

    result_data = dst.data ()
    self.assertEqual (expected_result, result_data)

####

Any suggestion ?




--
View this message in context: 
http://gnuradio.4.n7.nabble.com/Python-block-compo...
Sent from the GnuRadio mailing list archive at Nabble.com.
Please log in before posting. Registration is free and takes only a minute.
Existing account (Switch to SSL-encrypted connection)
NEW: Do you have a Google/GoogleMail or Yahoo account? No registration required!
Log in with Google account | Log in with Yahoo account
No account? Register here.