camber.nextflow

camber.nextflow handles workloads that use the Nextflow framework, with Kubernetes as the execution platform. It is a subclass of CamberEngine that wraps the Nextflow CLI in a Python interface.

Basic Usage

from camber import nextflow

nf_hello_job = nextflow.create_job(
    pipeline="nextflow-io/hello",
    engine_size="XSMALL",
    num_engines=4
)
# More nextflow workflow below

Methods

create_job

Creates a job that runs the given pipeline using the NextflowEngine. If the pipeline requires an output directory, set it to /camber_outputs. When the job finishes, the output data appears in the private stash under jobs/<job_id>/outputs.
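The mapping from the in-job output directory to the stash can be sketched as a small helper. This is an illustration only, not part of the Camber SDK: the function name and the job ID are hypothetical, and it assumes the directory tree under /camber_outputs is mirrored unchanged under jobs/<job_id>/outputs.

```python
from pathlib import PurePosixPath

# Assumption: the job's /camber_outputs tree is mirrored as-is under
# jobs/<job_id>/outputs in the private stash.
JOB_OUTPUT_ROOT = PurePosixPath("/camber_outputs")


def stash_output_path(job_id, path_in_job):
    """Map a path under /camber_outputs to its expected stash location."""
    # relative_to raises ValueError if the path is not under /camber_outputs.
    relative = PurePosixPath(path_in_job).relative_to(JOB_OUTPUT_ROOT)
    return str(PurePosixPath("jobs") / str(job_id) / "outputs" / relative)


# Hypothetical job ID, for illustration only.
print(stash_output_path(1234, "/camber_outputs/greeting_output/greeting.txt"))
```

A path outside /camber_outputs raises ValueError, which is a convenient way to catch a pipeline that writes its results somewhere the stash will not pick up.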

Args

pipeline: str
A remote Nextflow pipeline, or a local one stored in the private stash. You can browse available remote pipelines at nf-core pipelines.
engine_size: str
The size of the engine. One of XMICRO, MICRO, XXSMALL, XSMALL, SMALL, MEDIUM, or LARGE.
Default is XSMALL.
num_engines: Optional[int]
The number of engines used to run tasks in parallel.
Default is 1.
params: Optional[Dict]
Parameters for the pipeline. If the pipeline takes input files as parameters, give their paths relative to the current working directory.
Example: {"--input": "./samplesheet.csv"}
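As a sketch of how these arguments fit together, the helper below checks a set of values against the constraints listed above before a job is submitted. check_job_args is a hypothetical name, not a Camber SDK function, and the rule that every params key starts with "-" is an assumption drawn from the documented examples.

```python
# Engine sizes as listed in the engine_size description above.
ENGINE_SIZES = ("XMICRO", "MICRO", "XXSMALL", "XSMALL", "SMALL", "MEDIUM", "LARGE")


def check_job_args(engine_size="XSMALL", num_engines=1, params=None):
    """Hypothetical pre-flight check; not part of the Camber SDK."""
    if engine_size not in ENGINE_SIZES:
        raise ValueError(f"engine_size must be one of {ENGINE_SIZES}, got {engine_size!r}")
    if not isinstance(num_engines, int) or num_engines < 1:
        raise ValueError("num_engines must be a positive integer")
    params = dict(params or {})
    for key in params:
        # Assumption: keys are written as CLI flags, e.g. "--input" or "-r".
        if not key.startswith("-"):
            raise ValueError(f"parameter key {key!r} should look like a CLI flag")
    return {"engine_size": engine_size, "num_engines": num_engines, "params": params}


job_args = check_job_args("MICRO", 4, {"--input": "./samplesheet.csv"})
print(job_args)
# The checked values could then be passed on, for example:
# nextflow.create_job(pipeline="nextflow-io/hello", **job_args)
```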

Returns

CamberJob
An instance of the CamberJob class representing the created job.

Examples

Example 1: Basic “Hello World”

This example demonstrates running a simple Nextflow pipeline that writes a greeting to a file.

1. Create your Nextflow pipeline:

Store this file in your Camber Stash (e.g., at ./my_nextflow_pipelines/hello.nf).

#!/usr/bin/env nextflow
nextflow.enable.dsl=2

params.greeting = "Hello from Camber Nextflow!"
// Camber will provide params.outdir, which defaults to /camber_outputs in the job
params.outdir = "./results" // This default may be overridden by Camber.

process GREET {
    // Publish to a subdirectory within the Camber-managed output directory
    publishDir "${params.outdir}/greeting_output", mode: 'copy'

    output:
    path "greeting.txt"

    script:
    """
    echo "${params.greeting}" > greeting.txt
    """
}

workflow {
    GREET()
}

2. Python script to run the pipeline:

import camber.nextflow

# Path to your Nextflow script in Camber Stash
pipeline_path = "./my_nextflow_pipelines/hello.nf" # Adjust if needed

# Custom parameters for the Nextflow pipeline
job_params = {
    "--greeting": "'My Custom Hello via Camber!'"
}

hello_job = camber.nextflow.create_job(
    pipeline=pipeline_path,
    engine_size="XSMALL",
    params=job_params
)

print(f"Submitted Nextflow job ID: {hello_job.job_id}")

# View logs directly
hello_job.read_logs(tail_lines=50)

Example 2: End-to-End Nextflow: nf-core/sarek with Camber Stash

This tutorial demonstrates how to run the nf-core/sarek Nextflow pipeline on Camber. The pipeline reads a samplesheet uploaded to your Camber Stash to process genomic data. Output files are written to a Camber-managed output directory, typically accessible via Stash.

1. Prepare and Upload Your Samplesheet to Camber Stash

The nf-core/sarek pipeline requires a samplesheet to define the input samples and their data.

1.1 Download the Samplesheet (samplesheet.csv):

The samplesheet for nf-core/sarek typically has columns like patient, sample, lane, fastq_1, fastq_2.

You can download an example template or a pre-filled example of samplesheet.csv here: Download samplesheet.csv
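If you would rather generate the samplesheet yourself, a minimal sketch using Python's standard csv module is shown below. The column names come from the description above; the row values are placeholders rather than real data, and the nf-core/sarek documentation remains the authority on which columns a given run needs.

```python
import csv

# Columns named in the samplesheet description above.
COLUMNS = ["patient", "sample", "lane", "fastq_1", "fastq_2"]

# Placeholder row: replace with your own sample IDs and FASTQ locations.
rows = [
    {
        "patient": "patient1",
        "sample": "sample1",
        "lane": "lane_1",
        "fastq_1": "reads/sample1_R1.fastq.gz",
        "fastq_2": "reads/sample1_R2.fastq.gz",
    },
]


def write_samplesheet(path, rows):
    """Write rows to a CSV file with the expected header line."""
    with open(path, "w", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=COLUMNS)
        writer.writeheader()
        writer.writerows(rows)


write_samplesheet("samplesheet.csv", rows)
```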

1.2 Upload to Stash:

  • Open your Camber Stash in your web browser.
  • Click the upload button.
  • In the upload screen that appears, select the samplesheet.csv file from your local machine.
  • Start the upload and wait for confirmation that the upload was successful.

Your samplesheet.csv should now be in your Stash.

2. Create and Run the Nextflow Job via Jupyter Notebook

Now, go to your Camber Hub and open or create a new Jupyter Notebook. You’ll use the Camber SDK to define and launch the nf-core/sarek pipeline.

import camber.nextflow

nf_sarek_job = camber.nextflow.create_job(
    pipeline="nf-core/sarek",
    engine_size="MICRO",
    num_engines=4,
    params={
        "--input": "./samplesheet.csv", # Adjust if needed
        "--outdir": "/camber_outputs",
        "-r": "3.5.1",
        "--tools": "freebayes",
    },
)

print(f"Submitted Nextflow job ID: {nf_sarek_job.job_id}")

# View logs directly
nf_sarek_job.read_logs(tail_lines=50)
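To make the params dictionary easier to read, the sketch below renders it as the roughly equivalent nextflow run command line. In the Nextflow CLI, single-dash options such as -r (the pipeline revision) belong to Nextflow itself, while double-dash options such as --input are pipeline parameters. How Camber actually builds the command is not documented here, so preview_nextflow_command is a hypothetical helper for illustration only.

```python
import shlex


def preview_nextflow_command(pipeline, params):
    """Illustrative only: render params as a `nextflow run` command line."""
    args = ["nextflow", "run", pipeline]
    for flag, value in params.items():
        args.extend([flag, str(value)])
    # shlex.join quotes any argument that contains shell-special characters.
    return shlex.join(args)


sarek_params = {
    "--input": "./samplesheet.csv",
    "--outdir": "/camber_outputs",
    "-r": "3.5.1",
    "--tools": "freebayes",
}
print(preview_nextflow_command("nf-core/sarek", sarek_params))
```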