.. include:: global.inc .. _pipeline_functions: See :ref:`Decorators ` for more decorators .. |pipeline_run| replace:: `pipeline_run` .. _pipeline_run: `pipeline_functions.pipeline_run`_ .. |pipeline_printout| replace:: `pipeline_printout` .. _pipeline_printout: `pipeline_functions.pipeline_printout`_ .. |pipeline_printout_graph| replace:: `pipeline_printout_graph` .. _pipeline_printout_graph: `pipeline_functions.pipeline_printout_graph`_ .. |pipeline_get_task_names| replace:: `pipeline_get_task_names` .. _pipeline_get_task_names: `pipeline_functions.pipeline_get_task_names`_ .. |pr_target_tasks| replace:: `target_tasks` .. _pr_target_tasks: `pipeline_functions.pipeline_run.target_tasks`_ .. |pr_forcedtorun_tasks| replace:: `forcedtorun_tasks` .. _pr_forcedtorun_tasks: `pipeline_functions.pipeline_run.forcedtorun_tasks`_ .. |pr_multiprocess| replace:: `multiprocess` .. _pr_multiprocess: `pipeline_functions.pipeline_run.multiprocess`_ .. |pr_logger| replace:: `logger` .. _pr_logger: `pipeline_functions.pipeline_run.logger`_ .. |pr_gnu_make| replace:: `gnu_make_maximal_rebuild_mode` .. _pr_gnu_make: `pipeline_functions.pipeline_run.gnu_make`_ .. |pr_verbose| replace:: `verbose` .. _pr_verbose: `pipeline_functions.pipeline_run.verbose`_ .. |pr_runtime_data| replace:: `runtime_data` .. _pr_runtime_data: `pipeline_functions.pipeline_run.runtime_data`_ .. |pr_one_second_per_job| replace:: `one_second_per_job` .. _pr_one_second_per_job: `pipeline_functions.pipeline_run.one_second_per_job`_ .. |pr_touch_files_only| replace:: `touch_files_only` .. _pr_touch_files_only: `pipeline_functions.pipeline_run.touch_files_only`_ .. |pr_exceptions_terminate_immediately| replace:: `exceptions_terminate_immediately` .. _pr_exceptions_terminate_immediately: `pipeline_functions.pipeline_run.exceptions_terminate_immediately`_ .. |pr_log_exceptions| replace:: `log_exceptions` .. _pr_log_exceptions: `pipeline_functions.pipeline_run.log_exceptions`_ .. 
|pr_multithread| replace:: `multithread` .. _pr_multithread: `pipeline_functions.pipeline_run.multithread`_ .. |pr_checksum_level| replace:: `checksum_level` .. _pr_checksum_level: `pipeline_functions.pipeline_run.checksum_level`_ .. |pr_history_file| replace:: `history_file` .. _pr_history_file: `pipeline_functions.pipeline_run.history_file`_ .. |pr_verbose_abbreviated_path| replace:: `verbose_abbreviated_path` .. _pr_verbose_abbreviated_path: `pipeline_functions.pipeline_run.verbose_abbreviated_path`_ .. |pp_output_stream| replace:: `output_stream` .. _pp_output_stream: `pipeline_functions.pipeline_printout.output_stream`_ .. |pp_target_tasks| replace:: `target_tasks` .. _pp_target_tasks: `pipeline_functions.pipeline_printout.target_tasks`_ .. |pp_forcedtorun_tasks| replace:: `forcedtorun_tasks` .. _pp_forcedtorun_tasks: `pipeline_functions.pipeline_printout.forcedtorun_tasks`_ .. |pp_verbose| replace:: `verbose` .. _pp_verbose: `pipeline_functions.pipeline_printout.verbose`_ .. |pp_indent| replace:: `indent` .. _pp_indent: `pipeline_functions.pipeline_printout.indent`_ .. |pp_wrap_width| replace:: `wrap_width` .. _pp_wrap_width: `pipeline_functions.pipeline_printout.wrap_width`_ .. |pp_gnu_make| replace:: `gnu_make_maximal_rebuild_mode` .. _pp_gnu_make: `pipeline_functions.pipeline_printout.gnu_make`_ .. |pp_runtime_data| replace:: `runtime_data` .. _pp_runtime_data: `pipeline_functions.pipeline_printout.runtime_data`_ .. |pp_checksum_level| replace:: `checksum_level` .. _pp_checksum_level: `pipeline_functions.pipeline_printout.checksum_level`_ .. |pp_history_file| replace:: `history_file` .. _pp_history_file: `pipeline_functions.pipeline_printout.history_file`_ .. |pp_verbose_abbreviated_path| replace:: `verbose_abbreviated_path` .. _pp_verbose_abbreviated_path: `pipeline_functions.pipeline_printout.verbose_abbreviated_path`_ .. |ppg_stream| replace:: `stream` .. _ppg_stream: `pipeline_functions.pipeline_printout_graph.stream`_ .. 
|ppg_output_format| replace:: `output_format` .. _ppg_output_format: `pipeline_functions.pipeline_printout_graph.output_format`_ .. |ppg_target_tasks| replace:: `target_tasks` .. _ppg_target_tasks: `pipeline_functions.pipeline_printout_graph.target_tasks`_ .. |ppg_forcedtorun_tasks| replace:: `forcedtorun_tasks` .. _ppg_forcedtorun_tasks: `pipeline_functions.pipeline_printout_graph.forcedtorun_tasks`_ .. |ppg_draw_vertically| replace:: `draw_vertically` .. _ppg_draw_vertically: `pipeline_functions.pipeline_printout_graph.draw_vertically`_ .. |ppg_ignore_upstream_of_target| replace:: `ignore_upstream_of_target` .. _ppg_ignore_upstream_of_target: `pipeline_functions.pipeline_printout_graph.ignore_upstream_of_target`_ .. |ppg_skip_uptodate_tasks| replace:: `skip_uptodate_tasks` .. _ppg_skip_uptodate_tasks: `pipeline_functions.pipeline_printout_graph.skip_uptodate_tasks`_ .. |ppg_gnu_make| replace:: `gnu_make_maximal_rebuild_mode` .. _ppg_gnu_make: `pipeline_functions.pipeline_printout_graph.gnu_make`_ .. |ppg_test_all_task_for_update| replace:: `test_all_task_for_update` .. _ppg_test_all_task_for_update: `pipeline_functions.pipeline_printout_graph.test_all_task_for_update`_ .. |ppg_no_key_legend| replace:: `no_key_legend` .. _ppg_no_key_legend: `pipeline_functions.pipeline_printout_graph.no_key_legend`_ .. |ppg_minimal_key_legend| replace:: `minimal_key_legend` .. _ppg_minimal_key_legend: `pipeline_functions.pipeline_printout_graph.minimal_key_legend`_ .. |ppg_pipeline_name| replace:: `pipeline_name` .. _ppg_pipeline_name: `pipeline_functions.pipeline_printout_graph.pipeline_name`_ .. |ppg_user_colour_scheme| replace:: `user_colour_scheme` .. _ppg_user_colour_scheme: `pipeline_functions.pipeline_printout_graph.user_colour_scheme`_ .. |ppg_size| replace:: `size` .. _ppg_size: `pipeline_functions.pipeline_printout_graph.size`_ .. |ppg_dpi| replace:: `dpi` .. _ppg_dpi: `pipeline_functions.pipeline_printout_graph.dpi`_ .. |ppg_runtime_data| replace:: `runtime_data` .. 
_ppg_runtime_data: `pipeline_functions.pipeline_printout_graph.runtime_data`_ .. |ppg_checksum_level| replace:: `checksum_level` .. _ppg_checksum_level: `pipeline_functions.pipeline_printout_graph.checksum_level`_ .. |ppg_history_file| replace:: `history_file` .. _ppg_history_file: `pipeline_functions.pipeline_printout_graph.history_file`_ ################################################ Ruffus Functions ################################################ There are four functions for **Ruffus** pipelines: * |pipeline_run|_ executes a pipeline * |pipeline_printout|_ prints a list of tasks and jobs which will be run in a pipeline * |pipeline_printout_graph|_ prints a schematic flowchart of pipeline tasks in various graphical formats * |pipeline_get_task_names|_ returns a list of all task names in the pipeline and a helper function to run jobs via drmaa: * |run_job|_ Switches commands between execution remotely on a computational cluster or locally (via `subprocess `__). .. _pipeline_functions.pipeline_run: .. 
index:: single: pipeline functions; pipeline_run pair: pipeline_run; Run pipeline ************************************************************************************************************************************************************************************** *pipeline_run* ************************************************************************************************************************************************************************************** **pipeline_run** ( |pr_target_tasks|_ = [], |pr_forcedtorun_tasks|_ = [], |pr_multiprocess|_ = 1, |pr_logger|_ = stderr_logger, |pr_gnu_make|_ = True, |pr_verbose|_ =1, |pr_runtime_data|_ = None, |pr_one_second_per_job|_ = True, |pr_touch_files_only|_ = False, |pr_exceptions_terminate_immediately|_ = None, |pr_log_exceptions|_ = None, |pr_history_file|_ = None, |pr_checksum_level|_ = None, |pr_multithread|_ = 0, |pr_verbose_abbreviated_path|_ = None) **Purpose:** Runs all specified pipelined functions if they or any antecedent tasks are incomplete or out-of-date. **Example**: .. code-block:: python # # Run task2 whatever its state, and also task1 and antecedents if they are incomplete # Do not log pipeline progress messages to stderr # pipeline_run([task1, task2], forcedtorun_tasks = [task2], logger = blackhole_logger) **Parameters:** .. _pipeline_functions.pipeline_run.target_tasks: * *target_tasks* Pipeline functions and any necessary antecedents (specified implicitly or with :ref:`@follows `) which should be invoked with the appropriate parameters if they are incomplete or out-of-date. .. _pipeline_functions.pipeline_run.forcedtorun_tasks: * *forcedtorun_tasks* Optional. These pipeline functions will be invoked regardless of their state. Any antecedents tasks will also be executed if they are out-of-date or incomplete. .. _pipeline_functions.pipeline_run.multiprocess: * *multiprocess* Optional. 
The number of processes which should be dedicated to running independent tasks, and jobs within each task, in parallel.
If ``multiprocess`` is set to 1, the pipeline will execute in the main process.

.. _pipeline_functions.pipeline_run.multithread:

* *multithread*

  Optional. The number of threads which should be dedicated to running independent tasks, and jobs within each task, in parallel.
  Should be used only with drmaa. Otherwise the CPython `global interpreter lock (GIL) `__ will slow down your pipeline.

.. _pipeline_functions.pipeline_run.logger:

* *logger*

  For logging messages indicating the progress of the pipeline in terms of tasks and jobs.
  Defaults to outputting to sys.stderr.
  Setting ``logger=blackhole_logger`` will prevent any logging output.

.. _pipeline_functions.pipeline_run.gnu_make:

* *gnu_make_maximal_rebuild_mode*

  .. warning::

      This is a dangerous option. Use rarely and with caution.

  Optional parameter governing how **Ruffus** determines which part of the pipeline is out of date and needs to be re-run.
  If set to ``False``, **Ruffus** will work back from the ``target_tasks`` and only execute the pipeline after the first up-to-date tasks that it encounters.
  For example, if there are five tasks:

  ::

      #
      #   task1 -> task2 -> task3 -> task4 -> task5
      #
      target_tasks = [task5]

  If ``task3()`` is up-to-date, then only ``task4()`` and ``task5()`` will be run.
  This will be the case even if ``task2()`` and ``task1()`` are incomplete.

  This allows you to remove all intermediate results produced by ``task1 -> task3``.

.. 
_pipeline_functions.pipeline_run.verbose:

* *verbose*

  Optional parameter indicating the verbosity of the messages sent to ``logger``
  (defaults to level 1 if unspecified):

  * level **0** : *nothing*
  * level **1** : *Out-of-date Task names*
  * level **2** : *All Tasks (including any task function docstrings)*
  * level **3** : *Out-of-date Jobs in Out-of-date Tasks, no explanation*
  * level **4** : *Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings*
  * level **5** : *All Jobs in Out-of-date Tasks, (include only list of up-to-date tasks)*
  * level **6** : *All jobs in All Tasks whether out of date or not*
  * level **10**: *logs messages useful only for debugging ruffus pipeline code*

  Levels ``verbose >= 10`` are intended for debugging **Ruffus** by the developers, and the details are liable to change from release to release.

.. _pipeline_functions.pipeline_run.runtime_data:

* *runtime_data*

  Experimental feature for passing data to tasks at run time.

.. _pipeline_functions.pipeline_run.one_second_per_job:

* *one_second_per_job*

  Works around the poor file timestamp resolution of some file systems by forcing tasks to take a minimum of 1 second to complete.
  Defaults to ``True`` if ``checksum_level`` is 0.
  If your file system has coarse-grained timestamps, you can turn on this delay by setting *one_second_per_job* to ``True``.

.. _pipeline_functions.pipeline_run.touch_files_only:

* *touch_files_only*

  Create or update output files only, to simulate the running of the pipeline.
  Does not invoke real task functions to run jobs. This is most useful to force a pipeline to acknowledge that a particular part is now up-to-date.

  This will not work properly if the identities of some files are not known beforehand and depend on run time.
  In other words, it is not recommended if ``@split`` or custom parameter generators are being used.

.. _pipeline_functions.pipeline_run.exceptions_terminate_immediately:

* *exceptions_terminate_immediately*

  Exceptions cause immediate termination of the pipeline.

.. 
_pipeline_functions.pipeline_run.log_exceptions: * *log_exceptions* Print exceptions to the logger as soon as they occur. .. _pipeline_functions.pipeline_run.history_file: * *history_file* The database file which stores checksums and file timestamps for input/output files. Defaults to ``.ruffus_history.sqlite`` if unspecified .. _pipeline_functions.pipeline_run.checksum_level: * *checksum_level* Several options for checking up-to-dateness are available: Default is level 1. * level 0 : Use only file timestamps * level 1 : above, plus timestamp of successful job completion * level 2 : above, plus a checksum of the pipeline function body * level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators .. _pipeline_functions.pipeline_run.verbose_abbreviated_path: * *verbose_abbreviated_path* Whether input and output paths are abbreviated. Defaults to 2 if unspecified * level 0: The full (expanded, abspath) input or output path * level > 1: The number of subdirectories to include. Abbreviated paths are prefixed with ``[,,,]/`` * level < 0: Input / Output parameters are truncated to ``MMM`` letters where ``verbose_abbreviated_path ==-MMM``. Subdirectories are first removed to see if this allows the paths to fit in the specified limit. Otherwise abbreviated paths are prefixed by ```` .. _pipeline_functions.pipeline_printout: .. 
index::
    single: pipeline functions; pipeline_printout
    pair: pipeline_printout; Printout simulated run of the pipeline

************************************************************
*pipeline_printout*
************************************************************

**pipeline_printout** (|pp_output_stream|_ = sys.stdout, |pp_target_tasks|_ = [], |pp_forcedtorun_tasks|_ = [], |pp_verbose|_ = 1, |pp_indent|_ = 4, |pp_gnu_make|_ = True, |pp_wrap_width|_ = 100, |pp_runtime_data|_ = None, |pp_checksum_level|_ = None, |pp_history_file|_ = None, |pp_verbose_abbreviated_path|_ = None)

**Purpose:**

    Prints out all the pipelined functions which will be invoked given specified ``target_tasks``, without actually running the pipeline.

    Because this is a simulation, some of the job parameters may be incorrect.
    For example, the results of a :ref:`@split` operation are not predetermined and will only be known after the pipelined function splits up the original data.
    Parameters of all downstream pipelined functions will be changed depending on this initial operation.

**Example**:

::

    #
    #   Simulate running task2 whatever its state, and also task1 and antecedents
    #   if they are incomplete
    #   Print out results to STDOUT
    #
    pipeline_printout(sys.stdout, [task1, task2], forcedtorun_tasks = [task2], verbose = 1)

**Parameters:**

.. _pipeline_functions.pipeline_printout.output_stream:

* *output_stream*

  Where to print the results of simulating the running of the pipeline.

.. 
_pipeline_functions.pipeline_printout.target_tasks:

* *target_tasks*

  As in :ref:`pipeline_run`: Pipeline functions and any necessary antecedents (specified implicitly or with :ref:`@follows `)
  which should be invoked with the appropriate parameters if they are incomplete or out-of-date.

.. _pipeline_functions.pipeline_printout.forcedtorun_tasks:

* *forcedtorun_tasks*

  As in :ref:`pipeline_run`: These pipeline functions will be invoked regardless of their state.
  Any antecedent tasks will also be executed if they are out-of-date or incomplete.

.. _pipeline_functions.pipeline_printout.verbose:

* *verbose*

  Optional parameter indicating the verbosity of the messages sent to ``logger``
  (defaults to level 4 if unspecified):

  * level **0** : *nothing*
  * level **1** : *Out-of-date Task names*
  * level **2** : *All Tasks (including any task function docstrings)*
  * level **3** : *Out-of-date Jobs in Out-of-date Tasks, no explanation*
  * level **4** : *Out-of-date Jobs in Out-of-date Tasks, with explanations and warnings*
  * level **5** : *All Jobs in Out-of-date Tasks, (include only list of up-to-date tasks)*
  * level **6** : *All jobs in All Tasks whether out of date or not*
  * level **10**: *logs messages useful only for debugging ruffus pipeline code*

  Levels ``verbose >= 10`` are intended for debugging **Ruffus** by the developers, and the details are liable to change from release to release.

.. _pipeline_functions.pipeline_printout.indent:

* *indent*

  Optional parameter governing the indentation when printing out the component job parameters of each task function.

.. _pipeline_functions.pipeline_printout.gnu_make:

* *gnu_make_maximal_rebuild_mode*

  .. warning::

      This is a dangerous option. Use rarely and with caution.

  See explanation in :ref:`pipeline_run `.

.. _pipeline_functions.pipeline_printout.wrap_width:

* *wrap_width*

  Optional parameter governing the length of each line before it starts wrapping around.

.. 
_pipeline_functions.pipeline_printout.runtime_data: * *runtime_data* Experimental feature for passing data to tasks at run time .. _pipeline_functions.pipeline_printout.history_file: * *history_file* The database file which stores checksums and file timestamps for input/output files. Defaults to ``.ruffus_history.sqlite`` if unspecified .. _pipeline_functions.pipeline_printout.checksum_level: * *checksum_level* Several options for checking up-to-dateness are available: Default is level 1. * level 0 : Use only file timestamps * level 1 : above, plus timestamp of successful job completion * level 2 : above, plus a checksum of the pipeline function body * level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators .. _pipeline_functions.pipeline_printout.verbose_abbreviated_path: * *verbose_abbreviated_path* Whether input and output paths are abbreviated. Defaults to 2 if unspecified * level 0: The full (expanded, abspath) input or output path * level > 1: The number of subdirectories to include. Abbreviated paths are prefixed with ``[,,,]/`` * level < 0: Input / Output parameters are truncated to ``MMM`` letters where ``verbose_abbreviated_path ==-MMM``. Subdirectories are first removed to see if this allows the paths to fit in the specified limit. Otherwise abbreviated paths are prefixed by ```` .. _pipeline_functions.pipeline_printout_graph: .. 
index:: single: pipeline functions; pipeline_printout_graph pair: pipeline_printout_graph; print flowchart representation of pipeline functions ************************************************************************************************************************************************************************************************************************************************************************************ *pipeline_printout_graph* ************************************************************************************************************************************************************************************************************************************************************************************ **pipeline_printout_graph** (|ppg_stream|_, |ppg_output_format|_ = None, |ppg_target_tasks|_ = [], |ppg_forcedtorun_tasks|_ = [], |ppg_ignore_upstream_of_target|_ = False, |ppg_skip_uptodate_tasks|_ = False, |ppg_gnu_make|_ = True, |ppg_test_all_task_for_update|_ = True, |ppg_no_key_legend|_ = False, |ppg_minimal_key_legend|_ = True, |ppg_user_colour_scheme|_ = None, |ppg_pipeline_name|_ = "Pipeline", |ppg_size|_ = (11,8), |ppg_dpi|_ = 120, |ppg_runtime_data|_ = None, |ppg_checksum_level|_ = None, |ppg_history_file|_ = None) **Purpose:** Prints out flowchart of all the pipelined functions which will be invoked given specified ``target_tasks`` without actually running the pipeline. See :ref:`Flowchart colours ` **Example**: :: pipeline_printout_graph("flowchart.jpg", "jpg", [task1, task16], forcedtorun_tasks = [task2], no_key_legend = True) **Customising appearance:** The :ref:`user_colour_scheme ` parameter can be used to change flowchart colours. This allows the default :ref:`Colour Schemes ` to be set. An example of customising flowchart appearance is available :ref:`(see code) ` . **Parameters:** .. _pipeline_functions.pipeline_printout_graph.stream: * *stream* The file or file-like object to which the flowchart should be printed. 
If a string is provided, it is assumed that this is the name of the output file, which will be opened automatically.

.. _pipeline_functions.pipeline_printout_graph.output_format:

* *output_format*

  If missing, defaults to the extension of the *stream* file name (e.g. ``jpg`` for ``a.jpg``).

  | If the programme ``dot`` can be found on the execution path, this can be any number of `formats `_ supported by `Graphviz `_, including, for example, ``jpg``, ``png``, ``pdf``, ``svg`` etc.
  | Otherwise, **Ruffus** will only output without error in the `dot `_ format, which is a plain-text graph description language.

.. _pipeline_functions.pipeline_printout_graph.target_tasks:

* *target_tasks*

  As in :ref:`pipeline_run`: Pipeline functions and any necessary antecedents (specified implicitly or with :ref:`@follows `)
  which should be invoked with the appropriate parameters if they are incomplete or out-of-date.

.. _pipeline_functions.pipeline_printout_graph.forcedtorun_tasks:

* *forcedtorun_tasks*

  As in :ref:`pipeline_run`: These pipeline functions will be invoked regardless of their state.
  Any antecedent tasks will also be executed if they are out-of-date or incomplete.

.. _pipeline_functions.pipeline_printout_graph.draw_vertically:

* *draw_vertically*

  Draw flowchart in vertical orientation.

.. _pipeline_functions.pipeline_printout_graph.ignore_upstream_of_target:

* *ignore_upstream_of_target*

  Start drawing flowchart from specified target tasks. Do not draw tasks which are downstream (subsequent) to the targets.

.. _pipeline_functions.pipeline_printout_graph.skip_uptodate_tasks:

* *skip_uptodate_tasks*

  Do not draw up-to-date / completed tasks in the flowchart unless they lie on the execution path of the pipeline.

.. _pipeline_functions.pipeline_printout_graph.gnu_make:

* *gnu_make_maximal_rebuild_mode*

  .. warning::

      This is a dangerous option. Use rarely and with caution.

  See explanation in :ref:`pipeline_run `.

.. 
_pipeline_functions.pipeline_printout_graph.test_all_task_for_update: * *test_all_task_for_update* | Indicates whether intermediate tasks are out of date or not. Normally **Ruffus** will stop checking dependent tasks for completion or whether they are out-of-date once it has discovered the maximal extent of the pipeline which has to be run. | For displaying the flow of the pipeline, this is hardly very informative. .. _pipeline_functions.pipeline_printout_graph.no_key_legend: * *no_key_legend* Do not include key legend explaining the colour scheme of the flowchart. .. _pipeline_functions.pipeline_printout_graph.minimal_key_legend: * *minimal_key_legend* Do not include unused task types in key legend. .. _pipeline_functions.pipeline_printout_graph.user_colour_scheme: * *user_colour_scheme* Dictionary specifying colour scheme for flowchart See complete :ref:`list of Colour Schemes `. | Colours can be names e.g. ``"black"`` or quoted hex e.g. ``'"#F6F4F4"'`` (note extra quotes) | Default values will be used unless specified .. 
csv-table:: :header: "key", "Subkey", "" " - ``'colour_scheme_index'`` ", "| index of default colour scheme, | 0-7, defaults to 0 unless specified", "" " - ``'Final target'`` - ``'Explicitly specified task'`` - ``'Task to run'`` - ``'Down stream'`` - ``'Up-to-date Final target'`` - ``'Up-to-date task forced to rerun'`` - ``'Up-to-date task'`` - ``'Vicious cycle'`` "," - ``'fillcolor'`` - ``'fontcolor'`` - ``'color'`` - ``'dashed'`` = ``0/1`` ", "Colours / attributes for each task type" " - ``'Vicious cycle'`` - ``'Task to run'`` - ``'Up-to-date'``", "- ``'linecolor'``", "Colours for arrows between tasks" "- ``'Pipeline'``", "- ``'fontcolor'``","Flowchart title colour" "- ``'Key'``", " - ``'fontcolor'`` - ``'fillcolor'``", "Legend colours" Example: Use colour scheme index = 1 :: pipeline_printout_graph ("flowchart.svg", "svg", [final_task], user_colour_scheme = { "colour_scheme_index" :1, "Pipeline" :{"fontcolor" : '"#FF3232"' }, "Key" :{"fontcolor" : "Red", "fillcolor" : '"#F6F4F4"' }, "Task to run" :{"linecolor" : '"#0044A0"' }, "Final target" :{"fillcolor" : '"#EFA03B"', "fontcolor" : "black", "dashed" : 0 } }) .. _pipeline_functions.pipeline_printout_graph.pipeline_name: * *pipeline_name* Specify title for flowchart .. _pipeline_functions.pipeline_printout_graph.size: * *size* Size in inches for flowchart .. _pipeline_functions.pipeline_printout_graph.dpi: * *dpi* Resolution in dots per inch. Ignored for svg output .. _pipeline_functions.pipeline_printout_graph.runtime_data: * *runtime_data* Experimental feature for passing data to tasks at run time .. _pipeline_functions.pipeline_printout_graph.history_file: * *history_file* The database file which stores checksums and file timestamps for input/output files. Defaults to ``.ruffus_history.sqlite`` if unspecified .. _pipeline_functions.pipeline_printout_graph.checksum_level: * *checksum_level* Several options for checking up-to-dateness are available: Default is level 1. 
* level 0 : Use only file timestamps * level 1 : above, plus timestamp of successful job completion * level 2 : above, plus a checksum of the pipeline function body * level 3 : above, plus a checksum of the pipeline function default arguments and the additional arguments passed in by task decorators .. _pipeline_functions.pipeline_get_task_names: .. index:: single: pipeline functions; pipeline_get_task_names pair: pipeline_get_task_names; print list of task names without running the pipeline ************************************************************************************************************************************************************************************** *pipeline_get_task_names* ************************************************************************************************************************************************************************************** **pipeline_get_task_names** () **Purpose:** Returns a list of all task names in the pipeline without running the pipeline or checking to see if the tasks are connected correctly **Example**: Given: .. code-block:: python from ruffus import * @originate([]) def create_data(output_files): pass @transform(create_data, suffix(".txt"), ".task1") def task1(input_files, output_files): pass @transform(task1, suffix(".task1"), ".task2") def task2(input_files, output_files): pass Produces a list of three task names: .. code-block:: pycon >>> pipeline_get_task_names () ['create_data', 'task1', 'task2'] .. _drmaa_functions: .. comments: function name .. |run_job| replace:: `drmaa_wrapper.run_job` .. _run_job: `drmaa_wrapper.run_job`_ .. comments: parameters .. |dw_cmd_str| replace:: `cmd_str` .. _dw_cmd_str: `drmaa_wrapper.run_job.cmd_str`_ .. |dw_job_script_directory| replace:: `job_script_directory` .. _dw_job_script_directory: `drmaa_wrapper.run_job.job_script_directory`_ .. |dw_job_environment| replace:: `job_environment` .. _dw_job_environment: `drmaa_wrapper.run_job.job_environment`_ .. 
|dw_working_directory| replace:: `working_directory` .. _dw_working_directory: `drmaa_wrapper.run_job.working_directory`_ .. |dw_retain_job_scripts| replace:: `retain_job_scripts` .. _dw_retain_job_scripts: `drmaa_wrapper.run_job.retain_job_scripts`_ .. |dw_job_name| replace:: `job_name` .. _dw_job_name: `drmaa_wrapper.run_job.job_name`_ .. |dw_job_other_options| replace:: `job_other_options` .. _dw_job_other_options: `drmaa_wrapper.run_job.job_other_options`_ .. |dw_logger| replace:: `logger` .. _dw_logger: `drmaa_wrapper.run_job.logger`_ .. |dw_drmaa_session| replace:: `drmaa_session` .. _dw_drmaa_session: `drmaa_wrapper.run_job.drmaa_session`_ .. |dw_run_locally| replace:: `run_locally` .. _dw_run_locally: `drmaa_wrapper.run_job.run_locally`_ .. |dw_output_files| replace:: `output_files` .. _dw_output_files: `drmaa_wrapper.run_job.output_files`_ .. |dw_touch_only| replace:: `touch_only` .. _dw_touch_only: `drmaa_wrapper.run_job.touch_only`_ ************************************************************************************************************************************************************************************************************************************************************************************ *run_job* ************************************************************************************************************************************************************************************************************************************************************************************ .. note:: ``drmaa_wrapper`` is not exported automatically by ruffus and must be specified explicitly: .. code-block:: python :emphasize-lines: 1 # imported ruffus.drmaa_wrapper explicitly from ruffus.drmaa_wrapper import run_job, error_drmaa_job .. _drmaa_wrapper.run_job: .. 
index::
    single: drmaa ; run_job
    pair: run_job; Run drmaa

**run_job** (|dw_cmd_str|_, |dw_job_name|_, |dw_job_other_options|_, |dw_job_script_directory|_ = None, |dw_job_environment|_, |dw_working_directory|_, |dw_logger|_, |dw_drmaa_session|_, |dw_retain_job_scripts|_, |dw_run_locally|_, |dw_output_files|_, |dw_touch_only|_)

**Purpose:**

    ``ruffus.drmaa_wrapper.run_job`` dispatches a command with arguments to a cluster or Grid Engine node and waits for the command to complete.

    It is the semantic equivalent of calling `os.system `__ or `subprocess.check_output `__.

**Example**:

.. code-block:: python

    from ruffus.drmaa_wrapper import run_job, error_drmaa_job
    import drmaa

    my_drmaa_session = drmaa.Session()
    my_drmaa_session.initialize()

    run_job("ls",
            job_name = "test",
            job_other_options="-P mott-flint.prja -q short.qa",
            job_script_directory = "test_dir",
            job_environment={ 'BASH_ENV' : '~/.bashrc' },
            retain_job_scripts = True,
            drmaa_session=my_drmaa_session)

    # as above, but run in a specific working directory
    run_job("ls",
            job_name = "test",
            job_other_options="-P mott-flint.prja -q short.qa",
            job_script_directory = "test_dir",
            job_environment={ 'BASH_ENV' : '~/.bashrc' },
            retain_job_scripts = True,
            drmaa_session=my_drmaa_session,
            working_directory = "/gpfs1/well/mott-flint/lg/src/oss/ruffus/doc")

    #
    # catch exceptions
    #
    # cmd, job_name, logger, drmaa_session, options and get_queue_name()
    # are assumed to be defined elsewhere in your pipeline script
    #
    # initialise so that the names exist even if run_job() raises
    stdout_res, stderr_res = "", ""
    try:
        stdout_res, stderr_res = run_job(cmd,
                                         job_name          = job_name,
                                         logger            = logger,
                                         drmaa_session     = drmaa_session,
                                         run_locally       = options.local_run,
                                         job_other_options = get_queue_name())

    # relay all the stdout, stderr, drmaa output to diagnose failures
    except error_drmaa_job as err:
        raise Exception("\n".join(map(str,
                            ["Failed to run:", cmd, err, stdout_res, stderr_res])))

    my_drmaa_session.exit()

**Parameters:**

.. _drmaa_wrapper.run_job.cmd_str:

* *cmd_str*

  The command which will be run remotely, including all parameters.

.. _drmaa_wrapper.run_job.job_name:

* *job_name*

  A descriptive name for the command. This will be displayed by `SGE qstat `__, for example.
  Defaults to "ruffus_job".

.. 
_drmaa_wrapper.run_job.job_other_options:

* *job_other_options*

  Other drmaa parameters can be passed verbatim as a string.

  Examples for SGE include project name (``-P project_name``), parallel environment (``-pe parallel_environ``), account (``-A account_string``), resource (``-l resource=expression``),
  queue name (``-q a_queue_name``), queue priority (``-p 15``).

  These are parameters which you normally need to include when submitting jobs interactively, for example via
  `SGE qsub `__ or `SLURM `__ (`srun `__).

.. _drmaa_wrapper.run_job.job_script_directory:

* *job_script_directory*

  The directory where drmaa temporary script files will be found. Defaults to the current working directory.

.. _drmaa_wrapper.run_job.job_environment:

* *job_environment*

  A dictionary of key / value pairs giving environment variables. E.g. ``{'BASH_ENV': '~/.bashrc'}``

.. _drmaa_wrapper.run_job.working_directory:

* *working_directory*

  * Sets the working directory.
  * Should be a fully qualified path.
  * Defaults to the current working directory.

.. _drmaa_wrapper.run_job.retain_job_scripts:

* *retain_job_scripts*

  If set to True, will not delete temporary script files containing drmaa commands. Useful for debugging and for running on the command line directly, and can provide a useful record of the commands.

.. _drmaa_wrapper.run_job.logger:

* *logger*

  For logging messages indicating the progress of the pipeline in terms of tasks and jobs. Takes objects with the standard python
  `logging `__ module interface.

.. _drmaa_wrapper.run_job.drmaa_session:

* *drmaa_session*

  A shared drmaa session created and managed separately.

  In the main part of your **Ruffus** pipeline script somewhere there should be code looking like this:

  .. code-block:: python

      #
      #   start shared drmaa session for all jobs / tasks in pipeline
      #
      import drmaa
      drmaa_session = drmaa.Session()
      drmaa_session.initialize()

      #
      #   pipeline functions
      #

      if __name__ == '__main__':
          cmdline.run (options, multithread = options.jobs)
          drmaa_session.exit()

.. 
_drmaa_wrapper.run_job.run_locally: * *run_locally* If set to True, will run commands locally using the standard python `subprocess `__ module rather than dispatching remotely. This allows scripts to be debugged easily .. _drmaa_wrapper.run_job.touch_only: * *touch_only* If set to True, will create or update :ref:`Output files ` only to simulate the running of the pipeline. Does not dispatch commands remotely or locally. This is most useful to force a pipeline to acknowledge that a particular part is now up-to-date. See also: :ref:`pipeline_run(touch_files_only=True) ` .. _drmaa_wrapper.run_job.output_files: * *output_files* A list of output file names which will be created or updated if :ref:`touch_only ` ``=True``
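
To make "create or update" concrete, the following is a minimal sketch of what touching a list of output files amounts to. It is not the **Ruffus** implementation: ``touch_outputs`` is a hypothetical helper written only for illustration, and it uses nothing but the Python standard library.

```python
import os
import tempfile


def touch_outputs(output_files):
    """Hypothetical helper: create each file if missing, else refresh its timestamp."""
    for path in output_files:
        # Append mode creates a missing file and never truncates an existing one
        with open(path, "a"):
            pass
        # times=None sets both access and modification time to the current time
        os.utime(path, None)


# Usage: touch two (hypothetical) output files in a scratch directory
with tempfile.TemporaryDirectory() as tmp_dir:
    outputs = [os.path.join(tmp_dir, name) for name in ("a.task1", "b.task1")]
    touch_outputs(outputs)
    all_created = all(os.path.exists(path) for path in outputs)
```

Because existing files are opened in append mode, their contents are left untouched and only their timestamps change, which is what lets a timestamp-based up-to-date check treat them as freshly produced.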