Messages get lost randomly

Hi Guys,

I am using the messaging support of basic_block: I register the
message queue with message_port_register_in and remove the received
messages with delete_head_blocking. This block produces a stream,
and everything seems to work fine as long as I consume the messages
fast enough. However, if I put a throttle block after my
stream-producing block, then messages get lost.

The producer generates 10000 messages with message_port_pub, but the
consumer does not receive all of those messages. I intentionally wait
500 ms on the producer for the consumer to be started (otherwise the
consumer might not get registered in time). So I have verified that
the problem is not in my code, but somewhere the messages get lost.

As far as I can see in the basic_block code, these message queues are
not regular msg_queue objects but std::deque objects protected by
locks. I do not see how messages could get lost. Any ideas?

Miklos

Hi Guys,

Ok, I have found out how to make it work reliably. You must register a
listener with set_msg_handler, and then you will get the missing
messages there.

In light of this, I do not see how pdu_to_tagged_stream could work
reliably. It does not register a listener (but it does not block
either), so if there is a gap in the message stream, then it will miss
that message.

Miklos

Miklos,

that is right; a thorough discussion of message passing is high on our
agenda at the moment. pdu_to_tagged_stream is a bit of a special case,
since the only thing to do with a rx’d message is to put it onto the
output buffer, which is only available in work(), i.e., if we had a msg
handler, all it could do is re-buffer the msg before we can copy it into
the output buffer. One thing we did very recently was increase the size
of the msg q, so msg loss would be less likely.
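To illustrate why a bigger queue only makes loss less likely, here is a toy model in plain Python. It is not GNU Radio code: the function name, the burst and drain numbers, and the drop-the-oldest policy are all assumptions chosen for illustration.

```python
from collections import deque


def deliver(n_messages, capacity, burst, drained_per_burst):
    """Toy model (hypothetical, not the GNU Radio scheduler).

    The producer publishes in bursts; between bursts a slow consumer
    removes only a few messages. When the queue grows past `capacity`,
    the oldest entry is discarded (assumed policy).
    Returns (number received, number dropped).
    """
    queue = deque()
    received = []
    dropped = 0
    for start in range(0, n_messages, burst):
        for msg in range(start, min(start + burst, n_messages)):
            queue.append(msg)
            if len(queue) > capacity:
                queue.popleft()  # overflow: oldest message is lost
                dropped += 1
        # Throttled consumer: takes at most a few messages per burst.
        for _ in range(min(drained_per_burst, len(queue))):
            received.append(queue.popleft())
    received.extend(queue)  # consumer eventually drains what is left
    return len(received), dropped


if __name__ == "__main__":
    print(deliver(10000, capacity=100, burst=50, drained_per_burst=10))
    print(deliver(10000, capacity=10000, burst=50, drained_per_burst=10))
```

With a small capacity the slow consumer loses messages; with a capacity as large as the whole burst sequence nothing is dropped, but that only moves the limit.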

MB

Hi Martin,

My block does exactly the same: it copies messages to a stream. In the
handler I just queue the message again (no copying, so no memory
overhead), and in work I first check my private queue; if that is
empty, I wait for a new message with delete_head_blocking. It works
perfectly (provided you have enough RAM to handle the bursts of
messages).
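A rough sketch of that pattern in plain Python, outside of GNU Radio. The class and method names are hypothetical stand-ins for the block, its message handler and work; each message is treated as a single output item, and the blocking wait on an empty queue is left out to keep it short.

```python
from collections import deque


class MessageToStream:
    """Toy stand-in (hypothetical) for a block that turns messages into a stream."""

    def __init__(self):
        # Private queue owned by the block; unbounded, so it needs
        # enough memory to absorb bursts of messages.
        self.pending = deque()

    def handle_msg(self, msg):
        # Plays the role of the registered message handler: it only
        # stores a reference to the message, no copy is made.
        self.pending.append(msg)

    def work(self, output, max_items):
        # Plays the role of work(): move queued messages to the output,
        # limited by the space available in this call.
        produced = 0
        while self.pending and produced < max_items:
            output.append(self.pending.popleft())
            produced += 1
        return produced


if __name__ == "__main__":
    block = MessageToStream()
    for i in range(10000):
        block.handle_msg(i)  # burst of messages, consumer not running yet
    out = []
    while block.work(out, max_items=64):
        pass  # throttled consumer: 64 items per call
    print(len(out))
```

Because the handler runs for every message and the private queue never discards anything, a slow consumer only delays the messages instead of losing them.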

Miklos
