

dnallm.datahandling.data

DNA Dataset handling and processing utilities.

This module provides comprehensive tools for loading, processing, and managing DNA sequence datasets. It supports various file formats, data augmentation techniques, and statistical analysis.

Classes

DNADataset

DNADataset(ds, tokenizer=None, max_length=512)

A comprehensive wrapper for DNA sequence datasets with advanced processing capabilities.

This class provides methods for loading DNA datasets from various sources (local files, Hugging Face Hub, ModelScope), encoding sequences with tokenizers, data augmentation, statistical analysis, and more.

Attributes:

    dataset (Dataset | DatasetDict): The underlying Hugging Face Dataset or DatasetDict.
    tokenizer (PreTrainedTokenizerBase | None): Tokenizer for sequence encoding.
    max_length (int): Maximum sequence length for tokenization.
    sep (str | None): Column delimiter for text-based input files (CSV, TSV, TXT).
    multi_label_sep (str | None): Separator for multi-label data.
    data_type (str | None): Type of the dataset (classification, regression, etc.).
    stats (dict | None): Cached dataset statistics.
    stats_for_plot (DataFrame | None): Cached statistics for plotting.

Initialize a DNADataset.

Parameters:

    ds (Dataset | DatasetDict): A Hugging Face Dataset containing at least 'sequence' and 'labels' fields. Required.
    tokenizer (PreTrainedTokenizerBase | None): A Hugging Face tokenizer for encoding sequences. Default: None.
    max_length (int): Maximum length for tokenization. Default: 512.
Source code in dnallm/datahandling/data.py, lines 50-78
def __init__(
    self,
    ds: Dataset | DatasetDict,
    tokenizer: PreTrainedTokenizerBase | None = None,
    max_length: int = 512,
) -> None:
    """Initialize a DNADataset.

    Args:
        ds: A Hugging Face Dataset containing at least 'sequence' and
            'label' fields
        tokenizer: A Hugging Face tokenizer for encoding sequences
        max_length: Maximum length for tokenization
    """
    if ds is None:
        raise TypeError("Dataset cannot be None")

    if max_length <= 0:
        raise ValueError("max_length must be positive")

    self.dataset = ds
    self.tokenizer = tokenizer
    self.max_length = max_length
    self.sep: str | None = None
    self.multi_label_sep: str | None = None
    self.data_type: str | None = None
    self.stats: dict | None = None
    self.stats_for_plot: pd.DataFrame | None = None
    self.__data_type__()  # Determine the data type of the dataset
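A minimal construction sketch, assuming the class is importable from this module; the sequences and labels below are placeholders for illustration:

from datasets import Dataset
from dnallm.datahandling.data import DNADataset

# Build a tiny in-memory dataset with the expected 'sequence' and 'labels' columns
ds = Dataset.from_dict({
    "sequence": ["ACGTACGTAC", "GGGCCCATGA"],
    "labels": [0, 1],
})
dna_ds = DNADataset(ds, tokenizer=None, max_length=512)
print(len(dna_ds), dna_ds.data_type)  # data type is inferred from the labels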
Functions
__data_type__
__data_type__()

Get the data type of the dataset (classification, regression, etc.).

This method analyzes the labels to determine whether the dataset is for:

- classification (integer or string labels)
- regression (float labels)
- multi-label (multiple labels per sample)
- multi-regression (multiple float values per sample)

Source code in dnallm/datahandling/data.py, lines 1203-1226
def __data_type__(self) -> None:
    """Get the data type of the dataset (classification, regression, etc.).

    This method analyzes the labels to determine if the dataset is for:
    - classification (integer or string labels)
    - regression (float labels)
    - multi-label (multiple labels per sample)
    - multi-regression (multiple float values per sample)
    """
    labels = self._extract_labels()
    if labels is None:
        self.data_type = "unknown"
        return

    if not self._is_valid_labels(labels):
        self.data_type = "unknown"
        return

    first_label = self._get_first_label(labels)
    if first_label is None:
        self.data_type = "unknown"
        return

    self.data_type = self._determine_data_type(first_label)
__getitem__
__getitem__(idx)

Get an item from the dataset.

Parameters:

    idx (int): Index of the item to retrieve. Required.

Returns:

    The item at the specified index.

Raises:

    ValueError: If the dataset is a DatasetDict.

Source code in dnallm/datahandling/data.py, lines 1182-1201
def __getitem__(self, idx: int):
    """Get an item from the dataset.

    Args:
        idx: Index of the item to retrieve

    Returns:
        The item at the specified index

    Raises:
        ValueError: If dataset is a DatasetDict
    """
    if isinstance(self.dataset, DatasetDict):
        raise ValueError(
            "Dataset is a DatasetDict Object, please use "
            "`DNADataset.dataset[datatype].__getitem__(idx)` "
            "instead."
        )
    else:
        return self.dataset[idx]
__len__
__len__()

Return the length of the dataset.

Returns:

    int: Length of the dataset, or the total length across splits for a DatasetDict.

Source code in dnallm/datahandling/data.py, lines 1158-1168
def __len__(self) -> int:
    """Return the length of the dataset.

    Returns:
        Length of the dataset or total length for DatasetDict
    """
    if isinstance(self.dataset, DatasetDict):
        # Return total length across all splits
        return sum(len(self.dataset[dt]) for dt in self.dataset)
    else:
        return len(self.dataset)
augment_reverse_complement
augment_reverse_complement(reverse=True, complement=True)

Augment the dataset by adding reverse complement sequences.

This method doubles the dataset size.

Parameters:

    reverse (bool): Whether to reverse the sequence. Default: True.
    complement (bool): Whether to complement the sequence. Default: True.
Source code in dnallm/datahandling/data.py, lines 969-1009
def augment_reverse_complement(
    self, reverse: bool = True, complement: bool = True
) -> None:
    """Augment the dataset by adding reverse complement sequences.

    This method doubles the dataset size.

    Args:
        reverse: Whether to do reverse
        complement: Whether to do complement
    """

    def process(ds, reverse, complement):
        # Create a dataset with an extra field for the reverse complement.
        def add_rc(example):
            example["rc_sequence"] = reverse_complement(
                example["sequence"], reverse=reverse, complement=complement
            )
            return example

        ds_with_rc = ds.map(add_rc, desc="Reverse complementary")
        # Build a new dataset where the reverse complement becomes the
        # 'sequence'
        rc_ds = ds_with_rc.map(
            lambda ex: {
                "sequence": ex["rc_sequence"],
                "labels": ex["labels"],
            },
            desc="Data augment",
        )
        # Drop the helper column so both datasets share the same schema,
        # then append the reverse-complement copies to the original dataset.
        rc_ds = rc_ds.remove_columns(["rc_sequence"])
        ds = concatenate_datasets([ds, rc_ds])
        return ds

    if isinstance(self.dataset, DatasetDict):
        for dt in self.dataset:
            self.dataset[dt] = process(
                self.dataset[dt], reverse, complement
            )
    else:
        self.dataset = process(self.dataset, reverse, complement)
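For illustration, a short sketch of the augmentation call, continuing the in-memory example from the constructor above:

before = len(dna_ds)
dna_ds.augment_reverse_complement(reverse=True, complement=True)
print(before, "->", len(dna_ds))  # the dataset size doubles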
concat_reverse_complement
concat_reverse_complement(
    reverse=True, complement=True, sep=""
)

Augment each sample by concatenating the sequence with its reverse complement.

Parameters:

    reverse (bool): Whether to reverse the sequence. Default: True.
    complement (bool): Whether to complement the sequence. Default: True.
    sep (str): Separator between the original and reverse complement sequences. Default: ''.
Source code in dnallm/datahandling/data.py, lines 1011-1041
def concat_reverse_complement(
    self, reverse: bool = True, complement: bool = True, sep: str = ""
) -> None:
    """Augment each sample by concatenating the sequence with its reverse
    complement.

    Args:
        reverse: Whether to do reverse
        complement: Whether to do complement
        sep: Separator between the original and reverse complement
            sequences
    """

    def process(ds, reverse, complement, sep):
        def concat_fn(example):
            rc = reverse_complement(
                example["sequence"], reverse=reverse, complement=complement
            )
            example["sequence"] = example["sequence"] + sep + rc
            return example

        ds = ds.map(concat_fn, desc="Data augment")
        return ds

    if isinstance(self.dataset, DatasetDict):
        for dt in self.dataset:
            self.dataset[dt] = process(
                self.dataset[dt], reverse, complement, sep
            )
    else:
        self.dataset = process(self.dataset, reverse, complement, sep)
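A sketch of the concatenation variant; the separator value here is an arbitrary choice for illustration:

# Concatenate each sequence with its reverse complement, separated by "N"
dna_ds.concat_reverse_complement(reverse=True, complement=True, sep="N")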
encode_sequences
encode_sequences(
    padding="max_length",
    return_tensors="pt",
    remove_unused_columns=False,
    uppercase=False,
    lowercase=False,
    task="SequenceClassification",
    tokenizer=None,
)

Encode all sequences using the provided tokenizer.

The dataset is mapped to include tokenized fields along with the label, making it directly usable with Hugging Face Trainer.

Parameters:

    padding (str): Padding strategy for sequences. Can be 'max_length' or 'longest'. Use 'longest' to pad to the length of the longest sequence if you run into memory issues. Default: 'max_length'.
    return_tensors (str): Returned tensor type; can be 'pt', 'tf', 'np', or 'jax'. Default: 'pt'.
    remove_unused_columns (bool): Whether to remove the original 'sequence' and 'labels' columns. Default: False.
    uppercase (bool): Whether to convert sequences to uppercase. Default: False.
    lowercase (bool): Whether to convert sequences to lowercase. Default: False.
    task (str | None): Task type for the tokenizer. If not provided, defaults to 'SequenceClassification'. Default: 'SequenceClassification'.
    tokenizer (PreTrainedTokenizerBase | None): Tokenizer to use for encoding. If not provided, uses the instance's tokenizer. Default: None.

Raises:

    ValueError: If no tokenizer is available.

Source code in dnallm/datahandling/data.py, lines 432-491
def encode_sequences(
    self,
    padding: str = "max_length",
    return_tensors: str = "pt",
    remove_unused_columns: bool = False,
    uppercase: bool = False,
    lowercase: bool = False,
    task: str | None = "SequenceClassification",
    tokenizer: PreTrainedTokenizerBase | None = None,
) -> None:
    """Encode all sequences using the provided tokenizer.

    The dataset is mapped to include tokenized fields along with the
    label, making it directly usable with Hugging Face Trainer.

    Args:
        padding: Padding strategy for sequences. Can be 'max_length' or
            'longest'. Use 'longest' to pad to the length of the longest
            sequence in case of memory outage
        return_tensors: Returned tensor types, can be 'pt', 'tf', 'np', or
            'jax'
        remove_unused_columns: Whether to remove the original 'sequence'
            and 'label' columns
        uppercase: Whether to convert sequences to uppercase
        lowercase: Whether to convert sequences to lowercase
        task: Task type for the tokenizer. If not provided, defaults to
            'SequenceClassification'
        tokenizer: Tokenizer to use for encoding. If not provided, uses
            the instance's tokenizer

    Raises:
        ValueError: If tokenizer is not provided
    """
    if not self.tokenizer:
        if tokenizer:
            self.tokenizer = tokenizer
        else:
            raise ValueError("Tokenizer is required")

    # Get tokenizer configuration
    tokenizer_config = self._get_tokenizer_config()

    # Judge the task type and apply appropriate tokenization
    if task is None:
        task = "sequenceclassification"
    task = task.lower()

    if task in ["tokenclassification", "token", "ner"]:
        self._apply_token_classification_tokenization(
            tokenizer_config, padding, uppercase, lowercase
        )
    else:
        self._apply_sequence_classification_tokenization(
            tokenizer_config, padding, uppercase, lowercase
        )

    # Post-process dataset
    self._post_process_encoded_dataset(
        remove_unused_columns, return_tensors
    )
from_huggingface classmethod
from_huggingface(
    dataset_name,
    seq_col="sequence",
    label_col="labels",
    data_dir=None,
    tokenizer=None,
    max_length=512,
)

Load a dataset from the Hugging Face Hub.

Parameters:

    dataset_name (str): Name of the dataset. Required.
    seq_col (str): Column name for the DNA sequence. Default: 'sequence'.
    label_col (str): Column name for the label. Default: 'labels'.
    data_dir (str | None): Data directory in a dataset. Default: None.
    tokenizer (PreTrainedTokenizerBase | None): Tokenizer for sequence encoding. Default: None.
    max_length (int): Max token length. Default: 512.

Returns:

    DNADataset: An instance wrapping a datasets.Dataset.

Source code in dnallm/datahandling/data.py, lines 362-394
@classmethod
def from_huggingface(
    cls,
    dataset_name: str,
    seq_col: str = "sequence",
    label_col: str = "labels",
    data_dir: str | None = None,
    tokenizer: PreTrainedTokenizerBase | None = None,
    max_length: int = 512,
) -> "DNADataset":
    """Load a dataset from the Hugging Face Hub.

    Args:
        dataset_name: Name of the dataset
        seq_col: Column name for the DNA sequence
        label_col: Column name for the label
        data_dir: Data directory in a dataset
        tokenizer: Tokenizer for sequence encoding
        max_length: Max token length

    Returns:
        An instance wrapping a datasets.Dataset
    """
    if data_dir:
        ds = load_dataset(dataset_name, data_dir=data_dir)
    else:
        ds = load_dataset(dataset_name)
    # Rename columns if necessary
    if seq_col != "sequence":
        ds = ds.rename_column(seq_col, "sequence")
    if label_col != "labels":
        ds = ds.rename_column(label_col, "labels")
    return cls(ds, tokenizer=tokenizer, max_length=max_length)
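A usage sketch; the dataset id and column names are hypothetical and only illustrate the renaming behavior:

dna_ds = DNADataset.from_huggingface(
    "user/some-dna-dataset",   # hypothetical Hub dataset id
    seq_col="seq",             # renamed internally to "sequence"
    label_col="label",         # renamed internally to "labels"
    max_length=512,
)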
from_modelscope classmethod
from_modelscope(
    dataset_name,
    seq_col="sequence",
    label_col="labels",
    data_dir=None,
    tokenizer=None,
    max_length=512,
)

Load a dataset from ModelScope.

Parameters:

    dataset_name (str): Name of the dataset. Required.
    seq_col (str): Column name for the DNA sequence. Default: 'sequence'.
    label_col (str): Column name for the label. Default: 'labels'.
    data_dir (str | None): Data directory in a dataset. Default: None.
    tokenizer (PreTrainedTokenizerBase | None): Tokenizer for sequence encoding. Default: None.
    max_length (int): Max token length. Default: 512.

Returns:

    DNADataset: An instance wrapping a datasets.Dataset.

Source code in dnallm/datahandling/data.py, lines 396-430
@classmethod
def from_modelscope(
    cls,
    dataset_name: str,
    seq_col: str = "sequence",
    label_col: str = "labels",
    data_dir: str | None = None,
    tokenizer: PreTrainedTokenizerBase | None = None,
    max_length: int = 512,
) -> "DNADataset":
    """Load a dataset from the ModelScope.

    Args:
        dataset_name: Name of the dataset
        seq_col: Column name for the DNA sequence
        label_col: Column name for the label
        data_dir: Data directory in a dataset
        tokenizer: Tokenizer for sequence encoding
        max_length: Max token length

    Returns:
        An instance wrapping a datasets.Dataset
    """
    from modelscope import MsDataset

    if data_dir:
        ds = MsDataset.load(dataset_name, data_dir=data_dir)
    else:
        ds = MsDataset.load(dataset_name)
    # Rename columns if necessary
    if seq_col != "sequence":
        ds = ds.rename_column(seq_col, "sequence")
    if label_col != "labels":
        ds = ds.rename_column(label_col, "labels")
    return cls(ds, tokenizer=tokenizer, max_length=max_length)
get_split_lengths
get_split_lengths()

Get lengths of individual splits for DatasetDict.

Returns:

    dict | None: Dictionary of split names and their lengths, or None for a single dataset.

Source code in dnallm/datahandling/data.py, lines 1170-1180
def get_split_lengths(self) -> dict | None:
    """Get lengths of individual splits for DatasetDict.

    Returns:
        Dictionary of split names and their lengths, or None for single
        dataset
    """
    if isinstance(self.dataset, DatasetDict):
        return {dt: len(self.dataset[dt]) for dt in self.dataset}
    else:
        return None
head
head(head=10, show=False)

Fetch the head n data from the dataset.

Parameters:

    head (int): Number of samples to fetch. Default: 10.
    show (bool): Whether to print the data instead of returning it. Default: False.

Returns:

    dict[Any, Any] | None: A dictionary containing the first n samples if show=False, otherwise None.

Source code in dnallm/datahandling/data.py, lines 1085-1126
def head(
    self, head: int = 10, show: bool = False
) -> dict[Any, Any] | None:
    """Fetch the head n data from the dataset.

    Args:
        head: Number of samples to fetch
        show: Whether to print the data or return it

    Returns:
        A dictionary containing the first n samples if show=False,
        otherwise None
    """
    import pprint

    def format_convert(data):
        df: dict[Any, Any] = {}
        length = len(data["sequence"])
        for i in range(length):
            df[i] = {}
            for key in data.keys():
                df[i][key] = data[key][i]
        return df

    dataset = self.dataset
    if isinstance(dataset, DatasetDict):
        df = {}
        for dt in dataset.keys():
            data = dataset[dt][:head]
            if show:
                print(f"Dataset: {dt}")
                pprint.pp(format_convert(data))
            else:
                df[dt] = data
        return df if not show else None
    else:
        data = dataset[:head]
        if show:
            pprint.pp(format_convert(data))
            return None
        else:
            return dict(data)
iter_batches
iter_batches(batch_size)

Generator that yields batches of examples from the dataset.

Parameters:

    batch_size (int): Size of each batch. Required.

Yields:

    A batch of examples.

Raises:

    ValueError: If the dataset is a DatasetDict.

Source code in dnallm/datahandling/data.py, lines 1136-1156
def iter_batches(self, batch_size: int):
    """Generator that yields batches of examples from the dataset.

    Args:
        batch_size: Size of each batch

    Yields:
        A batch of examples

    Raises:
        ValueError: If dataset is a DatasetDict
    """
    if isinstance(self.dataset, DatasetDict):
        raise ValueError(
            "Dataset is a DatasetDict Object, please use "
            "`DNADataset.dataset[datatype].iter_batches(batch_size)` "
            "instead."
        )
    else:
        for i in range(0, len(self.dataset), batch_size):
            yield self.dataset[i : i + batch_size]
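A small iteration sketch (only valid for a single Dataset, not a DatasetDict); each yielded batch is a dict-like slice of the dataset:

for batch in dna_ds.iter_batches(batch_size=32):
    # batch["sequence"] is a list of up to 32 sequences
    print(len(batch["sequence"]))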
load_local_data classmethod
load_local_data(
    file_paths,
    seq_col="sequence",
    label_col="labels",
    sep=None,
    fasta_sep="|",
    multi_label_sep=None,
    tokenizer=None,
    max_length=512,
)

Load DNA sequence datasets from one or multiple local files.

Supports input formats: csv, tsv, json, parquet, arrow, dict, fasta, txt, pkl, pickle.

Parameters:

    file_paths (str | list | dict): Single dataset: provide one file path (e.g., "data.csv"). Pre-split datasets: provide a dict like {"train": "train.csv", "test": "test.csv"}. Required.
    seq_col (str): Column name for DNA sequences. Default: 'sequence'.
    label_col (str): Column name for labels. Default: 'labels'.
    sep (str | None): Delimiter for CSV, TSV, or TXT files. Default: None.
    fasta_sep (str): Delimiter for FASTA files. Default: '|'.
    multi_label_sep (str | None): Delimiter for multi-label sequences. Default: None.
    tokenizer (PreTrainedTokenizerBase | None): A tokenizer for sequence encoding. Default: None.
    max_length (int): Max token length. Default: 512.

Returns:

    DNADataset: An instance wrapping a Dataset or DatasetDict.

Raises:

    ValueError: If the file type is not supported.

Source code in dnallm/datahandling/data.py, lines 80-135
@classmethod
def load_local_data(
    cls,
    file_paths: str | list | dict,
    seq_col: str = "sequence",
    label_col: str = "labels",
    sep: str | None = None,
    fasta_sep: str = "|",
    multi_label_sep: str | None = None,
    tokenizer: PreTrainedTokenizerBase | None = None,
    max_length: int = 512,
) -> "DNADataset":
    """Load DNA sequence datasets from one or multiple local files.

    Supports input formats: csv, tsv, json, parquet, arrow, dict, fasta,
    txt, pkl, pickle.

    Args:
        file_paths: Single dataset: Provide one file path
            (e.g., "data.csv").
            Pre-split datasets: Provide a dict like
            {"train": "train.csv", "test": "test.csv"}
        seq_col: Column name for DNA sequences
        label_col: Column name for labels
        sep: Delimiter for CSV, TSV, or TXT
        fasta_sep: Delimiter for FASTA files
        multi_label_sep: Delimiter for multi-label sequences
        tokenizer: A tokenizer for sequence encoding
        max_length: Max token length

    Returns:
        An instance wrapping a Dataset or DatasetDict

    Raises:
        ValueError: If file type is not supported
    """
    # Set separators
    cls.sep = sep
    cls.multi_label_sep = multi_label_sep
    # Check if input is a list or dict
    if isinstance(
        file_paths, dict
    ):  # Handling multiple files (pre-split datasets)
        ds_dict = {}
        for split, path in file_paths.items():
            ds_dict[split] = cls._load_single_data(
                path, seq_col, label_col, sep, fasta_sep, multi_label_sep
            )
        dataset = DatasetDict(ds_dict)
    else:  # Handling a single file
        dataset = cls._load_single_data(
            file_paths, seq_col, label_col, sep, fasta_sep, multi_label_sep
        )
    dataset.stats = None  # Initialize stats as None

    return cls(dataset, tokenizer=tokenizer, max_length=max_length)
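Two loading sketches, one for a single file and one for pre-split files; all paths and column names are hypothetical:

# Single CSV file with custom column names
dna_ds = DNADataset.load_local_data(
    "data.csv", seq_col="seq", label_col="target", sep=","
)

# Pre-split CSV files, wrapped as a DatasetDict with "train"/"test" splits
dna_split = DNADataset.load_local_data(
    {"train": "train.csv", "test": "test.csv"}, sep=","
)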
plot_statistics
plot_statistics(save_path=None)

Plot statistics of the dataset.

Plots the sequence length distribution (a histogram bounded by the minimum and maximum lengths) and the GC content distribution (a box plot) for the sequences. If the dataset is a DatasetDict, the length plots and GC content plots from the different splits are each concatenated into a single chart.

Parameters:

    save_path (str | None): Path to save the plots. If None, plots are shown interactively. Default: None.

Raises:

    ValueError: If statistics have not been computed yet.

Source code in dnallm/datahandling/data.py, lines 1359-1389
def plot_statistics(self, save_path: str | None = None) -> None:
    """Plot statistics of the dataset.

    Includes sequence length distribution (histogram),
    GC content distribution (box plot) for each sequence.
    If dataset is a DatasetDict, length plots and GC content plots from
    different datasets will be concatenated into a single chart,
    respectively. Sequence length distribution is shown as a histogram,
    with min and max lengths for its' limit.

    Args:
        save_path: Path to save the plots. If None, plots will be shown
            interactively

    Raises:
        ValueError: If statistics have not been computed yet
    """
    import altair as alt

    alt.data_transformers.enable("vegafusion")

    if self.stats is None or self.stats_for_plot is None:
        raise ValueError(
            "Statistics have not been computed yet. Please call "
            "`statistics()` method first."
        )

    task_type = self.data_type or "unknown"
    df = self.stats_for_plot.copy()
    final = self._create_final_chart(df, task_type)
    self._display_or_save_chart(final, save_path)
process_missing_data
process_missing_data()

Filter out samples with missing or empty sequences or labels.

Source code in dnallm/datahandling/data.py, lines 926-936
def process_missing_data(self) -> None:
    """Filter out samples with missing or empty sequences or labels."""

    def non_missing(example):
        return (
            example["sequence"]
            and example["labels"] is not None
            and example["sequence"].strip() != ""
        )

    self.dataset = self.dataset.filter(non_missing)
random_generate
random_generate(
    minl,
    maxl=0,
    samples=1,
    gc=(0, 1),
    n_ratio=0.0,
    padding_size=0,
    seed=None,
    label_func=None,
    append=False,
)

Generate random DNA sequences and either replace the current dataset with them or append them to it (see the append parameter).

Parameters:

    minl (int): Minimum length of the sequences. Required.
    maxl (int): Maximum length of the sequences; defaults to the same as minl. Default: 0.
    samples (int): Number of sequences to generate. Default: 1.
    gc (tuple): GC content range. Default: (0, 1).
    n_ratio (float): Ratio of N bases to include in the generated sequences. Default: 0.0.
    padding_size (int): Padding size for sequence length. Default: 0.
    seed (int | None): Random seed. Default: None.
    label_func (Callable | None): A function that generates a label from a sequence. Default: None.
    append (bool): Whether to append the generated data to the existing dataset instead of replacing it. Default: False.
Source code in dnallm/datahandling/data.py, lines 830-924
def random_generate(
    self,
    minl: int,
    maxl: int = 0,
    samples: int = 1,
    gc: tuple = (0, 1),
    n_ratio: float = 0.0,
    padding_size: int = 0,
    seed: int | None = None,
    label_func: Callable | None = None,
    append: bool = False,
) -> None:
    """Replace the current dataset with randomly generated DNA sequences.

    Args:
        minl: Minimum length of the sequences
        maxl: Maximum length of the sequences, default is the same as minl
        samples: Number of sequences to generate, default 1
        gc: GC content range, default (0,1)
        n_ratio: Ratio of N bases to include in the generated sequence,
            default 0.0
        padding_size: Padding size for sequence length, default 0
        seed: Random seed, default None
        label_func: A function that generates a label from a sequence
        append: Append the random generated data to the existing dataset
            or use the data as a dataset
    """

    def process(
        minl, maxl, number, gc, n_ratio, padding_size, seed, label_func
    ):
        sequences = random_generate_sequences(
            minl=minl,
            maxl=maxl,
            samples=number,
            gc=gc,
            n_ratio=n_ratio,
            padding_size=padding_size,
            seed=seed,
        )
        labels = []
        for seq in sequences:
            labels.append(label_func(seq) if label_func else 0)
        random_ds = Dataset.from_dict({
            "sequence": sequences,
            "labels": labels,
        })
        return random_ds

    if append:
        if isinstance(self.dataset, DatasetDict):
            for dt in self.dataset:
                total_length = sum(
                    len(self.dataset[split])
                    for split in self.dataset.keys()
                )
                number = round(
                    samples * len(self.dataset[dt]) / total_length
                )
                random_ds = process(
                    minl,
                    maxl,
                    number,
                    gc,
                    n_ratio,
                    padding_size,
                    seed,
                    label_func,
                )
                self.dataset[dt] = concatenate_datasets([
                    self.dataset[dt],
                    random_ds,
                ])
        else:
            random_ds = process(
                minl,
                maxl,
                samples,
                gc,
                n_ratio,
                padding_size,
                seed,
                label_func,
            )
            self.dataset = concatenate_datasets([self.dataset, random_ds])
    else:
        self.dataset = process(
            minl,
            maxl,
            samples,
            gc,
            n_ratio,
            padding_size,
            seed,
            label_func,
        )
raw_reverse_complement
raw_reverse_complement(ratio=0.5, seed=None)

Reverse-complement a random fraction of the sequences in the dataset.

Parameters:

    ratio (float): Ratio of sequences to reverse complement. Default: 0.5.
    seed (int | None): Random seed for reproducibility. Default: None.
Source code in dnallm/datahandling/data.py, lines 938-967
def raw_reverse_complement(
    self, ratio: float = 0.5, seed: int | None = None
) -> None:
    """Do reverse complement of sequences in the dataset.

    Args:
        ratio: Ratio of sequences to reverse complement
        seed: Random seed for reproducibility
    """

    def process(ds, ratio, seed):
        random.seed(seed)
        number = len(ds["sequence"])
        idxlist = set(random.sample(range(number), int(number * ratio)))

        def concat_fn(example, idx):
            rc = reverse_complement(example["sequence"])
            if idx in idxlist:
                example["sequence"] = rc
            return example

        # Apply the reverse complement to the randomly selected indices.
        # Dataset.map returns a new dataset, so reassign it.
        ds = ds.map(concat_fn, with_indices=True, desc="Reverse complementary")
        return ds

    if isinstance(self.dataset, DatasetDict):
        for dt in self.dataset:
            self.dataset[dt] = process(self.dataset[dt], ratio, seed)
    else:
        self.dataset = process(self.dataset, ratio, seed)
sampling
sampling(ratio=1.0, seed=None, overwrite=False)

Randomly sample a fraction of the dataset.

Parameters:

    ratio (float): Fraction of the dataset to sample. Default: 1.0 (no sampling).
    seed (int | None): Random seed for reproducibility. Default: None.
    overwrite (bool): Whether to overwrite the original dataset with the sampled one. Default: False.

Returns:

    DNADataset: A DNADataset object with the sampled data.

Source code in dnallm/datahandling/data.py, lines 1043-1083
def sampling(
    self,
    ratio: float = 1.0,
    seed: int | None = None,
    overwrite: bool = False,
) -> "DNADataset":
    """Randomly sample a fraction of the dataset.

    Args:
        ratio: Fraction of the dataset to sample. Default is 1.0
            (no sampling)
        seed: Random seed for reproducibility
        overwrite: Whether to overwrite the original dataset with the
            sampled one

    Returns:
        A DNADataset object with sampled data
    """
    if ratio <= 0 or ratio > 1:
        raise ValueError("ratio must be between 0 and 1")

    dataset = self.dataset
    if isinstance(dataset, DatasetDict):
        for dt in dataset.keys():
            random.seed(seed)
            random_idx = random.sample(
                range(len(dataset[dt])), int(len(dataset[dt]) * ratio)
            )
            dataset[dt] = dataset[dt].select(random_idx)
    else:
        # Seed here as well so sampling a single dataset is reproducible.
        random.seed(seed)
        random_idx = random.sample(
            range(len(dataset)), int(len(dataset) * ratio)
        )
        dataset = dataset.select(random_idx)

    if overwrite:
        self.dataset = dataset
        return self
    else:
        # Create a new DNADataset object with the sampled data
        return DNADataset(dataset, self.tokenizer, self.max_length)
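A sampling sketch; by default a new DNADataset is returned and the original is left untouched:

subset = dna_ds.sampling(ratio=0.1, seed=42)          # returns a new DNADataset
dna_ds.sampling(ratio=0.1, seed=42, overwrite=True)   # or replace the data in place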
show
show(head=10)

Display the dataset.

Parameters:

    head (int): Number of samples to display. Default: 10.
Source code in dnallm/datahandling/data.py, lines 1128-1134
def show(self, head: int = 10) -> None:
    """Display the dataset.

    Args:
        head: Number of samples to display
    """
    self.head(head=head, show=True)
shuffle
shuffle(seed=None)

Shuffle the dataset.

Parameters:

    seed (int | None): Random seed for reproducibility. Default: None.
Source code in dnallm/datahandling/data.py, lines 800-806
def shuffle(self, seed: int | None = None) -> None:
    """Shuffle the dataset.

    Args:
        seed: Random seed for reproducibility
    """
    # Dataset.shuffle returns a new dataset rather than shuffling in place.
    self.dataset = self.dataset.shuffle(seed=seed)
split_data
split_data(test_size=0.2, val_size=0.1, seed=None)

Split the dataset into train, test, and validation sets.

Parameters:

    test_size (float): Proportion of the dataset to include in the test split. Default: 0.2.
    val_size (float): Proportion of the dataset to include in the validation split. Default: 0.1.
    seed (int | None): Random seed for reproducibility. Default: None.
Source code in dnallm/datahandling/data.py, lines 758-798
def split_data(
    self,
    test_size: float = 0.2,
    val_size: float = 0.1,
    seed: int | None = None,
) -> None:
    """Split the dataset into train, test, and validation sets.

    Args:
        test_size: Proportion of the dataset to include in the test
            split
        val_size: Proportion of the dataset to include in the validation
            split
        seed: Random seed for reproducibility
    """
    # check if the dataset is already a DatasetDict
    if isinstance(self.dataset, DatasetDict):
        raise ValueError(
            "Dataset is already a DatasetDict, no need to split"
        )
    # First, split off test+validation from training data
    split_result = self.dataset.train_test_split(
        test_size=test_size + val_size, seed=seed
    )
    train_ds = split_result["train"]
    temp_ds = split_result["test"]
    # Further split temp_ds into test and validation sets
    if val_size > 0:
        rel_val_size = val_size / (test_size + val_size)
        temp_split = temp_ds.train_test_split(
            test_size=rel_val_size, seed=seed
        )
        test_ds = temp_split["train"]
        val_ds = temp_split["test"]
        self.dataset = DatasetDict({
            "train": train_ds,
            "test": test_ds,
            "val": val_ds,
        })
    else:
        self.dataset = DatasetDict({"train": train_ds, "test": temp_ds})
statistics
statistics()

Get statistics of the dataset.

Includes number of samples, sequence length (min, max, average, median), label distribution, GC content (by labels), nucleotide composition (by labels).

Returns:

    dict: A dictionary containing statistics of the dataset.

Raises:

    ValueError: If a split is neither a datasets.Dataset nor a pandas.DataFrame.

Source code in dnallm/datahandling/data.py, lines 1282-1357
def statistics(self) -> dict:
    """Get statistics of the dataset.

    Includes number of samples, sequence length (min, max, average,
    median), label distribution, GC content (by labels), nucleotide
    composition (by labels).

    Returns:
        A dictionary containing statistics of the dataset

    Raises:
        ValueError: If statistics have not been computed yet
    """

    def prepare_dataframe(dataset) -> pd.DataFrame:
        """Convert a datasets.Dataset to pandas DataFrame if needed.

        If the input is already a pandas DataFrame, return a copy.
        """
        # avoid importing datasets at top-level to keep dependency optional
        try:
            from datasets import Dataset

            is_dataset = isinstance(dataset, Dataset)
        except Exception:
            is_dataset = False

        df: pd.DataFrame
        if is_dataset:
            df = dataset.to_pandas()
        elif isinstance(dataset, pd.DataFrame):
            df = dataset.copy()
        else:
            raise ValueError(
                "prepare_dataframe expects a datasets.Dataset or "
                "pandas.DataFrame"
            )
        return df

    def compute_basic_stats(
        df: pd.DataFrame, seq_col: str = "sequence"
    ) -> dict:
        """Compute number of samples and sequence length statistics."""
        seqs = df[seq_col].fillna("").astype(str)
        lens = seqs.str.len()
        return {
            "n_samples": len(lens),
            "min_len": int(lens.min()) if len(lens) > 0 else 0,
            "max_len": int(lens.max()) if len(lens) > 0 else 0,
            "mean_len": float(lens.mean())
            if len(lens) > 0
            else float("nan"),
            "median_len": float(lens.median())
            if len(lens) > 0
            else float("nan"),
        }

    stats = {}
    seq_col = "sequence"
    # label_col = "labels"  # Not used in current implementation
    if isinstance(self.dataset, DatasetDict):
        for split_name, split_ds in self.dataset.items():
            df = prepare_dataframe(split_ds)
            data_type = self.data_type
            basic = compute_basic_stats(df, seq_col)
            stats[split_name] = {"data_type": data_type, **basic}
    else:
        df = prepare_dataframe(self.dataset)
        data_type = self.data_type
        basic = compute_basic_stats(df, seq_col)
        stats["full"] = {"data_type": data_type, **basic}

    self.stats = stats  # Store stats in the instance for later use
    self.stats_for_plot = df

    return stats
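statistics() must be called before plot_statistics(); a minimal sketch, where the output path is a placeholder:

stats = dna_ds.statistics()
print(stats)                           # per-split sample counts and length statistics
dna_ds.plot_statistics("stats.html")   # "stats.html" is a placeholder save path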
validate_sequences
validate_sequences(
    minl=20, maxl=6000, gc=(0, 1), valid_chars="ACGTN"
)

Filter the dataset, keeping only sequences with valid DNA bases, allowed length, and GC content within the given range.

Parameters:

    minl (int): Minimum length of the sequences. Default: 20.
    maxl (int): Maximum length of the sequences. Default: 6000.
    gc (tuple): GC content range between 0 and 1. Default: (0, 1).
    valid_chars (str): Allowed characters in the sequences. Default: 'ACGTN'.
Source code in dnallm/datahandling/data.py, lines 808-828
def validate_sequences(
    self,
    minl: int = 20,
    maxl: int = 6000,
    gc: tuple = (0, 1),
    valid_chars: str = "ACGTN",
) -> None:
    """Filter the dataset to keep sequences containing valid DNA bases or
    allowed length.

    Args:
        minl: Minimum length of the sequences
        maxl: Maximum length of the sequences
        gc: GC content range between 0 and 1
        valid_chars: Allowed characters in the sequences
    """
    self.dataset = self.dataset.filter(
        lambda example: check_sequence(
            example["sequence"], minl, maxl, gc, valid_chars
        )
    )

Functions

load_preset_dataset

load_preset_dataset(dataset_name, task=None)

Load a preset dataset from Hugging Face or ModelScope.

Parameters:

    dataset_name (str): Name of the dataset. Required.
    task (str | None): Task directory in a dataset. Default: None.

Returns:

    DNADataset: An instance wrapping a datasets.Dataset.

Raises:

    ValueError: If the dataset is not found in the preset datasets.

Source code in dnallm/datahandling/data.py, lines 1678-1698
def load_preset_dataset(
    dataset_name: str, task: str | None = None
) -> "DNADataset":
    """Load a preset dataset from Hugging Face or ModelScope.

    Args:
        dataset_name: Name of the dataset
        task: Task directory in a dataset

    Returns:
        An instance wrapping a datasets.Dataset

    Raises:
        ValueError: If dataset is not found in preset datasets
    """
    from .dataset_auto import PRESET_DATASETS

    ds_info = _get_dataset_info(dataset_name, PRESET_DATASETS)
    ds = _load_dataset_from_modelscope(ds_info, task)
    ds = _standardize_column_names(ds)
    return _create_dna_dataset(ds, ds_info)
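A sketch of loading a preset dataset; the dataset and task names below are placeholders, so use show_preset_dataset() first to see what is actually available:

from dnallm.datahandling.data import load_preset_dataset, show_preset_dataset

print(show_preset_dataset())  # inspect the available preset datasets
dna_ds = load_preset_dataset("some_preset_name", task="some_task")  # placeholder names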

show_preset_dataset

show_preset_dataset()

Show all preset datasets available in Hugging Face or ModelScope.

Returns:

    dict: A dictionary containing dataset names and their descriptions.

Source code in dnallm/datahandling/data.py, lines 1667-1675
def show_preset_dataset() -> dict:
    """Show all preset datasets available in Hugging Face or ModelScope.

    Returns:
        A dictionary containing dataset names and their descriptions
    """
    from .dataset_auto import PRESET_DATASETS

    return PRESET_DATASETS