Allows debugging of child subprocesses, author of script is Bertrand Janin. (Yes, I know that some IDE has own way: PyCharm for example, but this is IDE-independ way).
Next script must be put into Libs of Python install dir, then in app type somewhere:
import rpdb
rpdb.set_trace()
And run telnet to 127.0.0.1:4444 and get (pdb) console in telnet window.
"""Remote Python Debugger (pdb wrapper)."""
__author__ = "Bertrand Janin <b@janin.com>"
__version__ = "0.1.6"
import pdb
import socket
import threading
import signal
import sys
import traceback
from functools import partial
DEFAULT_ADDR = "127.0.0.1"
DEFAULT_PORT = 4444
class FileObjectWrapper(object):
def __init__(self, fileobject, stdio):
self._obj = fileobject
self._io = stdio
def __getattr__(self, attr):
if hasattr(self._obj, attr):
attr = getattr(self._obj, attr)
elif hasattr(self._io, attr):
attr = getattr(self._io, attr)
else:
raise AttributeError("Attribute %s is not found" % attr)
return attr
class Rpdb(pdb.Pdb):
def __init__(self, addr=DEFAULT_ADDR, port=DEFAULT_PORT):
"""Initialize the socket and initialize pdb."""
# Backup stdin and stdout before replacing them by the socket handle
self.old_stdout = sys.stdout
self.old_stdin = sys.stdin
self.port = port
# Open a 'reusable' socket to let the webapp reload on the same port
self.skt = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.skt.bind((addr, port))
self.skt.listen(1)
# Writes to stdout are forbidden in mod_wsgi environments
try:
sys.stderr.write("pdb is running on %s:%d\n"
% self.skt.getsockname())
except IOError:
pass
(clientsocket, address) = self.skt.accept()
handle = clientsocket.makefile('rw')
pdb.Pdb.__init__(self, completekey='tab',
stdin=FileObjectWrapper(handle, self.old_stdin),
stdout=FileObjectWrapper(handle, self.old_stdin))
sys.stdout = sys.stdin = handle
OCCUPIED.claim(port, sys.stdout)
def shutdown(self):
"""Revert stdin and stdout, close the socket."""
sys.stdout = self.old_stdout
sys.stdin = self.old_stdin
OCCUPIED.unclaim(self.port)
self.skt.close()
def do_continue(self, arg):
"""Clean-up and do underlying continue."""
try:
return pdb.Pdb.do_continue(self, arg)
finally:
self.shutdown()
do_c = do_cont = do_continue
def do_quit(self, arg):
"""Clean-up and do underlying quit."""
try:
return pdb.Pdb.do_quit(self, arg)
finally:
self.shutdown()
do_q = do_exit = do_quit
def do_EOF(self, arg):
"""Clean-up and do underlying EOF."""
try:
return pdb.Pdb.do_EOF(self, arg)
finally:
self.shutdown()
def set_trace(addr=DEFAULT_ADDR, port=DEFAULT_PORT, frame=None):
"""Wrapper function to keep the same import x; x.set_trace() interface.
We catch all the possible exceptions from pdb and cleanup.
"""
try:
debugger = Rpdb(addr=addr, port=port)
except socket.error:
if OCCUPIED.is_claimed(port, sys.stdout):
# rpdb is already on this port - good enough, let it go on:
sys.stdout.write("(Recurrent rpdb invocation ignored)\n")
return
else:
# Port occupied by something else.
raise
try:
debugger.set_trace(frame or sys._getframe().f_back)
except Exception:
traceback.print_exc()
def _trap_handler(addr, port, signum, frame):
set_trace(addr, port, frame=frame)
def handle_trap(addr=DEFAULT_ADDR, port=DEFAULT_PORT):
"""Register rpdb as the SIGTRAP signal handler"""
signal.signal(signal.SIGTRAP, partial(_trap_handler, addr, port))
def post_mortem(addr=DEFAULT_ADDR, port=DEFAULT_PORT):
debugger = Rpdb(addr=addr, port=port)
type, value, tb = sys.exc_info()
traceback.print_exc()
debugger.reset()
debugger.interaction(None, tb)
class OccupiedPorts(object):
"""Maintain rpdb port versus stdin/out file handles.
Provides the means to determine whether or not a collision binding to a
particular port is with an already operating rpdb session.
Determination is according to whether a file handle is equal to what is
registered against the specified port.
"""
def __init__(self):
self.lock = threading.RLock()
self.claims = {}
def claim(self, port, handle):
self.lock.acquire(True)
self.claims[port] = id(handle)
self.lock.release()
def is_claimed(self, port, handle):
self.lock.acquire(True)
got = (self.claims.get(port) == id(handle))
self.lock.release()
return got
def unclaim(self, port):
self.lock.acquire(True)
del self.claims[port]
self.lock.release()
# {port: sys.stdout} pairs to track recursive rpdb invocation on same port.
# This scheme doesn't interfere with recursive invocations on separate ports -
# useful, eg, for concurrently debugging separate threads.
OCCUPIED = OccupiedPorts()