Virtual memory problem presumably caused by threaded process

Hi all,

the receiver of a point-to-point communication system is causing some
memory trouble.

rx_graph() architecture:
usrp.board - some signal processing blocks - packet sink

Arriving packets are fed to a message_queue. A thread watches this
queue and passes arriving packets on to main() via a callback.

Pseudo-code of the receiver:

class rx_graph(gr.flow_graph):
    def __init__(self, callback):
        gr.flow_graph.__init__(self)
        self.dst_queue = gr.msg_queue(1)
        self._watcher = _queue_watcher_thread(self.dst_queue, callback)

    def configure(self, _k, _RXfreq):
        'all signal processing blocks are connected here...'

class _queue_watcher_thread(_threading.Thread):
    def __init__(self, rcvd_pktq, callback):
        _threading.Thread.__init__(self)
        self.setDaemon(1)
        self.rcvd_pktq = rcvd_pktq
        self.callback = callback
        self.keep_running = True
        self.start()

    def run(self):
        while self.keep_running:
            msg = self.rcvd_pktq.delete_head()  # blocks until a message arrives
            payload = msg.to_string()

            # bits_correct and bits_wrong are not shown in this pseudo-code
            if self.callback:
                self.callback(payload, bits_correct, bits_wrong)

def main():
    def callback(payload, bits_correct, bits_wrong):
        "payload is processed here"

    while True:
        # a new object of rx_graph is created after every 100 transmitted
        # packets in order to perform rate adaptation
        rx = rx_graph(callback)  # creates an object of rx_graph
        # modulation order and carrier frequency have been separately
        # transmitted over a TCP socket before starting to receive
        # payload packets
        rx.configure(modulation_order, RXfreq)
        rx.start()
        # ...now receiving packets...
        rx.stop()
        rx = 0  # <------------ works somehow, but this does not free the
                # memory and stop the thread!?

The application works as I wish, but some memory is not freed properly.
For some unknown reason virtual memory usage is continuously increasing, and
the Python application crashes after exceeding 1.6 GB with the following
error message:

  File "./GA_receiver_ver7.py", line 237, in <module>
    main()
  File "./GA_receiver_ver7.py", line 192, in main
    rx.start()
  File "/usr/local/lib/python2.5/site-packages/gnuradio/gr/flow_graph.py", line 93, in start
    self.scheduler.start ()
  File "/usr/local/lib/python2.5/site-packages/gnuradio/gr/scheduler.py", line 57, in start
    thread.start()
  File "/usr/lib/python2.5/threading.py", line 434, in start
    _start_new_thread(self.__bootstrap, ())
thread.error: can't start new thread
Exception exceptions.AssertionError: AssertionError('cannot join thread before it is started',) in <bound method rx_graph.__del__ of <__main__.rx_graph object at 0x82903ec>> ignored

I wonder whether the _queue_watcher_thread is properly shut down at the end
of each pass through the while loop in main() when I set rx = 0. Do I need to
consider some memory deallocation issues?

I greatly appreciate any kind of help.

The full receiver script can be seen here:
http://www.mobnets.rwth-aachen.de/~ade/cso/GA_receiver_ver7.py

Thanks, Chris


Christian Sokolowski Office: +49(0)-240-7575-7043
RWTH Aachen FAX: +49(0)-240-7575-7050
Wireless Networks Dept
Kackertstrasse 9 E-mail: [email protected]
D-52072 Aachen Web: http://www.mobnets.rwth-aachen.de


On Fri, Aug 10, 2007 at 07:41:04PM +0200, Christian Sokolowski wrote:

Hi Chris,

I think that you are right that something is leaking memory, and that
it could be the _queue_watcher_thread. You should be able to confirm
that the thread is not being deleted by watching with ps: the number of
threads associated with the python process will keep growing.

To get info about threads on GNU/Linux:
ps -eLf
ps axms
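
The same check can also be made from inside the script with the standard threading module. The following is only a minimal sketch, not taken from the receiver script: Watcher and Owner are hypothetical stand-ins for _queue_watcher_thread and rx_graph, and the blocking queue read is faked with an Event. Under those assumptions it suggests that rebinding the owner (as with rx = 0) leaves the thread running, so the count grows by one per loop pass.

```python
import threading

release = threading.Event()  # lets the demo threads exit at the end


class Watcher(threading.Thread):
    """Hypothetical stand-in for _queue_watcher_thread."""

    def __init__(self):
        threading.Thread.__init__(self)
        self.daemon = True
        self.start()

    def run(self):
        # Stands in for a blocking delete_head() that never returns
        release.wait()


class Owner(object):
    """Hypothetical stand-in for rx_graph: it only owns a watcher."""

    def __init__(self):
        self._watcher = Watcher()


def count_watchers():
    # Count the live Watcher threads known to the threading module
    return sum(1 for t in threading.enumerate() if isinstance(t, Watcher))


counts = []
for _ in range(5):
    rx = Owner()
    rx = 0  # drops the reference; the thread itself keeps running
    counts.append(count_watchers())

print(counts)  # -> [1, 2, 3, 4, 5]
release.set()  # unblock the demo threads so they can finish
```

Logging threading.active_count() once per loop pass in the real receiver would show the same trend without needing ps.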

Other things to check:

Is anybody setting the _queue_watcher_thread's self.keep_running to
False?

The loop could be hung in rcvd_pktq.delete_head(). Somebody may
need to insert_tail a special message that says “exit loop”.
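
The “exit loop” message idea can be sketched with the standard library alone. This is an illustration under assumptions, not GNU Radio code: queue.Queue stands in for gr.msg_queue (get/put play the role of delete_head/insert_tail), EXIT is a hypothetical sentinel, and QueueWatcher is a hypothetical name. How to mark a real gr message as the exit message is not covered here.

```python
import queue
import threading

EXIT = object()  # hypothetical sentinel meaning "exit loop"


class QueueWatcher(threading.Thread):
    """Sketch of a watcher that can be woken up and stopped."""

    def __init__(self, pktq, callback):
        threading.Thread.__init__(self)
        self.daemon = True
        self.pktq = pktq
        self.callback = callback
        self.start()

    def run(self):
        while True:
            msg = self.pktq.get()  # blocks, like delete_head()
            if msg is EXIT:        # the sentinel wakes the blocked call
                break
            self.callback(msg)

    def stop(self):
        self.pktq.put(EXIT)  # analogous to insert_tail(exit message)
        self.join()          # wait until run() has returned


received = []
q = queue.Queue()
watcher = QueueWatcher(q, received.append)
q.put("packet 1")
q.put("packet 2")
watcher.stop()
print(received, watcher.is_alive())  # -> ['packet 1', 'packet 2'] False
```

Because the sentinel travels through the same queue, the watcher wakes up even when no more packets arrive, so stop() does not depend on traffic still flowing.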

def run(self):
    while self.keep_running:
        msg = self.rcvd_pktq.delete_head()
        ok, payload = packet_utils.unmake_packet(msg.to_string(),
                                                 int(msg.arg1()))
        if self.callback:
            self.callback(ok, payload)

Thanks for looking at this,
Eric

On Aug 13, 2007, at 3:13 AM, Eric B. wrote:

Other things to check:
    msg = self.rcvd_pktq.delete_head()
    ok, payload = packet_utils.unmake_packet(msg.to_string(),
                                             int(msg.arg1()))
    if self.callback:
        self.callback(ok, payload)

I investigated this earlier this year; I don’t know if it caused a
memory leak or not, but it did cause other issues. The issue of
multiple threads makes a difference when running a Python script from
another Python script: if the extra thread isn’t stopped properly,
then control won’t be returned to the calling script. The solution
is for the owning thread to “join” the other thread. Here is example
code for this particular application. You’ll need to find a way to
call “stop()” (or whatever you choose to name this method)
appropriately, since it won’t be called by default when doing fg.stop().
The “best” solution would be to create a “stop” method in
“rx_graph” (or a class inheriting from it) which first calls
“_queue_watcher_thread”.stop(), then “inherited_fg”.stop(). Note
that the “_queue_watcher_thread” stop() must be called first, while data
is still being received, so that the thread doesn’t hang waiting for a
new received packet.

class _queue_watcher_thread(_threading.Thread):
    def __init__(self, rcvd_pktq, callback):
        _threading.Thread.__init__(self)
        self.setDaemon(1)
        self.rcvd_pktq = rcvd_pktq
        self.callback = callback
        self.keep_running = True
        self.start()

    def run(self):
        while self.keep_running:
            msg = self.rcvd_pktq.delete_head()
            payload = msg.to_string()
            if self.callback:
                self.callback(payload, bits_correct, bits_wrong)

    def stop(self):
        # tell the thread to not keep running
        self.keep_running = False
        # join the thread to wait for it to finish
        self.join()

Hope this helps and/or makes sense. - MLD
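
The stop ordering described in this reply (watcher first, then the graph) can be tried out without GNU Radio. What follows is a rough sketch under stated assumptions: FakeGraph is a hypothetical stand-in for gr.flow_graph that simply produces packets from a thread, queue.Queue replaces gr.msg_queue, and Receiver plays the role of rx_graph. It is not the actual flow-graph API.

```python
import queue
import threading
import time


class Watcher(threading.Thread):
    def __init__(self, pktq, callback):
        threading.Thread.__init__(self)
        self.daemon = True
        self.pktq = pktq
        self.callback = callback
        self.keep_running = True
        self.start()

    def run(self):
        while self.keep_running:
            self.callback(self.pktq.get())  # blocks until a packet arrives

    def stop(self):
        self.keep_running = False
        self.join()  # returns only after one more packet wakes run()


class FakeGraph(object):
    """Hypothetical stand-in for gr.flow_graph: a thread producing packets."""

    def __init__(self, pktq):
        self.pktq = pktq
        self._halt = threading.Event()
        self._producer = threading.Thread(target=self._produce)

    def _produce(self):
        n = 0
        while not self._halt.is_set():
            self.pktq.put(n)
            n += 1
            time.sleep(0.01)

    def start(self):
        self._producer.start()

    def stop(self):
        self._halt.set()
        self._producer.join()


class Receiver(FakeGraph):
    def __init__(self, callback):
        FakeGraph.__init__(self, queue.Queue())
        self._watcher = Watcher(self.pktq, callback)

    def stop(self):
        self._watcher.stop()  # first, while packets still arrive
        FakeGraph.stop(self)  # then the graph itself


before = threading.active_count()
for _ in range(3):
    got = []
    rx = Receiver(got.append)
    rx.start()
    time.sleep(0.05)
    rx.stop()
print(threading.active_count() == before)  # -> True
```

If the two calls in Receiver.stop() were swapped, the watcher could block forever in get() once the producer has stopped, which is the hang the note above warns about.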