Appendix 5: @files: Deprecated syntax¶
This is deprecated syntax, which is no longer supported and should NOT be used in new code.
The Python functions which do the actual work of each stage or task of a Ruffus pipeline are written by you. The role of Ruffus is to make sure these functions are called in the right order, with the right parameters, running in parallel using multiprocessing if desired.
Running this code:

```python
from ruffus import *

@files('a.1', ['a.2', 'b.2'], 'A file')
def single_job_io_task(infile, outfiles, text):
    for o in outfiles:
        open(o, "w")

# prepare input file
open('a.1', "w")

pipeline_run()
```
is equivalent to calling:

```python
single_job_io_task('a.1', ['a.2', 'b.2'], 'A file')
```

and produces:

```
>>> pipeline_run()
    Job = [a.1 -> [a.2, b.2], A file] completed
Completed Task = single_job_io_task
```
Ruffus will automatically check if your task is up to date. The second time pipeline_run() is called, nothing will happen. But if you update a.1, the task will rerun:

```
>>> open('a.1', "w")
>>> pipeline_run()
    Job = [a.1 -> [a.2, b.2], A file] completed
Completed Task = single_job_io_task
```
See chapter 2 for a more in-depth discussion of how Ruffus decides which parts of the pipeline are complete and up-to-date.
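The idea behind this up-to-date check can be sketched with file modification times from the standard library. This is an illustration of the concept only, not Ruffus's actual implementation (which handles more cases); the `job_up_to_date` helper is hypothetical:

```python
import os
import tempfile

def job_up_to_date(inputs, outputs):
    # a job is out of date if any output file is missing, or if any
    # input is newer than the oldest output (sketch of the idea only)
    if any(not os.path.exists(o) for o in outputs):
        return False
    oldest_output = min(os.path.getmtime(o) for o in outputs)
    return all(os.path.getmtime(i) <= oldest_output for i in inputs)

# demonstrate with explicit modification times, so the example does not
# depend on filesystem clock resolution
tmp = tempfile.mkdtemp()
a1 = os.path.join(tmp, "a.1")
a2 = os.path.join(tmp, "a.2")
open(a1, "w").close()
open(a2, "w").close()
os.utime(a1, (1000, 1000))                       # input older than output
os.utime(a2, (2000, 2000))
up_to_date_before = job_up_to_date([a1], [a2])   # True: nothing to do

os.utime(a1, (3000, 3000))                       # input touched after output
up_to_date_after = job_up_to_date([a1], [a2])    # False: job must rerun
```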
Running the same code on different parameters in parallel¶
Your pipeline may require the same function to be called multiple times with independent parameters. In that case, you can supply all the parameters to @files; each set will be sent to a separate job, which may run in parallel if necessary. Ruffus will check whether each separate job is up to date using its inputs and outputs (first two) parameters (see Up-to-date jobs are not re-run unnecessarily).
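This dispatch model can be sketched without Ruffus at all: each parameter list becomes one call to the task function, and independent calls can be farmed out to a pool of workers. The sketch below uses a standard-library thread pool to stand in for the multiprocessing that Ruffus manages for you; the `task` function and its parameter lists are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor

# hypothetical per-job parameter lists, mirroring @files usage
parameters = [
    ("job1.file", 1),
    ("job2.file", 2),
    ("job3.file", 3),
]

def task(filename, n):
    # stand-in for the real work a pipeline task would do
    return "%s -> job %d" % (filename, n)

# each parameter list becomes an independent job; a thread pool stands in
# here for the multiprocessing pool that Ruffus would manage for you
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(lambda p: task(*p), parameters))
```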
For example, if a sequence (e.g. a list or tuple) of 5 parameters is passed to @files, there will also be 5 separate jobs:

```python
from ruffus import *

parameters = [
    [ 'job1.file'           ],   # 1st job
    [ 'job2.file', 4        ],   # 2nd job
    [ 'job3.file', [3, 2]   ],   # 3rd job
    [ 67, [13, 'job4.file'] ],   # 4th job
    [ 'job5.file'           ],   # 5th job
]

@files(parameters)
def task_file(*params):
    pass
```

Ruffus creates as many jobs as there are elements in parameters. In turn, each of these elements consists of a series of parameters which will be passed to each separate job.
Thus the above code is equivalent to calling:

```python
task_file('job1.file')
task_file('job2.file', 4)
task_file('job3.file', [3, 2])
task_file(67, [13, 'job4.file'])
task_file('job5.file')
```
What task_file() does with these parameters is up to you!
The only constraint on the parameters is that Ruffus will treat the first parameter of each job as the inputs and the second as the output. Any strings in the inputs or output parameters (including those nested in sequences) will be treated as file names.
Thus, to pick the parameters out of the fourth job above:

```python
task_file(67, [13, 'job4.file'])
```

The inputs parameter is 67 and the output parameter is [13, 'job4.file']. Since only strings count as file names, the solitary output filename is job4.file.
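The rule that strings anywhere inside the inputs or output parameters count as file names can be sketched with a small recursive helper. The `filenames_in` function below is hypothetical, not part of Ruffus's API:

```python
def filenames_in(param):
    # collect every string, however deeply nested; strings in the
    # inputs/output parameters are treated as file names
    if isinstance(param, str):
        return [param]
    if isinstance(param, (list, tuple)):
        names = []
        for item in param:
            names.extend(filenames_in(item))
        return names
    return []  # numbers and other non-strings carry no file names

# the fourth job from the example above
input_files = filenames_in(67)                  # no input files
output_files = filenames_in([13, 'job4.file'])  # one output file
```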
Checking if jobs are up to date¶
Usually we do not want to run all the stages in a pipeline but only where the input data has changed or is no longer up to date. One easy way to do this is to check the modification times of the files produced at each stage of the pipeline.

Let us first create our starting files a.1 and b.1. We can then run the following pipeline function to create a.2 and b.2:

```python
# create starting files
open("a.1", "w")
open("b.1", "w")

from ruffus import *

parameters = [
    [ 'a.1', 'a.2', 'A file'],   # 1st job
    [ 'b.1', 'b.2', 'B file'],   # 2nd job
]

@files(parameters)
def parallel_io_task(infile, outfile, text):
    # copy infile contents to outfile
    infile_text = open(infile).read()
    open(outfile, "w").write(infile_text + "\n" + text)

pipeline_run()
```
This produces the following output:

```
>>> pipeline_run()
    Job = [a.1 -> a.2, A file] completed
    Job = [b.1 -> b.2, B file] completed
Completed Task = parallel_io_task
```

If you called pipeline_run() again, nothing would happen because the files are up to date: a.2 is more recent than a.1, and b.2 is more recent than b.1.
However, if you subsequently modified a.1:

```python
open("a.1", "w")
pipeline_run(verbose = 1)
```

you would see the following:

```
>>> pipeline_run([parallel_io_task])
Task = parallel_io_task
    Job = ["a.1" -> "a.2", "A file"] completed
    Job = ["b.1" -> "b.2", "B file"] unnecessary: already up to date
Completed Task = parallel_io_task
```
The 2nd job is up to date and will be skipped.
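The behaviour above can be simulated end to end with the standard library alone: run each job only when its output is missing or older than its input, and skip it otherwise. This is a sketch of the idea, not Ruffus's implementation; the `run_if_stale` helper and fixed modification times are illustrative choices:

```python
import os
import tempfile

def run_if_stale(infile, outfile, text):
    # rerun only when the output is missing or older than the input
    if os.path.exists(outfile) and os.path.getmtime(outfile) >= os.path.getmtime(infile):
        return "skipped"
    open(outfile, "w").write(open(infile).read() + "\n" + text)
    return "completed"

tmp = tempfile.mkdtemp()
jobs = [(os.path.join(tmp, "a.1"), os.path.join(tmp, "a.2"), "A file"),
        (os.path.join(tmp, "b.1"), os.path.join(tmp, "b.2"), "B file")]

for infile, _, _ in jobs:
    open(infile, "w").close()
    os.utime(infile, (1000, 1000))      # fixed mtimes keep the demo deterministic

first_run = [run_if_stale(*job) for job in jobs]    # both outputs missing: both run

for _, outfile, _ in jobs:
    os.utime(outfile, (2000, 2000))     # outputs now newer than inputs

os.utime(jobs[0][0], (3000, 3000))      # touch a.1 only
second_run = [run_if_stale(*job) for job in jobs]   # a-job reruns, b-job is skipped
```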