This is one of the first articles I have written in preparation for the launch of my new site redesign, and I thought I would cover something that has little documentation on the Internet. If you want the quick answer, feel free to jump to the Examples section or take a peek at the How It Works section. Otherwise, feel free to read my take (or rant) on this semi-undocumented API.
The signal module in Python provides a way to be notified of a POSIX signal over a file descriptor, using a pipe, via signal.set_wakeup_fd(fd). Unfortunately the documentation is a little lacking, the unit tests don't reveal much, and the docstring leaves a lot to be desired. Couple this with the lack of articles on it and it can be frustrating to decipher how to use what seems like a handy thing to have.
While set_wakeup_fd is documented in the signal module reference, it is hard to find on Google (10 pages deep for me) and easy to overlook in the search results, which tend to be cluttered with unit tests that show how to set the feature up but not how to use it. I still have no idea why the docstring does not contain more information, or why only a 0x00 byte is sent across the pipe instead of the signal number.
Python implements set_wakeup_fd by writing a single byte (0x00) to the write end of a pipe whenever a signal is received. The file descriptor it writes to is the one you pass to signal.set_wakeup_fd; however, that file descriptor must first be set to non-blocking mode (os.O_NONBLOCK). Once this is done you can poll the read end of the pipe, and the arrival of a byte indicates that a signal has been received. While not ideal from a performance point of view, it is a nice portable workaround and easy to backport to earlier versions of Python (before 2.6) should you require it.
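As a rough illustration of the mechanism described above, here is a minimal sketch that sends the current process a signal and checks that a byte shows up on the read end of the pipe. It assumes Python 3.5 or later on a POSIX system and must run in the main thread; the helper name wait_for_wakeup is hypothetical and not part of the standard library. Note that on recent Python 3 releases the byte carries the signal number rather than 0x00, so the sketch only checks that a byte arrived and does not rely on its value.

```python
import os
import select
import signal


def wait_for_wakeup(signum=signal.SIGUSR1, timeout=5.0):
    """Raise signum in this process and return what arrives on the wakeup pipe.

    Hypothetical helper for illustration only. Returns b"" if nothing
    arrives within the timeout.
    """
    pipe_r, pipe_w = os.pipe()
    # The write end must be non-blocking before it is handed to set_wakeup_fd.
    os.set_blocking(pipe_w, False)
    # A do-nothing Python handler, so the default action does not kill us.
    old_handler = signal.signal(signum, lambda s, f: None)
    old_fd = signal.set_wakeup_fd(pipe_w)
    try:
        os.kill(os.getpid(), signum)
        # Wait for the read end to become readable, then clear the event.
        ready, _, _ = select.select([pipe_r], [], [], timeout)
        return os.read(pipe_r, 1) if ready else b""
    finally:
        # Restore the previous state and release the pipe.
        signal.set_wakeup_fd(old_fd)
        signal.signal(signum, old_handler)
        os.close(pipe_r)
        os.close(pipe_w)
```

Calling wait_for_wakeup() should return a single byte. select.select is used here only because it is available on every POSIX platform; any of the other polling methods would do.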
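To use set_wakeup_fd you will need to do the following: create a pipe, set its write end to non-blocking mode, pass the write end to signal.set_wakeup_fd, install a handler (a dummy will do) for the signal you are interested in, and then poll the read end of the pipe. Note that you must clear the event by reading from the pipe.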
You can also simply block on the file descriptor with os.read rather than using one of the polling methods; however, the signal module already provides a pause method, which may be more appropriate in that case.
One alternative, if you are dealing with multiple signals and need a way to tell them apart, is to install your own signal handler rather than the dummy one and have it write a byte containing the signal number, instead of a null byte, to the write end of the pipe. I have included a quick example of this below, in the second example.
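To make the os.read variant concrete, here is a small sketch that arms a one-shot timer and then blocks in os.read until the wakeup byte arrives, with no poller involved. It assumes Python 3.5 or later on a POSIX system, where an interrupted os.read is retried automatically once the Python-level handler has run, and it must run in the main thread. The helper name block_until_signal is hypothetical.

```python
import os
import signal


def block_until_signal(delay=0.1):
    """Arm a one-shot SIGALRM and block in os.read until the byte arrives.

    Hypothetical helper for illustration only.
    """
    pipe_r, pipe_w = os.pipe()
    # Only the write end needs to be non-blocking; the read end stays
    # blocking so that os.read waits for the signal.
    os.set_blocking(pipe_w, False)
    old_handler = signal.signal(signal.SIGALRM, lambda s, f: None)
    old_fd = signal.set_wakeup_fd(pipe_w)
    try:
        # Fire SIGALRM once after `delay` seconds.
        signal.setitimer(signal.ITIMER_REAL, delay)
        # Blocks here; reading the byte also clears the event.
        return os.read(pipe_r, 1)
    finally:
        # Disarm the timer, restore the previous state, release the pipe.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.set_wakeup_fd(old_fd)
        signal.signal(signal.SIGALRM, old_handler)
        os.close(pipe_r)
        os.close(pipe_w)
```

If all you need is to sleep until any signal handler has run, signal.pause() does that without a pipe at all, which is why it may be the simpler choice when nothing else is being waited on.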
I am partial to the second solution of writing your own handler rather than relying on Python to do so as it gives you the extra flexibility to add some extra code that is time critical in the handler itself.
Even with its limitations there are a couple of uses where set_wakeup_fd is a perfect fit:
I hope this saves at least one other person from wasting a couple of days investigating how set_wakeup_fd works, and lets them get on with cutting code or working out a better way to do this than the standard library offers. If you have any updated code, feel free to submit it and I will update the article; if you know of alternatives, let me know so I can pass them on to others.
All code examples shown below are placed in the public domain. Chuck them in a library, use them in your program, add them as a kill switch to SKYNET.
#!/usr/bin/env python
import select
import signal
import fcntl
import os

pipe_r, pipe_w = os.pipe()

# The write end of the pipe must be in non-blocking mode
flags = fcntl.fcntl(pipe_w, fcntl.F_GETFL, 0)
flags = flags | os.O_NONBLOCK
fcntl.fcntl(pipe_w, fcntl.F_SETFL, flags)

signal.set_wakeup_fd(pipe_w)

# Mask out signal handler
signal.signal(signal.SIGALRM, lambda x, y: None)

# Set up a signal to repeat every 2 seconds
signal.setitimer(signal.ITIMER_REAL, 2, 2)

poller = select.epoll()
poller.register(pipe_r, select.EPOLLIN)

# Begin Bad joke
print("Main screen turn on")
while True:
    try:
        events = poller.poll()
        for fd, flags in events:
            print("We get Signal")
            # read a single event
            os.read(pipe_r, 1)
    except IOError:
        # poll() was interrupted by the signal; just poll again
        pass
#!/usr/bin/env python
"""A set_wakeup_fd clone with slightly more functionality than offered
by the standard library"""
import select
import signal
import struct
import fcntl
import os


def handle_signal(sig, fd, func=lambda x, y: False):
    """Automatically send the signal number down a pipe when a signal occurs

    This function will automatically change the write end to non blocking
    mode and set up the signal handler for you

    If your signal handler (optionally passed in as func) returns True then
    the write to the pipe will be suppressed
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
    flags = flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)

    def signalfd_handler(signum, frame):
        """Defined inside handle_signal"""
        # The argument is named signum so it does not shadow the signal module
        val = func(signum, frame)
        if not val:
            # Pack the signal number into a single byte
            os.write(fd, struct.pack("B", signum))

    signal.signal(sig, signalfd_handler)


pipe_r, pipe_w = os.pipe()
handle_signal(signal.SIGALRM, pipe_w)

# Set up a signal to repeat every 2 seconds
signal.setitimer(signal.ITIMER_REAL, 2, 2)

poller = select.epoll()
poller.register(pipe_r, select.EPOLLIN)

# Begin Bad joke
print("Main screen turn on")
while True:
    try:
        events = poller.poll()
        for fd, flags in events:
            print("We get Signal: %d" % ord(os.read(pipe_r, 1)))
    except IOError:
        # poll() was interrupted by the signal; just poll again
        pass