

So here's something cool I made this afternoon at work.

Currently I am building an ETL pipeline that ingests some god-awful proprietary data format, decodes it into something useful, performs a number of validation and cleansing steps, and then loads it into a speedy columnar database ready for some interesting analysis.
Once in production this pipeline will be handling hundreds of individual files (each will be loaded as its own separate table), ranging in size from 10MB through to ~0.5TB. The database, while having a pretty damn fast write speed, doesn't seem to handle lots of concurrent write processes too well: if it is hit too hard it will roll back one or all jobs, complaining of concurrency conflicts. For robustness there is a real need to serialize the database load component of the pipeline.

However, we obviously don't want to constrain the entire ETL pipeline to a single process (the database load is only one of many processes that will utilise the transformed data). What we really need is a way to decouple the extract-transform from the load… enter our Watchdog-powered job queue.

The basic idea of the jobs queue is that we set up a background process to monitor a directory for changes. When a data set is ready to be loaded into the database, a trigger file will be created in that directory. This triggers the watchdog to place a load job in a waiting queue.
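The watchdog side of that can be a small event handler that pushes a job onto the queue whenever a trigger file appears. Here's a minimal sketch of the idea; the class name `FileWatchdog`, the `*.trigger` pattern, and using the file path as the job payload are my assumptions, not necessarily what the repo does:

```python
from queue import Queue

from watchdog.events import PatternMatchingEventHandler


class FileWatchdog(PatternMatchingEventHandler):
    """Enqueue a load job whenever a trigger file appears in the watched directory."""

    def __init__(self, queue: Queue):
        # React only to trigger files, ignoring everything else in the directory.
        super().__init__(patterns=["*.trigger"])
        self.queue = queue

    def on_created(self, event):
        # event.src_path is the path of the new trigger file; the worker
        # thread will pick this job up off the queue and run the load.
        self.queue.put(event.src_path)
```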


There are three key libraries used in our watchdog jobs queue: watchdog, threading and queue. I haven't used the Python watchdog library before but it's pretty slick. It's a Python API layered over one of a number of OS-level file-watching tools, depending on the OS you're using. All these tools detect changes in a directory structure and then perform a set action when a change is detected (for instance, if a file is modified do X, if it is deleted do Y). The threading and queue libraries provide a high-level interface for managing threads. Threading is one approach to achieving parallelism and concurrency in Python (the other major one being multiprocessing); given our jobs queue will be lightweight and memory non-intensive, a thread is the perfect option. Queues operate as the conduits through which threads (or processes) communicate.
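As a sketch of the worker side, here is roughly what a queue-consuming thread function could look like; the `print` call is a stand-in for the real database load step, which I haven't reproduced here:

```python
import time
from queue import Queue


def process_load_queue(q: Queue):
    """This is the worker thread function: it processes load jobs from the
    queue one at a time, so the database only ever sees serial writes."""
    while True:
        if not q.empty():
            trigger_path = q.get()
            # Stand-in for the real database load keyed off the trigger file.
            print(f"Loading data set for trigger: {trigger_path}")
            q.task_done()
        else:
            # Nothing waiting; idle briefly before polling the queue again.
            time.sleep(1)
```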
Let's start with the main bit: we instantiate a Queue object, start a thread running the process_load_queue worker function in daemon (detached) mode, and instantiate a watchdog observer. Lastly we place a poll that waits for a Ctrl-C kill command and, if it receives one, shuts everything down gracefully. The code is in a git repo here with a simple working example.
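Putting the pieces together, a minimal sketch of that main block, reusing the `FileWatchdog` handler and `process_load_queue` worker sketched above (the `./triggers` path and the two-second poll interval are my assumptions):

```python
import time
from queue import Queue
from threading import Thread

from watchdog.observers import Observer

# FileWatchdog and process_load_queue as defined in the sketches above.

if __name__ == "__main__":
    # The queue is the conduit between the watchdog handler and the worker.
    watch_queue = Queue()

    # Daemon (detached) mode: the worker thread dies with the main process.
    worker = Thread(target=process_load_queue, args=(watch_queue,), daemon=True)
    worker.start()

    # Point the watchdog at the directory the pipeline writes trigger files to.
    observer = Observer()
    observer.schedule(FileWatchdog(watch_queue), path="./triggers")
    observer.start()

    # Poll until a Ctrl-C arrives, then shut the observer down gracefully.
    try:
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()
```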
