Chapter 2: Transforming data in a pipeline with @transform¶
Remember to look at the example code:
Computational pipelines transform your data in stages until the final result is produced. Ruffus automates the plumbing in your pipeline. You supply the python functions which perform the data transformation, and tell Ruffus how these pipeline stages or task functions are connected together.
The best way to design a pipeline is to:
- write down the file names of the data as it flows across your pipeline
- write down the names of functions which transforms the data at each stage of the pipeline.
Task functions as recipes¶
For example, one can have
compile()task which will compile any number of source code files, or
count_lines()task which will count the number of lines in any file or
align_dna()task which will align the DNA of many chromosomes.
@transform is a 1 to 1 operation¶
@transformis a 1:1 operation because for each input, it generates one output.
This is obvious when you count the number of jobs at each step. In our example pipeline, there are always three jobs moving through in step at each stage (task).
Each Input or Output is not limited, however, to a single filename. Each job can accept, for example, a pair of files as its Input, or generate more than one file or a dictionary or numbers as its Output.
When each job outputs a pair of files, this does not generate two jobs downstream. It just means that the successive task in the pipeline will receive a list or tuple of files as its input parameter.
The different sort of decorators in Ruffus determine the topology of your pipeline, i.e. how the jobs from different tasks are linked together seamlessly.
@transform always generates one Output for one Input.
In the later parts of the tutorial, we will encounter more decorators which can split up, or join together or group inputs.
In other words, using other decorators Input and Output can have many to one, many to many etc. relationships.
A pair of files as the Input¶
Let us rewrite our previous example so that the Input of the first task are matching pairs of DNA sequence files, processed in tandem.from ruffus import * starting_files = [("a.1.fastq", "a.2.fastq"), ("a.1.fastq", "a.2.fastq"), ("a.1.fastq", "a.2.fastq")] # # STAGE 1 fasta->sam # @transform(starting_files, # Input = starting files suffix(".1.fastq"), # suffix = .1.fastq ".sam") # Output suffix = .sam def map_dna_sequence(input_files, output_file): # remember there are two input files now ii1 = open(input_files) ii2 = open(input_files) oo = open(output_file, "w")
The only changes are to the first task:pipeline_run() Job = [[a.1.fastq, a.2.fastq] -> a.sam] completed Job = [[a.1.fastq, a.2.fastq] -> a.sam] completed Job = [[a.1.fastq, a.2.fastq] -> a.sam] completed Completed Task = map_dna_sequence
suffix always matches only the first file name in each Input.
Input and Output parameters¶
Ruffus chains together different tasks by taking the Output from one job and plugging it automatically as the Input of the next.
The first two parameters of each job are the Input and Output parameters respectively.
In the above example, we have:>>> pipeline_run() Job = [a.bam -> a.statistics, use_linear_model] completed Job = [b.bam -> b.statistics, use_linear_model] completed Job = [c.bam -> c.statistics, use_linear_model] completed Completed Task = summarise_bam_file
¶ Inputs Outputs Extra
Extra parameters are for the consumption of
summarise_bam_file()and will not passed to the next task.
Ruffus was designed for pipelines which save intermediate data in files. This is not compulsory but saving your data in files at each step provides many advantages:
- Ruffus can use file system time stamps to check if your pipeline is up to date
- Your data is persistent across runs
- This is a good way to pass large amounts of data across processes and computational nodes
Nevertheless, all the task parameters can include anything which suits your workflow, from lists of files, to numbers, sets or tuples. Ruffus imposes few constraints on what you would like to send to each stage of your pipeline.
Ruffus does, however, assume that if the Input and Output parameter contains strings, these will be interpreted as file names required by and produced by that job. As we shall see, the modification times of these file names indicate whether that part of the pipeline is up to date or needs to be rerun.