Msg Queues and Watcher Threads

I like the use of the watcher thread in pkt.demod_pkts, and was
wondering if I could use a similar technique in the following
situation:

Two disconnected subgraphs:
msg_source_1 -> transform_blk_1 -> msg_sink_1

msg_source_2 -> modulator

The user places ‘m’-byte messages in msg_source_1.
transform_blk_1 consumes ‘m’ bytes and produces ‘n’ different bytes.
A queue watcher thread takes the ‘n’-byte messages from msg_sink_1,
applies a second transform, and places the result in msg_source_2.

I want to use this to encode packets with FEC: transform_blk_1 is a
viterbi_encoder, and the queue watcher thread calls
packet_utils.make_packet(msg) each time
msg = self.msg_queue.delete_head() unblocks.
Does this seem reasonable?
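
A minimal sketch of the watcher-thread idea described above, written against Python's standard queue and threading modules rather than gr.msg_queue, so it runs without GNU Radio. Everything here is an assumption for illustration: start_watcher, make_packet_stub (a stand-in for packet_utils.make_packet), and the use of None as an end-of-stream marker are hypothetical names, not GNU Radio API. It only shows that the watcher side forwards one output message per input message; it says nothing about whether the blocks inside the flowgraph preserve message boundaries.

```python
import queue
import threading

def start_watcher(sink_q, source_q, second_transform):
    """Forward each message from sink_q to source_q, one output per input."""
    def run():
        while True:
            msg = sink_q.get()           # blocks, much like delete_head()
            if msg is None:              # hypothetical end-of-stream marker
                source_q.put(None)
                break
            source_q.put(second_transform(msg))
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread

def make_packet_stub(payload):
    # Stand-in for packet_utils.make_packet: prepend a 2-byte length header.
    return len(payload).to_bytes(2, "big") + payload

sink_q = queue.Queue()     # plays the role of msg_sink_1's queue
source_q = queue.Queue()   # plays the role of msg_source_2's queue
watcher = start_watcher(sink_q, source_q, make_packet_stub)

for payload in (b"abc", b"defgh", b""):
    sink_q.put(payload)
sink_q.put(None)
watcher.join()

# Drain the second queue and collect what the watcher produced.
results = []
while True:
    item = source_q.get()
    if item is None:
        break
    results.append(item)
```

Because the watcher handles exactly one message per loop iteration, any merging or splitting of messages would have to happen upstream of the sink queue, not in this thread.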

Is it guaranteed that x messages put into msg_source_1 produce x
messages into msg_source_2? (In other words, is there any danger of
messages being combined?)

Also, if transform_blk_1 has some saved state that I’d like to reset
in between packets, is it possible/safe to do so from the Queue
watcher thread? I’m worried about race conditions…

-Steven

On Tue, Mar 4, 2008 at 2:27 PM, Steven C. wrote:

> transform_blk consumes ‘m’ bytes and produces ‘n’ different bytes.
> [...]
> into msg_source_2? (In other words, is there any danger of messages
> being combined?)
>
> Also, if transform_blk_1 has some saved state that I’d like to reset
> in between packets, is it possible/safe to do so from the Queue
> watcher thread? I’m worried about race conditions…
>
> -Steven

I wrote some code, and am seeing some weirdness related to the message
queue / message sink.
Snippets:

from gnuradio import gr, trellis

# fsm_pn, msg_queue_limit, gen_packets, num_uncoded_bits_per_packet and
# num_packets are not shown in these snippets.

class my_top_block(gr.top_block):
    def __init__(self):
        gr.top_block.__init__(self)

        f = trellis.fsm(fsm_pn)

        # Messages inserted into input_queue are streamed into the flowgraph.
        pkt_input = gr.message_source(gr.sizeof_char, msg_queue_limit)
        self.input_queue = pkt_input.msgq()
        enc = trellis.encoder_bb(f, 0)  # initial state = 0
        joiner = gr.unpacked_to_packed_bb(2, gr.GR_MSB_FIRST)
        #splitter = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)

        # The flowgraph output is collected as messages in output_queue.
        self.output_queue = gr.msg_queue(msg_queue_limit)
        pkt_output = gr.message_sink(gr.sizeof_char, self.output_queue, True)

        self.connect(pkt_input, enc, joiner, pkt_output)
        #self.connect(pkt_input, enc, joiner, splitter, pkt_output)

def main():
    f = open('…/pn15.dat')  # (2^15-1 bytes of 0x00 or 0x01)
    d = f.read()
    f.close()

    tb = my_top_block()
    iq = tb.input_queue
    oq = tb.output_queue

    tb.start()

    # Insert one packet, then block until one message comes back out.
    for p in gen_packets(d, num_uncoded_bits_per_packet, num_packets):
        iq.insert_tail(gr.message_from_string(p))
        msg = oq.delete_head()
        data = msg.to_string()
        print len(data)
        #print_pkt(data)

    # A type-1 message tells the message source to finish.
    iq.insert_tail(gr.message(1))

    tb.wait()

    print 'Exiting.'

As a result of the “joiner” block, I expect to see a 4:1 rate
reduction in bytes. So when I put 10 messages of length 1024 into the
input queue, I expect the output queue to get 10 messages of length
256. Instead I see output message lengths:
255
1
511
1
511
1
511
1
511
1

Any idea what is going on? Full code is attached.
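
To make the expected 4:1 arithmetic concrete, here is a pure-Python sketch of packing 2-bit symbols into bytes, MSB first. pack_msb_first is a hypothetical helper written for illustration; it is not the implementation of gr.unpacked_to_packed_bb, and it assumes each input byte carries one 2-bit symbol. It confirms that 1024 symbols pack into 256 bytes, and it totals the observed message lengths listed above for comparison, without drawing any conclusion about the cause.

```python
def pack_msb_first(symbols, bits_per_chunk):
    """Pack symbols of bits_per_chunk bits each into bytes, MSB first.

    Any trailing symbols that do not fill a whole byte are dropped.
    """
    per_byte = 8 // bits_per_chunk
    mask = (1 << bits_per_chunk) - 1
    out = bytearray()
    for i in range(0, len(symbols) - per_byte + 1, per_byte):
        byte = 0
        for s in symbols[i:i + per_byte]:
            byte = (byte << bits_per_chunk) | (s & mask)
        out.append(byte)
    return bytes(out)

# One 1024-symbol message of made-up 2-bit symbols (0, 1, 2, 3, 0, ...).
symbols = [i % 4 for i in range(1024)]
packed = pack_msb_first(symbols, 2)

expected_total = 10 * len(packed)                        # ten input messages
observed = [255, 1, 511, 1, 511, 1, 511, 1, 511, 1]      # lengths reported above
observed_total = sum(observed)
```

With these numbers, len(packed) is 256, expected_total is 2560, and observed_total is 2304, so the ten observed messages account for 256 fewer bytes than ten full packets would.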