#!/usr/bin/env python3
"""Module containing the InterBasePairCorrelation class and the command line interface."""
from itertools import product
from typing import Optional
import matplotlib as mpl
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from biobb_common.generic.biobb_object import BiobbObject
from biobb_common.tools.file_utils import launchlogger
from biobb_dna.utils import constants
from biobb_dna.utils.common import _from_string_to_list
from biobb_dna.utils.loader import read_series
[docs]
class InterBasePairCorrelation(BiobbObject):
"""
| biobb_dna InterBasePairCorrelation
| Calculate correlation between all base pairs of a single sequence and for a single helical parameter.
| Calculate correlation between neighboring base pairs and pairs of helical parameters.
Args:
input_filename_shift (str): Path to .ser file with data for helical parameter 'shift'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_shift.ser>`_. Accepted formats: ser (edam:format_2330).
input_filename_slide (str): Path to .ser file with data for helical parameter 'slide'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_slide.ser>`_. Accepted formats: ser (edam:format_2330).
input_filename_rise (str): Path to .ser file with data for helical parameter 'rise'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_rise.ser>`_. Accepted formats: ser (edam:format_2330).
input_filename_tilt (str): Path to .ser file with data for helical parameter 'tilt'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_tilt.ser>`_. Accepted formats: ser (edam:format_2330).
input_filename_roll (str): Path to .ser file with data for helical parameter 'roll'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_roll.ser>`_. Accepted formats: ser (edam:format_2330).
input_filename_twist (str): Path to .ser file with data for helical parameter 'twist'. File type: input. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/data/correlation/canal_output_twist.ser>`_. Accepted formats: ser (edam:format_2330).
output_csv_path (str): Path to directory where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.csv>`_. Accepted formats: csv (edam:format_3752).
output_jpg_path (str): Path to .jpg file where output is saved. File type: output. `Sample file <https://raw.githubusercontent.com/bioexcel/biobb_dna/master/biobb_dna/test/reference/correlation/inter_bpcorr_ref.jpg>`_. Accepted formats: jpg (edam:format_3579).
properties (dict):
* **sequence** (*str*) - (None) Nucleic acid sequence for the input .ser file. Length of sequence is expected to be the same as the total number of columns in the .ser file, minus the index column (even if later on a subset of columns is selected with the *seqpos* option).
* **seqpos** (*list*) - (None) list of sequence positions (columns indices starting by 0) to analyze. If not specified it will analyse the complete sequence.
* **remove_tmp** (*bool*) - (True) [WF property] Remove temporal files.
* **restart** (*bool*) - (False) [WF property] Do not execute if output files exist.
* **sandbox_path** (*str*) - ("./") [WF property] Parent path to the sandbox directory.
Examples:
This is a use example of how to use the building block from Python::
from biobb_dna.interbp_correlations.interbpcorr import interbpcorr
interbpcorr(
input_filename_shift='path/to/input/shift.ser',
input_filename_slide='path/to/input/slide.ser',
input_filename_rise='path/to/input/slide.ser',
input_filename_tilt='path/to/input/tilt.ser',
input_filename_roll='path/to/input/roll.ser',
input_filename_twist='path/to/input/twist.ser',
output_csv_path='path/to/output/file.csv',
output_jpg_path='path/to/output/plot.jpg',
properties=prop)
Info:
* wrapped_software:
* name: In house
* license: Apache-2.0
* ontology:
* name: EDAM
* schema: http://edamontology.org/EDAM.owl
"""
def __init__(
self,
input_filename_shift,
input_filename_slide,
input_filename_rise,
input_filename_tilt,
input_filename_roll,
input_filename_twist,
output_csv_path,
output_jpg_path,
properties=None,
**kwargs,
) -> None:
properties = properties or {}
# Call parent class constructor
super().__init__(properties)
self.locals_var_dict = locals().copy()
# Input/Output files
self.io_dict = {
"in": {
"input_filename_shift": input_filename_shift,
"input_filename_slide": input_filename_slide,
"input_filename_rise": input_filename_rise,
"input_filename_tilt": input_filename_tilt,
"input_filename_roll": input_filename_roll,
"input_filename_twist": input_filename_twist,
},
"out": {
"output_csv_path": output_csv_path,
"output_jpg_path": output_jpg_path,
},
}
self.properties = properties
self.sequence = properties.get("sequence", None)
self.seqpos = [
int(elem) for elem in _from_string_to_list(properties.get("seqpos", None))
]
# Check the properties
self.check_properties(properties)
self.check_arguments()
[docs]
@launchlogger
def launch(self) -> int:
"""Execute the :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` object."""
# Setup Biobb
if self.check_restart():
return 0
self.stage_files()
# check sequence
if self.sequence is None or len(self.sequence) < 2:
raise ValueError("sequence is null or too short!")
# check seqpos
if self.seqpos:
if not (isinstance(self.seqpos, list) and len(self.seqpos) > 1):
raise ValueError("seqpos must be a list of at least two integers")
else:
self.seqpos = None # type: ignore
# read input
shift = read_series(
self.stage_io_dict["in"]["input_filename_shift"], usecols=self.seqpos
)
slide = read_series(
self.stage_io_dict["in"]["input_filename_slide"], usecols=self.seqpos
)
rise = read_series(
self.stage_io_dict["in"]["input_filename_rise"], usecols=self.seqpos
)
tilt = read_series(
self.stage_io_dict["in"]["input_filename_tilt"], usecols=self.seqpos
)
roll = read_series(
self.stage_io_dict["in"]["input_filename_roll"], usecols=self.seqpos
)
twist = read_series(
self.stage_io_dict["in"]["input_filename_twist"], usecols=self.seqpos
)
if not self.seqpos:
# drop first and last columns
shift = shift[shift.columns[1:-2]]
slide = slide[slide.columns[1:-2]]
rise = rise[rise.columns[1:-2]]
tilt = tilt[tilt.columns[1:-2]]
roll = roll[roll.columns[1:-2]]
twist = twist[twist.columns[1:-2]]
labels = [
f"{i+1}_{self.sequence[i:i+2]}"
for i in range(1, len(shift.columns) + 1)
]
corr_index = [
f"{self.sequence[i:i+3]}" for i in range(1, len(shift.columns) + 1)
]
else:
labels = [f"{i+1}_{self.sequence[i:i+2]}" for i in self.seqpos]
corr_index = [f"{self.sequence[i:i+3]}" for i in self.seqpos]
# rename duplicated subunits
shift.columns = labels
slide.columns = labels
rise.columns = labels
tilt.columns = labels
roll.columns = labels
twist.columns = labels
# set names to each dataset
shift.name = "shift"
slide.name = "slide"
rise.name = "rise"
tilt.name = "tilt"
roll.name = "roll"
twist.name = "twist"
# get correlation between neighboring basepairs among all helical parameters
results = {}
datasets = [shift, slide, rise, tilt, roll, twist]
for ser1, ser2 in product(datasets, datasets):
ser2_shifted = ser2.shift(axis=1)
ser2_shifted[labels[0]] = ser2[labels[-1]]
if ser1.name in constants.hp_angular and ser2.name in constants.hp_angular:
method = self.circular
elif (
ser1.name in constants.hp_angular and ser2.name not in constants.hp_angular
) or (
ser2.name in constants.hp_angular and ser1.name not in constants.hp_angular
):
method = self.circlineal
else:
method = "pearson" # type: ignore
corr_data = ser1.corrwith(ser2_shifted, method=method)
corr_data.index = corr_index
results[f"{ser1.name}/{ser2.name}"] = corr_data
result_df = pd.DataFrame.from_dict(results)
result_df.index = corr_index # type: ignore
# save csv data
result_df.to_csv(self.stage_io_dict["out"]["output_csv_path"])
# create heatmap
cmap = plt.get_cmap("bwr").copy()
bounds = [-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1]
num = cmap.N
norm = mpl.colors.BoundaryNorm(bounds, num) # type: ignore
cmap.set_bad(color="gainsboro")
fig, ax = plt.subplots(1, 1, dpi=300, figsize=(7.5, 5), tight_layout=True)
im = ax.imshow(result_df, cmap=cmap, norm=norm, aspect="auto")
plt.colorbar(im, ticks=[-1, -0.8, -0.6, -0.4, -0.2, 0.2, 0.4, 0.6, 0.8, 1])
# axes
xlocs = np.arange(len(result_df.columns))
_ = ax.set_xticks(xlocs)
_ = ax.set_xticklabels(result_df.columns.to_list(), rotation=90)
ylocs = np.arange(len(result_df.index))
_ = ax.set_yticks(ylocs)
_ = ax.set_yticklabels(result_df.index.to_list()) # type: ignore
ax.set_title(
"Correlation for neighboring basepairs " "and pairs of helical parameters"
)
fig.tight_layout()
fig.savefig(self.stage_io_dict["out"]["output_jpg_path"], format="jpg")
plt.close()
# Copy files to host
self.copy_to_host()
# Remove temporary file(s)
self.remove_tmp_files()
self.check_arguments(output_files_created=True, raise_exception=False)
return 0
[docs]
@staticmethod
def circular(x1, x2):
x1 = x1 * np.pi / 180
x2 = x2 * np.pi / 180
diff_1 = np.sin(x1 - x1.mean())
diff_2 = np.sin(x2 - x2.mean())
num = (diff_1 * diff_2).sum()
den = np.sqrt((diff_1**2).sum() * (diff_2**2).sum())
return num / den
[docs]
@staticmethod
def circlineal(x1, x2):
x2 = x2 * np.pi / 180
rc = np.corrcoef(x1, np.cos(x2))[1, 0]
rs = np.corrcoef(x1, np.sin(x2))[1, 0]
rcs = np.corrcoef(np.sin(x2), np.cos(x2))[1, 0]
num = (rc**2) + (rs**2) - 2 * rc * rs * rcs
den = 1 - (rcs**2)
correlation = np.sqrt(num / den)
if np.corrcoef(x1, x2)[1, 0] < 0:
correlation *= -1
return correlation
[docs]
def interbpcorr(
input_filename_shift: str,
input_filename_slide: str,
input_filename_rise: str,
input_filename_tilt: str,
input_filename_roll: str,
input_filename_twist: str,
output_csv_path: str,
output_jpg_path: str,
properties: Optional[dict] = None,
**kwargs,
) -> int:
"""Create :class:`HelParCorrelation <correlations.interbpcorr.InterBasePairCorrelation>` class and
execute the :meth:`launch() <correlations.interbpcorr.InterBasePairCorrelation.launch>` method."""
return InterBasePairCorrelation(**dict(locals())).launch()
interbpcorr.__doc__ = InterBasePairCorrelation.__doc__
main = InterBasePairCorrelation.get_main(interbpcorr, "Load .ser file from Canal output and calculate correlation between base pairs of the corresponding sequence.")
if __name__ == '__main__':
main()