This example illustrates the convenience Ruffus brings to simple tasks in bioinformatics.
- Split a problem into multiple fragments that can be
- Run in parallel giving partial solutions that can be
- Recombined into the complete solution.
- Split each of the four sequences into a separate file.
- Run blastall in parallel on each sequence file
- Recombine the BLAST results by simple concatenation.
In real life,
- BLAST already provides support for multiprocessing
- Sequence files would be split in much larger chunks, with many sequences
- The jobs would be submitted to large computational farms (in our case, using the Sun Grid Engine).
- The High Scoring Pairs (HSPs) would be parsed / filtered / stored in your own formats.
To install Ruffus on most systems with Python installed:

```
easy_install -U ruffus
```
Otherwise, download Ruffus and run:

```
tar -xvzf ruffus-xxx.tar.gz
cd ruffus-xxx
python setup.py install
```

where xxx is the latest Ruffus version.
We also need to download the human RefSeq sequence file and format the NCBI database:

```
wget ftp://ftp.ncbi.nih.gov/refseq/H_sapiens/mRNA_Prot/human.protein.faa.gz
gunzip human.protein.faa.gz
formatdb -i human.protein.faa
```
We want each of the four sequences in the query file original.fa to be placed in a separate file named XXX.segment, where XXX = 1 -> the number of sequences.

```python
current_file_index = 0
for line in open("original.fa"):
    # start a new file for each accession line
    if line.startswith(">"):
        current_file_index += 1
        current_file = open("%d.segment" % current_file_index, "w")
    current_file.write(line)
```
To use this in a pipeline, we only need to wrap this code in a function "decorated" with the Ruffus keyword @split. This indicates that we are splitting up the input file original.fa into however many *.segment files as it takes. The pipelined function itself takes two arguments, for the input and output.
We shall see later that this simple @split decorator already gives us all the benefits of:
- Dependency checking
- Flowchart printing
Assuming that BLAST is already installed, sequence matches can be found with this Python code:

```python
import os

os.system("blastall -p blastp -d human.protein.faa -i 1.segment > 1.blastResult")
```
To pipeline this, we simply need to wrap the call in a function decorated with the Ruffus keyword @transform.
This indicates that we are taking all the output files from the previous splitFasta operation (*.segment) and @transform-ing each to a new file with the .blastResult suffix. Each of these transformation operations can run in parallel if specified.
The following Python code will concatenate the results together:

```python
from glob import glob

output_file = open("final.blast_results", "w")
for i in glob("*.blastResult"):
    output_file.write(open(i).read())
```
To pipeline this, we again only need to wrap the code in a function decorated with the Ruffus keyword @merge.
This indicates that we are taking all the output files from the previous runBlast operation (*.blastResult) and @merge-ing them into the new file final.blast_results.
We can run the completed pipeline using a maximum of 4 parallel processes by calling pipeline_run:

```python
pipeline_run([combineBlastResults], verbose = 2, multiprocess = 4)
```
Though we have only asked Ruffus to run combineBlastResults, it traces all the dependencies of this task and runs all the necessary parts of the pipeline.
The full code for this example can be found here, suitable for pasting into the python command shell.
The verbose parameter causes the following output to be printed to stderr as the pipeline runs:

```
>>> pipeline_run([combineBlastResults], verbose = 2, multiprocess = 4)
    Job = [original.fa -> *.segment] completed
Completed Task = splitFasta
    Job = [1.segment -> 1.blastResult] completed
    Job = [3.segment -> 3.blastResult] completed
    Job = [2.segment -> 2.blastResult] completed
    Job = [4.segment -> 4.blastResult] completed
Completed Task = runBlast
    Job = [[1.blastResult, 2.blastResult, 3.blastResult, 4.blastResult] -> final.blast_results] completed
Completed Task = combineBlastResults
```
If we invoked pipeline_run again, nothing further would happen because the pipeline is now up-to-date. But what if the pipeline had not run to completion?
We can simulate the failure of one of the blastall jobs by deleting its results:

```python
os.unlink("4.blastResult")
```
Let us use the pipeline_printout function to print out the dependencies of the pipeline at a high verbose level, which will show both complete and incomplete jobs:

```
>>> import sys
>>> pipeline_printout(sys.stdout, [combineBlastResults], verbose = 4)

________________________________________
Tasks which are up-to-date:

Task = splitFasta
    "Split sequence file into as many fragments as appropriate depending on the size of original_fasta"

________________________________________
Tasks which will be run:

Task = runBlast
    "Run blast"
       Job = [4.segment -> 4.blastResult]
         Job needs update: Missing file 4.blastResult

Task = combineBlastResults
    "Combine blast results"
       Job = [[1.blastResult, 2.blastResult, 3.blastResult, 4.blastResult] -> final.blast_results]
         Job needs update: Missing file 4.blastResult

________________________________________
```
Only the parts of the pipeline which involve the missing BLAST result will be rerun. We can confirm this by invoking the pipeline again:

```
>>> pipeline_run([combineBlastResults], verbose = 2, multiprocess = 4)
    Job = [1.segment -> 1.blastResult] unnecessary: already up to date
    Job = [2.segment -> 2.blastResult] unnecessary: already up to date
    Job = [3.segment -> 3.blastResult] unnecessary: already up to date
    Job = [4.segment -> 4.blastResult] completed
Completed Task = runBlast
    Job = [[1.blastResult, 2.blastResult, 3.blastResult, 4.blastResult] -> final.blast_results] completed
Completed Task = combineBlastResults
```