Skip to content

API Reference

eris.pipeline

FeatureRelation dataclass

Describes the topological relationship between a target and a contextual feature.

Attributes:

Name Type Description
feature GenomicFeature

The contextual (passenger or flanking) GenomicFeature.

spatial Context

The spatial Context (e.g. UPSTREAM).

distance_bp int

Physical distance in base pairs.

topological_dist int

Distance in graph hops.

orientation Orientation

Relative orientation (SAME or OPPOSITE).

effect Effect

Combined Effect flags (e.g. TRUNCATED).

Source code in src/eris/pipeline.py
@dataclass(slots=True)
class FeatureRelation:
    """
    Describes the topological relationship between a target and a contextual feature.

    Attributes:
        feature: The contextual (passenger or flanking) GenomicFeature.
        spatial: The spatial Context (e.g. UPSTREAM).
        distance_bp: Physical distance in base pairs.
        topological_dist: Distance in graph hops.
        orientation: Relative orientation (SAME or OPPOSITE).
        effect: Combined Effect flags (e.g. TRUNCATED).
    """
    feature: GenomicFeature
    spatial: Context
    distance_bp: int
    topological_dist: int
    orientation: Orientation
    effect: Effect = Effect.NONE

    def get_relative_position(self, locus_start: int, locus_end: int) -> str:
        """Returns a string describing the relative position to a locus boundary."""
        g_start = self.feature.bounding_start
        g_end = self.feature.bounding_end
        g_strand = self.feature.bounding_strand

        if locus_end <= g_start:
            return "5_prime_flank" if g_strand == Strand.FORWARD else "3_prime_flank"
        elif locus_start >= g_end:
            return "3_prime_flank" if g_strand == Strand.FORWARD else "5_prime_flank"
        return "overlapping"

get_relative_position

get_relative_position(locus_start: int, locus_end: int) -> str

Returns a string describing the relative position to a locus boundary.

Source code in src/eris/pipeline.py
def get_relative_position(self, locus_start: int, locus_end: int) -> str:
    """Returns a string describing the relative position to a locus boundary."""
    g_start = self.feature.bounding_start
    g_end = self.feature.bounding_end
    g_strand = self.feature.bounding_strand

    if locus_end <= g_start:
        return "5_prime_flank" if g_strand == Strand.FORWARD else "3_prime_flank"
    elif locus_start >= g_end:
        return "3_prime_flank" if g_strand == Strand.FORWARD else "5_prime_flank"
    return "overlapping"

GenomicFeature dataclass

Represents a biological feature (e.g. a gene, MGE) which may span multiple segments.

Example

seg = LocationSegment("ctg1", 100, 500, Strand.FORWARD) feat = GenomicFeature("geneA", FeatureType.CDS, [seg]) feat.is_multi_contig False

Source code in src/eris/pipeline.py
@dataclass(slots=True)
class GenomicFeature:
    """
    Represents a biological feature (e.g. a gene, MGE) which may span multiple segments.

    Example:
        >>> seg = LocationSegment("ctg1", 100, 500, Strand.FORWARD)
        >>> feat = GenomicFeature("geneA", FeatureType.CDS, [seg])
        >>> feat.is_multi_contig
        False
    """
    id: str
    type: FeatureType
    segments: list[LocationSegment]
    qualifiers: dict[str, list[str]] = field(default_factory=dict)

    @property
    def is_multi_contig(self) -> bool:
        """True if the feature spans more than one contig."""
        return len(set(seg.contig for seg in self.segments)) > 1

    def to_biopython(self) -> SeqFeature:
        """Convert to a Biopython SeqFeature object."""
        if len(self.segments) == 1:
            loc = self.segments[0].to_biopython()
        else:
            loc = CompoundLocation([s.to_biopython() for s in self.segments])
        return SeqFeature(location=loc, type=self.type.value, id=self.id, qualifiers=self.qualifiers)

    @property
    def bounding_start(self) -> int:
        """The absolute minimum start coordinate of all segments."""
        return self.segments[0].start

    @property
    def bounding_end(self) -> int:
        """The absolute maximum end coordinate of all segments."""
        return self.segments[-1].end

    @property
    def bounding_strand(self) -> Strand:
        """The strand of the primary (first) segment."""
        return self.segments[0].strand

bounding_end property

bounding_end: int

The absolute maximum end coordinate of all segments.

bounding_start property

bounding_start: int

The absolute minimum start coordinate of all segments.

bounding_strand property

bounding_strand: Strand

The strand of the primary (first) segment.

is_multi_contig property

is_multi_contig: bool

True if the feature spans more than one contig.

to_biopython

to_biopython() -> SeqFeature

Convert to a Biopython SeqFeature object.

Source code in src/eris/pipeline.py
def to_biopython(self) -> SeqFeature:
    """Convert to a Biopython SeqFeature object."""
    if len(self.segments) == 1:
        loc = self.segments[0].to_biopython()
    else:
        loc = CompoundLocation([s.to_biopython() for s in self.segments])
    return SeqFeature(location=loc, type=self.type.value, id=self.id, qualifiers=self.qualifiers)

LocationSegment dataclass

A single continuous segment of a genomic location (e.g. an exon).

Source code in src/eris/pipeline.py
@dataclass(slots=True)
class LocationSegment:
    """A single continuous segment of a genomic location (e.g. an exon)."""
    contig: str
    start: int
    end: int
    strand: Strand

    def to_biopython(self) -> SimpleLocation:
        """Convert to a Biopython SimpleLocation object."""
        return SimpleLocation(self.start, self.end, self.strand)

to_biopython

to_biopython() -> SimpleLocation

Convert to a Biopython SimpleLocation object.

Source code in src/eris/pipeline.py
def to_biopython(self) -> SimpleLocation:
    """Convert to a Biopython SimpleLocation object."""
    return SimpleLocation(self.start, self.end, self.strand)

Locus dataclass

A single assembled genomic region containing a target of interest and its context.

The locus may be a simple interval on one contig or a complex 'stitched' path across multiple contigs in an assembly graph.

Source code in src/eris/pipeline.py
@dataclass(slots=True)
class Locus:
    """
    A single assembled genomic region containing a target of interest and its context.

    The locus may be a simple interval on one contig or a complex 'stitched' path
    across multiple contigs in an assembly graph.
    """
    id: str
    contig: str
    start: int
    end: int
    targets: list[GenomicFeature]
    passengers: list['FeatureRelation']
    upstream_flanks: list['FeatureRelation']
    downstream_flanks: list['FeatureRelation']
    fractional_depth: float = 1.0  # Tracks the sub-clonal abundance

    def extract_sequence(self, genome: 'GenomeAssembly') -> str:
        """
        Extracts the full nucleotide sequence of the locus from the assembly.

        Handles multi-contig loci by stitching segments together in graph order.
        """
        if "|" not in self.contig:
            # Single-contig locus: Exact coordinate slicing
            # (Biopython Seq objects handle the slicing natively)
            return str(genome[self.contig][self.start:self.end])

        # Multi-contig stitched locus:
        # Note: To get the nucleotide-perfect stitched string without duplicating
        # the graph overlaps, we fetch the full length of the traversed contigs.
        seq_parts = []
        for ctg in self.contig.split('|'):
            seq_parts.append(str(genome[ctg]))

        return "".join(seq_parts)

extract_sequence

extract_sequence(genome: GenomeAssembly) -> str

Extracts the full nucleotide sequence of the locus from the assembly.

Handles multi-contig loci by stitching segments together in graph order.

Source code in src/eris/pipeline.py
def extract_sequence(self, genome: 'GenomeAssembly') -> str:
    """
    Extracts the full nucleotide sequence of the locus from the assembly.

    Handles multi-contig loci by stitching segments together in graph order.
    """
    if "|" not in self.contig:
        # Single-contig locus: Exact coordinate slicing
        # (Biopython Seq objects handle the slicing natively)
        return str(genome[self.contig][self.start:self.end])

    # Multi-contig stitched locus:
    # Note: To get the nucleotide-perfect stitched string without duplicating
    # the graph overlaps, we fetch the full length of the traversed contigs.
    seq_parts = []
    for ctg in self.contig.split('|'):
        seq_parts.append(str(genome[ctg]))

    return "".join(seq_parts)

LocusBuilder

Orchestrates the assembly of loci from raw alignments and genomic context.

It uses the TopologyEngine to resolve graph-spanning alignments and then identifies flanking and passenger genes for each identified locus.

Source code in src/eris/pipeline.py
class LocusBuilder:
    """
    Orchestrates the assembly of loci from raw alignments and genomic context.

    It uses the TopologyEngine to resolve graph-spanning alignments and then
    identifies flanking and passenger genes for each identified locus.
    """
    __slots__ = ('topology_engine', 'genome', 'target_feature_type', 'max_feature_hops',
                 'locus_tolerance', 'features', 'genes')

    def __init__(self, topology_engine: 'TopologyEngine', genome: 'GenomeAssembly',
                 target_feature_type: FeatureType = FeatureType.CDS,
                 max_feature_hops: int = 3, locus_tolerance: int = 0,
                 features: dict[str, list[GenomicFeature]] = None,
                 genes: dict[str, list[Gene]] = None):
        """
        Initialize the LocusBuilder.

        Args:
            topology_engine: Engine for graph traversal.
            genome: The full assembly and metadata.
            target_feature_type: Classification for primary alignment targets.
            max_feature_hops: Max contextual genes to look for in each direction.
            locus_tolerance: bp tolerance for merging adjacent targets.
            features: Dictionary of GenomicFeatures per contig.
            genes: Dictionary of PyFGS Gene objects per contig.
        """
        self.topology_engine = topology_engine
        self.genome = genome
        self.target_feature_type = target_feature_type
        self.max_feature_hops = max_feature_hops
        self.locus_tolerance = locus_tolerance
        self.features = features or {}
        self.genes = genes or {}

    def assemble(self, alignments: dict) -> Iterable['Locus']:
        """
        The main entry point for generating loci.

        Stitches graph-spanning alignments and processes local alignments
        to produce a sequence of Locus objects.
        """
        # Now expects a list of paths (lists of AlignmentRecords) instead of pairs
        cleaned_alignments, resolved_paths = self.topology_engine.resolve_split_alignments(alignments)

        # Iterate over paths of arbitrary length
        for path in resolved_paths:
            yield self._stitch(path)

        for contig_id, batch in cleaned_alignments.items():
            contig_gene_intervals = self.topology_engine.features.get(contig_id, IntervalBatch.empty())
            for locus in self._build_local(contig_id, batch, contig_gene_intervals):
                yield locus

    def _resolve_relation(self, contig: str, idx: int, interval_batch: 'IntervalBatch',
                          spatial: Context, dist: int, topo: int, target_strand: Strand,
                          target_bounds: tuple[int, int]) -> FeatureRelation:
        """Analyzes and creates a FeatureRelation for a specific gene-target pair."""

        orig_idx = interval_batch.original_indices[idx]  # type: int
        feature = self.features[contig][orig_idx]  # type: GenomicFeature
        raw_pyfgs_gene = self.genes[contig][orig_idx]  # type: Gene
        gene_strand = Strand(interval_batch.strands[idx])

        # Determine strict relational spatial context natively using coordinate limits for local genes
        if topo == 0:
            target_interval = Interval(target_bounds[0], target_bounds[1], target_strand)
            gene_interval = Interval(interval_batch.starts[idx], interval_batch.ends[idx], gene_strand)
            spatial = target_interval.relate(gene_interval)

        effect = Effect.NONE

        if spatial in (Context.OVERLAPPING, Context.OVERLAPPING_START, Context.OVERLAPPING_END):
            effect = Effect.DISRUPTED

        # If the gene was biologically broken by the insertion
        if raw_pyfgs_gene.insertions or raw_pyfgs_gene.deletions:
            # Dynamically fetch the sequence from the genome for the mutation checker
            for mut in raw_pyfgs_gene.mutations(bytes(self.genome[contig])):  # type: Mutation
                dist_to_start = abs(mut.pos - target_bounds[0])
                dist_to_end = abs(mut.pos - target_bounds[1])

                if dist_to_start <= 5 or dist_to_end <= 5:
                    effect = Effect.TRUNCATED if effect == Effect.NONE else effect | Effect.TRUNCATED
                    break

        if target_strand == Strand.UNSTRANDED or gene_strand == Strand.UNSTRANDED:
            orientation = Orientation.NONE
        elif target_strand == gene_strand:
            orientation = Orientation.SAME
        else:
            orientation = Orientation.OPPOSITE

        return FeatureRelation(
            feature=feature, spatial=spatial, distance_bp=dist,
            topological_dist=topo, orientation=orientation, effect=effect
        )

    def _extract_flanks(self, contig: str, boundary: int, walk_direction: int,
                        context: Context, intervals: Optional['IntervalBatch'], dest_list: list,
                        target_strand: Strand, target_bounds: tuple[int, int]):
        """Walks the contig (and graph) from a boundary to find flanking features."""
        rem_hops = self.max_feature_hops
        exit_strand = Strand.REVERSE if walk_direction == -1 else Strand.FORWARD

        # 1. Local Search
        if intervals:
            if walk_direction == -1:
                idx = np.searchsorted(intervals.ends, boundary, side='right')
                for i in reversed(range(max(0, idx - self.max_feature_hops), idx)):
                    d = max(0, boundary - intervals.ends[i])
                    f = self._resolve_relation(contig, i, intervals, context, int(d), 0, target_strand, target_bounds)
                    dest_list.append(f)
                    rem_hops -= 1
            else:
                idx = np.searchsorted(intervals.starts, boundary, side='left')
                for i in range(idx, min(len(intervals), idx + self.max_feature_hops)):
                    d = max(0, intervals.starts[i] - boundary)
                    f = self._resolve_relation(contig, i, intervals, context, int(d), 0, target_strand, target_bounds)
                    dest_list.append(f)
                    rem_hops -= 1

        # 2. Graph Spillover
        if rem_hops > 0:
            for s_ctg, node_depth, batch in self.topology_engine.traverse(contig, exit_strand, rem_hops):
                for i in range(len(batch)):
                    if walk_direction == -1:
                        d = max(0, boundary + batch.starts[i] - self.genome.contig_lengths[contig])
                    else:
                        d = max(0, batch.starts[i] - boundary)
                    dest_list.append(
                        self._resolve_relation(s_ctg, i, batch, context, int(d), node_depth, target_strand, target_bounds)
                    )

    def _build_local(self, contig: str, alignment_batch: 'AlignmentBatch', gene_intervals: 'IntervalBatch') -> list[
        'Locus']:
        """Identifies loci within a single contig."""
        loci = []
        aln_intervals = alignment_batch.to_intervals()
        macro_intervals = aln_intervals.merge(tolerance=self.locus_tolerance)

        for i in range(len(macro_intervals)):
            macro = macro_intervals[i]
            target_indices = aln_intervals.query(macro.start, macro.end)
            if not (targets := [alignment_batch.get_record(idx) for idx in target_indices]):
                continue

            target_features = [
                GenomicFeature(t.q_name, self.target_feature_type,
                               [LocationSegment(contig, t.t_start, t.t_end, Strand(t.strand))])
                for t in targets
            ]

            locus = Locus(
                id=f"locus_{uuid4().hex[:8]}", contig=contig, start=macro.start, end=macro.end,
                targets=target_features, passengers=[], upstream_flanks=[], downstream_flanks=[]
            )

            # Determine macro target context bounds
            primary_strand = Strand(targets[0].strand)
            macro_bounds = (macro.start, macro.end)

            internal_indices = gene_intervals.query(macro.start, macro.end)
            for idx in internal_indices:
                locus.passengers.append(
                    self._resolve_relation(contig, idx, gene_intervals, Context.INSIDE, 0, 0, primary_strand,
                                           macro_bounds))

            if primary_strand == Strand.REVERSE:
                u_dir, u_bound = 1, macro.end
                d_dir, d_bound = -1, macro.start
            else:
                u_dir, u_bound = -1, macro.start
                d_dir, d_bound = 1, macro.end

            self._extract_flanks(contig, u_bound, u_dir, Context.UPSTREAM, gene_intervals, locus.upstream_flanks,
                                 primary_strand, macro_bounds)
            self._extract_flanks(contig, d_bound, d_dir, Context.DOWNSTREAM, gene_intervals, locus.downstream_flanks,
                                 primary_strand, macro_bounds)

            loci.append(locus)

        return loci

    def _stitch(self, fragments: list['AlignmentRecord']) -> 'Locus':
        """Stitches multiple fragments into a single multi-contig locus."""
        first = fragments[0]
        last = fragments[-1]

        # Calculate the fractional flow of the stitched path
        source_depth = self.genome.contig_depths.get(first.t_name, 1.0)
        bottleneck_depth = min(self.genome.contig_depths.get(f.t_name, 1.0) for f in fragments)
        frac_depth = round(bottleneck_depth / source_depth, 3) if source_depth > 0 else 1.0

        # 1. Build the multi-segment target feature across ALL fragments
        segments = [LocationSegment(f.t_name, f.t_start, f.t_end, Strand(f.strand)) for f in fragments]
        target_feature = GenomicFeature(
            id=f"{first.q_name}_stitched",
            type=self.target_feature_type,
            segments=segments
        )

        locus = Locus(
            id=f"locus_split_{uuid4().hex[:8]}",
            contig="|".join(f.t_name for f in fragments),
            start=first.t_start,
            end=last.t_end,
            targets=[target_feature], passengers=[],
            upstream_flanks=[], downstream_flanks=[],
            fractional_depth=frac_depth  # NEW: Assign it to the Locus
        )

        # 2. UPSTREAM FLANKS (Strictly from the first fragment)
        u_dir = -1 if first.strand == 1 else 1
        u_bound = first.t_start if first.strand == 1 else first.t_end
        u_ints = self.topology_engine.features.get(first.t_name)
        self._extract_flanks(
            first.t_name, u_bound, u_dir, Context.UPSTREAM, u_ints, locus.upstream_flanks,
            Strand(first.strand), (first.t_start, first.t_end)
        )

        # 3. DOWNSTREAM FLANKS (Strictly from the last fragment)
        v_dir = 1 if last.strand == 1 else -1
        v_bound = last.t_end if last.strand == 1 else last.t_start
        v_ints = self.topology_engine.features.get(last.t_name)
        self._extract_flanks(
            last.t_name, v_bound, v_dir, Context.DOWNSTREAM, v_ints, locus.downstream_flanks,
            Strand(last.strand), (last.t_start, last.t_end)
        )

        # 4. INSIDE PASSENGERS (Sweep across ALL fragments, including unaligned synthetic bubbles)
        for frag in fragments:
            f_ints = self.topology_engine.features.get(frag.t_name)
            if f_ints:
                internal_indices = f_ints.query(frag.t_start, frag.t_end)
                for idx in internal_indices:
                    locus.passengers.append(
                        self._resolve_relation(
                            frag.t_name, idx, f_ints, Context.INSIDE, 0, 0,
                            Strand(frag.strand), (frag.t_start, frag.t_end)
                        )
                    )

        return locus

__init__

__init__(topology_engine: TopologyEngine, genome: GenomeAssembly, target_feature_type: FeatureType = FeatureType.CDS, max_feature_hops: int = 3, locus_tolerance: int = 0, features: dict[str, list[GenomicFeature]] = None, genes: dict[str, list[Gene]] = None)

Initialize the LocusBuilder.

Parameters:

Name Type Description Default
topology_engine TopologyEngine

Engine for graph traversal.

required
genome GenomeAssembly

The full assembly and metadata.

required
target_feature_type FeatureType

Classification for primary alignment targets.

CDS
max_feature_hops int

Max contextual genes to look for in each direction.

3
locus_tolerance int

bp tolerance for merging adjacent targets.

0
features dict[str, list[GenomicFeature]]

Dictionary of GenomicFeatures per contig.

None
genes dict[str, list[Gene]]

Dictionary of PyFGS Gene objects per contig.

None
Source code in src/eris/pipeline.py
def __init__(self, topology_engine: 'TopologyEngine', genome: 'GenomeAssembly',
             target_feature_type: FeatureType = FeatureType.CDS,
             max_feature_hops: int = 3, locus_tolerance: int = 0,
             features: dict[str, list[GenomicFeature]] = None,
             genes: dict[str, list[Gene]] = None):
    """
    Initialize the LocusBuilder.

    Args:
        topology_engine: Engine for graph traversal.
        genome: The full assembly and metadata.
        target_feature_type: Classification for primary alignment targets.
        max_feature_hops: Max contextual genes to look for in each direction.
        locus_tolerance: bp tolerance for merging adjacent targets.
        features: Dictionary of GenomicFeatures per contig.
        genes: Dictionary of PyFGS Gene objects per contig.
    """
    self.topology_engine = topology_engine
    self.genome = genome
    self.target_feature_type = target_feature_type
    self.max_feature_hops = max_feature_hops
    self.locus_tolerance = locus_tolerance
    self.features = features or {}
    self.genes = genes or {}

assemble

assemble(alignments: dict) -> Iterable[Locus]

The main entry point for generating loci.

Stitches graph-spanning alignments and processes local alignments to produce a sequence of Locus objects.

Source code in src/eris/pipeline.py
def assemble(self, alignments: dict) -> Iterable['Locus']:
    """
    The main entry point for generating loci.

    Stitches graph-spanning alignments and processes local alignments
    to produce a sequence of Locus objects.
    """
    # Now expects a list of paths (lists of AlignmentRecords) instead of pairs
    cleaned_alignments, resolved_paths = self.topology_engine.resolve_split_alignments(alignments)

    # Iterate over paths of arbitrary length
    for path in resolved_paths:
        yield self._stitch(path)

    for contig_id, batch in cleaned_alignments.items():
        contig_gene_intervals = self.topology_engine.features.get(contig_id, IntervalBatch.empty())
        for locus in self._build_local(contig_id, batch, contig_gene_intervals):
            yield locus

Pipeline

High-level eris pipeline manager.

Orchestrates the entire workflow: contig processing, target mapping, gene calling, graph building, and locus assembly. Uses a thread pool for parallel contig processing.

Source code in src/eris/pipeline.py
class Pipeline:
    """
    High-level eris pipeline manager.

    Orchestrates the entire workflow: contig processing, target mapping,
    gene calling, graph building, and locus assembly. Uses a thread pool
    for parallel contig processing.
    """
    __slots__ = ('target_db', '_gene_finder', 'max_feature_hops', 'locus_tolerance', '_executor')
    _THREAD_LOCAL = thread_local()

    def __init__(self, target_db: 'TargetDatabase', max_feature_hops: int = 3, locus_tolerance: int = 0,
                 max_workers: Optional[int] = None):
        """
        Initialize the Pipeline.

        Args:
            target_db: Database of mapping targets (e.g. antibiotic resistance genes).
            max_feature_hops: Contextual search depth.
            locus_tolerance: Merging tolerance for adjacent hits.
            max_workers: Number of threads for parallel processing.
        """
        self.target_db = target_db
        self.max_feature_hops = max_feature_hops
        self.locus_tolerance = locus_tolerance
        self._gene_finder = GeneFinder(model=Model.Complete, whole_genome=False)
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._executor.shutdown(cancel_futures=True, wait=False)
        self._gene_finder = None

    def _process_contig(self, contig: tuple[str, Seq]) -> tuple[
        str, Optional[IntervalBatch], Optional[AlignmentBatch], list[GenomicFeature], list[Gene]]:
        """Worker function for processing a single contig (Mapping + Gene Calling)."""

        if not hasattr(self._THREAD_LOCAL, "buf"):
            self._THREAD_LOCAL.buf = ThreadBuffer()

        gene_batch, aln_batch, features, genes = None, None, [], []
        contig_id, contig_seq = contig

        if alns := list(self.target_db.aligner.map(str(contig_seq), buf=self._THREAD_LOCAL.buf)):
            aln_batch = AlignmentBatch.from_mappy(contig_id, len(contig_seq), alns).swap_sides().cull_overlaps()

        if genes := self._gene_finder.find_genes(bytes(contig_seq)):
            num_genes = len(genes)
            starts = np.empty(num_genes, dtype=np.int32)
            ends = np.empty(num_genes, dtype=np.int32)
            strands = np.empty(num_genes, dtype=np.int8)

            for i, g in enumerate(genes):
                starts[i] = g.start
                ends[i] = g.end
                strands[i] = g.strand
                features.append(GenomicFeature(
                    id=f"{contig_id}_{g.start}_{g.end}",
                    type=FeatureType.CDS,
                    segments=[LocationSegment(contig_id, g.start, g.end, Strand(g.strand))]
                ))

            gene_batch = IntervalBatch(starts=starts, ends=ends, strands=strands,
                                       original_indices=np.arange(num_genes, dtype=np.int32)).sort()

        return contig_id, gene_batch, aln_batch, features, genes

    def __call__(self, genome: 'GenomeAssembly', out: Optional[Any] = None) -> Iterable['Locus']:
        """Runs the full pipeline on a genome assembly."""
        alignments, gene_intervals, gene_features, gene_cds = {}, {}, {}, {}

        for contig_id, g_batch, a_batch, features, genes in self._executor.map(self._process_contig, genome):
            if g_batch:
                gene_intervals[contig_id] = g_batch
                gene_features[contig_id] = features
                gene_cds[contig_id] = genes
                if out:
                    out.write_global_genes(contig_id, bytes(genome[contig_id]), genes)
            if a_batch:
                alignments[contig_id] = a_batch

        # for batch in alignments.values():
        #     for n, _ in enumerate(batch.q_names):
        #         print(batch.get_record(n))

        topology_engine = TopologyEngine(genome.edges, genome.contig_lengths, genome.contig_depths, gene_intervals)

        builder = LocusBuilder(
            topology_engine=topology_engine,
            genome=genome,
            target_feature_type=self.target_db.feature_type,
            max_feature_hops=self.max_feature_hops,
            locus_tolerance=self.locus_tolerance,
            features=gene_features,
            genes=gene_cds
        )

        yield from builder.assemble(alignments)

__call__

__call__(genome: GenomeAssembly, out: Optional[Any] = None) -> Iterable[Locus]

Runs the full pipeline on a genome assembly.

Source code in src/eris/pipeline.py
def __call__(self, genome: 'GenomeAssembly', out: Optional[Any] = None) -> Iterable['Locus']:
    """Runs the full pipeline on a genome assembly."""
    alignments, gene_intervals, gene_features, gene_cds = {}, {}, {}, {}

    for contig_id, g_batch, a_batch, features, genes in self._executor.map(self._process_contig, genome):
        if g_batch:
            gene_intervals[contig_id] = g_batch
            gene_features[contig_id] = features
            gene_cds[contig_id] = genes
            if out:
                out.write_global_genes(contig_id, bytes(genome[contig_id]), genes)
        if a_batch:
            alignments[contig_id] = a_batch

    # for batch in alignments.values():
    #     for n, _ in enumerate(batch.q_names):
    #         print(batch.get_record(n))

    topology_engine = TopologyEngine(genome.edges, genome.contig_lengths, genome.contig_depths, gene_intervals)

    builder = LocusBuilder(
        topology_engine=topology_engine,
        genome=genome,
        target_feature_type=self.target_db.feature_type,
        max_feature_hops=self.max_feature_hops,
        locus_tolerance=self.locus_tolerance,
        features=gene_features,
        genes=gene_cds
    )

    yield from builder.assemble(alignments)

__init__

__init__(target_db: TargetDatabase, max_feature_hops: int = 3, locus_tolerance: int = 0, max_workers: Optional[int] = None)

Initialize the Pipeline.

Parameters:

Name Type Description Default
target_db TargetDatabase

Database of mapping targets (e.g. antibiotic resistance genes).

required
max_feature_hops int

Contextual search depth.

3
locus_tolerance int

Merging tolerance for adjacent hits.

0
max_workers Optional[int]

Number of threads for parallel processing.

None
Source code in src/eris/pipeline.py
def __init__(self, target_db: 'TargetDatabase', max_feature_hops: int = 3, locus_tolerance: int = 0,
             max_workers: Optional[int] = None):
    """
    Initialize the Pipeline.

    Args:
        target_db: Database of mapping targets (e.g. antibiotic resistance genes).
        max_feature_hops: Contextual search depth.
        locus_tolerance: Merging tolerance for adjacent hits.
        max_workers: Number of threads for parallel processing.
    """
    self.target_db = target_db
    self.max_feature_hops = max_feature_hops
    self.locus_tolerance = locus_tolerance
    self._gene_finder = GeneFinder(model=Model.Complete, whole_genome=False)
    self._executor = ThreadPoolExecutor(max_workers=max_workers)

eris.graph

Edge

Bases: NamedTuple

Represents a directed link between two contigs in an assembly graph.

Attributes:

Name Type Description
u str

Source contig name.

u_strand Strand

Strand of the source contig (Strand.FORWARD or Strand.REVERSE).

v str

Destination contig name.

v_strand Strand

Strand of the destination contig.

overlap int

Length of the sequence overlap between the two contigs in base pairs.

Source code in src/eris/graph.py
class Edge(NamedTuple):
    """
    Represents a directed link between two contigs in an assembly graph.

    Attributes:
        u: Source contig name.
        u_strand: Strand of the source contig (Strand.FORWARD or Strand.REVERSE).
        v: Destination contig name.
        v_strand: Strand of the destination contig.
        overlap: Length of the sequence overlap between the two contigs in base pairs.
    """
    u: str
    u_strand: Strand
    v: str
    v_strand: Strand
    overlap: int = 0

    def reverse(self) -> 'Edge':
        """Returns a new Edge object representing the reverse traversal (v -> u)."""
        return Edge(self.v, self.v_strand, self.u, self.u_strand, self.overlap)

reverse

reverse() -> Edge

Returns a new Edge object representing the reverse traversal (v -> u).

Source code in src/eris/graph.py
def reverse(self) -> 'Edge':
    """Returns a new Edge object representing the reverse traversal (v -> u)."""
    return Edge(self.v, self.v_strand, self.u, self.u_strand, self.overlap)

Graph

Manages an assembly graph, supporting both directed and undirected traversal.

Nodes represent contigs (or fragments), and edges represent physical adjacencies (e.g., from an assembly GFA file).

Example

graph = Graph(directed=True) graph.add_edge(Edge("ctg1", Strand.FORWARD, "ctg2", Strand.FORWARD, 50)) neighbors = graph.get_neighbors("ctg1")

Source code in src/eris/graph.py
class Graph:
    """
    Manages an assembly graph, supporting both directed and undirected traversal.

    Nodes represent contigs (or fragments), and edges represent physical
    adjacencies (e.g., from an assembly GFA file).

    Example:
        >>> graph = Graph(directed=True)
        >>> graph.add_edge(Edge("ctg1", Strand.FORWARD, "ctg2", Strand.FORWARD, 50))
        >>> neighbors = graph.get_neighbors("ctg1")
    """
    __slots__ = ('adj', 'in_adj', 'edges', '_nodes', 'directed')
    def __init__(self, edges: Iterable[Edge] = None, directed: bool = True):
        """
        Initialize the graph.

        Args:
            edges: Optional iterable of Edge objects to seed the graph.
            directed: If True, edges are strictly one-way. If False, every
                added edge u -> v implicitly adds v -> u.
        """
        # Adjacency list: maps node ID to a set of outgoing Edge objects *starting* uom that node.
        # For undirected graphs, this will include edges representing reverse traversal.
        self.adj: dict[str, set[Edge]] = defaultdict(set)
        # In-degree adjacency list for efficient reverse lookups
        self.in_adj: dict[str, set[Edge]] = defaultdict(set)
        # Set of unique Edge objects fundamentally added to the graph.
        self.edges: set[Edge] = set()
        self._nodes: set[str] = set()
        self.directed: bool = directed
        if edges is not None:
            for edge in edges:
                self.add_edge(edge)

    def __repr__(self):
        # Note: len(self.edges) counts only the *unique* edge objects added,
        # not the total number of traversable connections in the undirected case.
        return f"{'Directed' if self.directed else 'Undirected'} Graph with {len(self._nodes)} nodes and {len(self.edges)} defined edges"

    def __iter__(self):
        return iter(self.edges)

    def __len__(self):
        return len(self.edges)

    def add_node(self, node: str):
        """Adds a node to the graph if it doesn't already exist."""
        self._nodes.add(node)

    def add_edge(self, edge: Edge):
        """
        Adds an edge to the graph.

        If the graph is undirected, the reverse connection is also added
        to the adjacency lists to allow bidirectional traversal.
        """
        # Add the nodes to the set of known node IDs
        self.add_node(edge.u)
        self.add_node(edge.v)

        # This helps track the originally added edges vs implicit reverse ones
        self.edges.add(edge)  # Add the primary edge representation if it's new

        # Check if this specific edge object is already in the adjacency list for the 'u' node
        self.adj[edge.u].add(edge)
        self.in_adj[edge.v].add(edge)

        # If the graph is undirected, add reverse connectivity as well
        if not self.directed:
            # Create a conceptual reverse edge for traversal and add to the adjacency list of the 'v' node
            reverse_edge = edge.reverse()
            self.in_adj[edge.u].add(reverse_edge)
            self.adj[edge.v].add(reverse_edge)
            # Note: We do not add the reverse_edge to self.edges unless it's explicitly added later by the user

    def get_neighbors(self, node_id: str) -> set[Edge]:
        """
        Returns the set of outgoing edges for a given node ID.

        Respects graph directionality. For undirected graphs, this includes
        implicit reverse connections.
        """
        return self.adj.get(node_id, set())

__init__

__init__(edges: Iterable[Edge] = None, directed: bool = True)

Initialize the graph.

Parameters:

Name Type Description Default
edges Iterable[Edge]

Optional iterable of Edge objects to seed the graph.

None
directed bool

If True, edges are strictly one-way. If False, every added edge u -> v implicitly adds v -> u.

True
Source code in src/eris/graph.py
def __init__(self, edges: Iterable[Edge] = None, directed: bool = True):
    """
    Initialize the graph.

    Args:
        edges: Optional iterable of Edge objects to seed the graph.
        directed: If True, edges are strictly one-way. If False, every
            added edge u -> v implicitly adds v -> u.
    """
    # Adjacency list: maps node ID to a set of outgoing Edge objects *starting* uom that node.
    # For undirected graphs, this will include edges representing reverse traversal.
    self.adj: dict[str, set[Edge]] = defaultdict(set)
    # In-degree adjacency list for efficient reverse lookups
    self.in_adj: dict[str, set[Edge]] = defaultdict(set)
    # Set of unique Edge objects fundamentally added to the graph.
    self.edges: set[Edge] = set()
    self._nodes: set[str] = set()
    self.directed: bool = directed
    if edges is not None:
        for edge in edges:
            self.add_edge(edge)

add_edge

add_edge(edge: Edge)

Adds an edge to the graph.

If the graph is undirected, the reverse connection is also added to the adjacency lists to allow bidirectional traversal.

Source code in src/eris/graph.py
def add_edge(self, edge: Edge):
    """
    Adds an edge to the graph.

    If the graph is undirected, the reverse connection is also added
    to the adjacency lists to allow bidirectional traversal.
    """
    # Add the nodes to the set of known node IDs
    self.add_node(edge.u)
    self.add_node(edge.v)

    # This helps track the originally added edges vs implicit reverse ones
    self.edges.add(edge)  # Add the primary edge representation if it's new

    # Check if this specific edge object is already in the adjacency list for the 'u' node
    self.adj[edge.u].add(edge)
    self.in_adj[edge.v].add(edge)

    # If the graph is undirected, add reverse connectivity as well
    if not self.directed:
        # Create a conceptual reverse edge for traversal and add to the adjacency list of the 'v' node
        reverse_edge = edge.reverse()
        self.in_adj[edge.u].add(reverse_edge)
        self.adj[edge.v].add(reverse_edge)

add_node

add_node(node: str)

Adds a node to the graph if it doesn't already exist.

Source code in src/eris/graph.py
def add_node(self, node: str):
    """Adds a node to the graph if it doesn't already exist."""
    self._nodes.add(node)

get_neighbors

get_neighbors(node_id: str) -> set[Edge]

Returns the set of outgoing edges for a given node ID.

Respects graph directionality. For undirected graphs, this includes implicit reverse connections.

Source code in src/eris/graph.py
def get_neighbors(self, node_id: str) -> set[Edge]:
    """
    Returns the set of outgoing edges for a given node ID.

    Respects graph directionality. For undirected graphs, this includes
    implicit reverse connections.
    """
    return self.adj.get(node_id, set())

TopologyEngine

Engine for traversing assembly graphs and resolving complex alignments.

It provides tools for "stitching" together partial alignments that span multiple contigs by finding valid physical paths through the graph.

Source code in src/eris/graph.py
class TopologyEngine:
    """
    Engine for traversing assembly graphs and resolving complex alignments.

    It provides tools for "stitching" together partial alignments that span
    multiple contigs by finding valid physical paths through the graph.
    """
    __slots__ = ('_graph', 'contig_lengths', 'contig_depths', 'features', '_visited_nodes')

    def __init__(self, edges: Iterable[Edge], contig_lengths: dict[str, int], contig_depths: dict[str, float],
                 features: dict[str, IntervalBatch] = None):
        """
        Initialize the TopologyEngine.

        Args:
            edges: Assembly graph edges.
            contig_lengths: Dictionary mapping contig ID to length in bp.
            contig_depths: Dictionary mapping contig ID to read depth.
            features: Dictionary mapping contig ID to its annotated intervals.
        """
        self._graph = Graph(edges)  # Build the graph from parsed edges
        self.contig_lengths: dict[str, int] = contig_lengths
        self.contig_depths: dict[str, float] = contig_depths
        self.features: dict[str, IntervalBatch] = features or {}
        self._visited_nodes: set[tuple[str, int]] = set()

    def resolve_split_alignments(self, alignments: dict) -> tuple[dict, list[list['AlignmentRecord']]]:
        """
        Stitch together partial alignments that span multiple graph nodes.

        Uses the assembly graph to find valid paths between fragments of the
        same query that are mapped to different contigs.

        Args:
            alignments: Dictionary mapping contig ID to AlignmentBatch.

        Returns:
            tuple: (cleaned_alignments, resolved_paths)
                - cleaned_alignments: Original alignments minus those stitched.
                - resolved_paths: Lists of AlignmentRecords forming stitched paths.
        """
        partial_alns = defaultdict(list)

        # 1. Gather all partial fragments (Do NOT remove them from the main pipeline yet)
        for contig_id, batch in alignments.items():
            for i in range(len(batch)):
                rec = batch.get_record(i)
                if rec.is_partial:
                    partial_alns[rec.q_name].append(rec)

        resolved_paths = []
        used_records = set()  # Tracks (t_name, idx) of fragments successfully stitched

        # 2. Fragment Chaining via DAG DFS
        for q_name, fragments in partial_alns.items():
            # Sort strictly by 5' -> 3' query coordinates
            fragments.sort(key=lambda x: x.q_start)
            used_in_this_qname = set()

            for i in range(len(fragments)):
                if i in used_in_this_qname:
                    continue

                start_frag = fragments[i]
                # Stack: (current_frag_index, sequence_of_records, set_of_used_indices)
                stack = [(i, [start_frag], {i})]

                best_chain = []
                best_chain_len = 0
                best_chain_used = set()

                while stack:
                    curr_idx, curr_path, curr_used = stack.pop()
                    curr_frag = curr_path[-1]  # The last real alignment in the chain

                    extended = False
                    for j in range(len(fragments)):
                        if j in curr_used or j in used_in_this_qname:
                            continue

                        next_frag = fragments[j]
                        expected_gap = next_frag.q_start - curr_frag.q_end

                        # Fragments must be sequentially downstream on the query
                        if expected_gap < -50:
                            continue

                        # Ask the graph if these two fragments physically connect
                        paths = self._find_bounded_paths(
                            curr_frag.t_name, curr_frag.strand,
                            next_frag.t_name, next_frag.strand,
                            expected_gap, tolerance=2000
                        )

                        if paths:
                            path_lengths = np.array([p['length'] for p in paths])
                            bottleneck_depths = np.array([p['min_depth'] for p in paths])
                            source_depth = self.contig_depths.get(curr_frag.t_name, 1.0)

                            # CRITICAL FIX: Safe, absolute normalization for negative gaps
                            norm_factor = np.abs(expected_gap) + 50
                            len_penalty = np.maximum(1.0 - (np.abs(path_lengths - expected_gap) / norm_factor), 0)

                            depth_fraction = bottleneck_depths / source_depth
                            scores = len_penalty * depth_fraction

                            best_p_idx = np.argmax(scores)
                            if scores[best_p_idx] > 0.05:  # Biological winner

                                winning_contigs = paths[best_p_idx]['contigs']
                                extension = self._build_stitching_payload(curr_frag, next_frag, winning_contigs)

                                # Extend the path with the synthetic nodes + next_frag and recurse!
                                new_path = curr_path + extension[1:]
                                stack.append((j, new_path, curr_used | {j}))
                                extended = True

                    # If we can't extend this branch further, evaluate its total coverage
                    if not extended:
                        chain_cov = curr_path[-1].q_end - curr_path[0].q_start
                        if chain_cov > best_chain_len:
                            best_chain = curr_path
                            best_chain_len = chain_cov
                            best_chain_used = curr_used

                # If the DFS successfully chained multiple fragments together
                if len(best_chain_used) > 1:
                    resolved_paths.append(best_chain)
                    used_in_this_qname.update(best_chain_used)

                    # Mark these specific records as "consumed" by the stitcher
                    for f in best_chain:
                        if f.idx != -1:  # Ignore the synthetic nodes
                            used_records.add((f.t_name, f.idx))

        # 3. Rebuild cleaned alignments (The Safety Net)
        cleaned_alignments = {}
        for contig_id, batch in alignments.items():
            # Keep everything EXCEPT the fragments that were successfully stitched!
            mask = np.zeros(len(batch), dtype=bool)
            for i in range(len(batch)):
                if (contig_id, i) not in used_records:
                    mask[i] = True

            intact_batch = batch.filter(mask)
            if len(intact_batch) > 0:
                cleaned_alignments[contig_id] = intact_batch

        return cleaned_alignments, resolved_paths

    def _build_stitching_payload(self, h_u: 'AlignmentRecord', h_v: 'AlignmentRecord', path_contigs: list[str]) -> list[
        'AlignmentRecord']:
        """
        Converts a list of graph contig names into a continuous sequence of AlignmentRecords.

        Generates 'synthetic' records for unaligned intermediate nodes so they
        can be processed by the LocusBuilder as part of a single path.
        """
        payload = [h_u]

        # Iterate over only the intermediate contigs (excluding the h_u and h_v anchors)
        for ctg in path_contigs[1:-1]:
            ctg_len = self.contig_lengths.get(ctg, 0)

            # Create a synthetic alignment that claims the entire unaligned contig
            synthetic_rec = AlignmentRecord(
                idx=-1,  # Flag as a synthetic/mock record
                q_name=h_u.q_name,
                q_length=h_u.q_length,
                q_start=h_u.q_end,  # Conceptually sits between the anchors
                q_end=h_v.q_start,
                t_name=ctg,
                t_length=ctg_len,
                t_start=0,  # The entire contig is part of the path
                t_end=ctg_len,
                strand=Strand.FORWARD,  # Default to forward for the traversal sequence
                length=ctg_len,
                match=0,
                mismatch=0,
                quality=0,
                cigar="*"  # No CIGAR exists for synthetic nodes
            )
            payload.append(synthetic_rec)

        payload.append(h_v)

        return payload

    def _find_bounded_paths(self, start_ctg: str, start_strand: Strand, target_ctg: str,
                            target_strand: Strand, expected_len: int, tolerance: int) -> list[dict]:
        """
        Finds all physical paths between two contigs within a length constraint.

        Args:
            start_ctg: Starting contig ID.
            start_strand: Strand to exit the start contig from.
            target_ctg: Target contig ID.
            target_strand: Required strand to enter the target contig.
            expected_len: The gap distance observed in the query sequence.
            tolerance: bp tolerance for the path length matching the expected length.

        Returns:
            list[dict]: Valid paths found, with 'contigs', 'length', and 'min_depth'.
        """
        # Stack payload: (current_contig, exit_strand, path_list, accumulated_len, bottleneck_depth)
        stack = [(start_ctg, start_strand, [start_ctg], 0, float('inf'))]
        valid_paths = []

        while stack:
            curr_ctg, curr_strand, path, dist, min_dp = stack.pop()

            # Base Case: Reached the Sink anchor
            if curr_ctg == target_ctg:
                if curr_strand == target_strand:  # Did we arrive on the correct biological strand?
                    valid_paths.append({'contigs': path, 'length': dist, 'min_depth': min_dp})
                continue

            # Prune: We have wandered too far down a dead end
            if dist > expected_len + tolerance:
                continue

            # Graph Traversal
            for edge in self._graph.get_neighbors(curr_ctg):
                if edge.u_strand != curr_strand:
                    continue

                n_ctg = edge.v
                if n_ctg in path:
                    continue  # Prevent cyclic infinite loops

                n_len = self.contig_lengths.get(n_ctg, 0)
                n_dp = self.contig_depths.get(n_ctg, 1.0)
                overlap_len = getattr(edge, 'overlap', 0)

                # CRITICAL FIX: If this is the target anchor, its length doesn't belong in the gap!
                added_dist = (n_len - overlap_len) if n_ctg != target_ctg else -overlap_len

                stack.append((
                    n_ctg,
                    edge.v_strand,
                    path + [n_ctg],
                    dist + added_dist,
                    min(min_dp, n_dp)
                ))

        return valid_paths

    def traverse(self, start_node: str, exit_strand: Strand, hops_needed: int) -> list[tuple[str, int, IntervalBatch]]:
        """
        Uses a breadth-first search to traverse the assembly graph.

        Finds neighboring contigs and projects their annotated features into
        the coordinate space of the starting contig.

        Args:
            start_node: Starting contig ID.
            exit_strand: Strand to exit from.
            hops_needed: Maximum number of features (intervals) to find.

        Returns:
            list: (contig_id, hop_index, projected_IntervalBatch)
        """
        queue = [(start_node, exit_strand, hops_needed, 1, 0)]
        projected_results = []
        visited_edges = {(start_node, exit_strand)}

        while queue:
            curr_ctg, curr_exit, rem_hops, node_depth, shift = queue.pop(0)
            if rem_hops <= 0: continue

            for edge in self._graph.get_neighbors(curr_ctg):
                if edge.u_strand != curr_exit: continue

                v = edge.v
                if (v, edge.v_strand) in visited_edges: continue
                visited_edges.add((v, edge.v_strand))

                if v in self.features:
                    n_ints = self.features[v]
                    raw_indices = np.arange(0, min(len(n_ints), rem_hops)) if edge.v_strand == Strand.FORWARD \
                        else np.arange(max(0, len(n_ints) - rem_hops), len(n_ints))[::-1]

                    valid_indices = [i for i in raw_indices if
                                     (v, n_ints.original_indices[i]) not in self._visited_nodes]
                    if valid_indices:
                        for i in valid_indices:
                            self._visited_nodes.add((v, n_ints.original_indices[i]))

                        batch = n_ints.filter(valid_indices)
                        new_shift = shift + self.contig_lengths[curr_ctg]
                        flip_len = self.contig_lengths[v] if edge.v_strand == Strand.REVERSE else None

                        projected_batch = batch.project(shift=new_shift, flip_length=flip_len)
                        projected_results.append((v, node_depth, projected_batch))

                        found_count = len(valid_indices)
                        rem_hops -= found_count

                if rem_hops > 0:
                    queue.append((v, edge.v_strand, rem_hops, node_depth + 1, shift + self.contig_lengths[curr_ctg]))

        return projected_results

__init__

__init__(edges: Iterable[Edge], contig_lengths: dict[str, int], contig_depths: dict[str, float], features: dict[str, IntervalBatch] = None)

Initialize the TopologyEngine.

Parameters:

Name Type Description Default
edges Iterable[Edge]

Assembly graph edges.

required
contig_lengths dict[str, int]

Dictionary mapping contig ID to length in bp.

required
contig_depths dict[str, float]

Dictionary mapping contig ID to read depth.

required
features dict[str, IntervalBatch]

Dictionary mapping contig ID to its annotated intervals.

None
Source code in src/eris/graph.py
def __init__(self, edges: Iterable[Edge], contig_lengths: dict[str, int], contig_depths: dict[str, float],
             features: dict[str, IntervalBatch] = None):
    """
    Initialize the TopologyEngine.

    Args:
        edges: Assembly graph edges.
        contig_lengths: Dictionary mapping contig ID to length in bp.
        contig_depths: Dictionary mapping contig ID to read depth.
        features: Dictionary mapping contig ID to its annotated intervals.
    """
    self._graph = Graph(edges)  # Build the graph from parsed edges
    self.contig_lengths: dict[str, int] = contig_lengths
    self.contig_depths: dict[str, float] = contig_depths
    self.features: dict[str, IntervalBatch] = features or {}
    self._visited_nodes: set[tuple[str, int]] = set()

resolve_split_alignments

resolve_split_alignments(alignments: dict) -> tuple[dict, list[list[AlignmentRecord]]]

Stitch together partial alignments that span multiple graph nodes.

Uses the assembly graph to find valid paths between fragments of the same query that are mapped to different contigs.

Parameters:

Name Type Description Default
alignments dict

Dictionary mapping contig ID to AlignmentBatch.

required

Returns:

Name Type Description
tuple tuple[dict, list[list[AlignmentRecord]]]

(cleaned_alignments, resolved_paths) - cleaned_alignments: Original alignments minus those stitched. - resolved_paths: Lists of AlignmentRecords forming stitched paths.

Source code in src/eris/graph.py
def resolve_split_alignments(self, alignments: dict) -> tuple[dict, list[list['AlignmentRecord']]]:
    """
    Stitch together partial alignments that span multiple graph nodes.

    Uses the assembly graph to find valid paths between fragments of the
    same query that are mapped to different contigs.

    Args:
        alignments: Dictionary mapping contig ID to AlignmentBatch.

    Returns:
        tuple: (cleaned_alignments, resolved_paths)
            - cleaned_alignments: Original alignments minus those stitched.
            - resolved_paths: Lists of AlignmentRecords forming stitched paths.
    """
    partial_alns = defaultdict(list)

    # 1. Gather all partial fragments (Do NOT remove them from the main pipeline yet)
    for contig_id, batch in alignments.items():
        for i in range(len(batch)):
            rec = batch.get_record(i)
            if rec.is_partial:
                partial_alns[rec.q_name].append(rec)

    resolved_paths = []
    used_records = set()  # Tracks (t_name, idx) of fragments successfully stitched

    # 2. Fragment Chaining via DAG DFS
    for q_name, fragments in partial_alns.items():
        # Sort strictly by 5' -> 3' query coordinates
        fragments.sort(key=lambda x: x.q_start)
        used_in_this_qname = set()

        for i in range(len(fragments)):
            if i in used_in_this_qname:
                continue

            start_frag = fragments[i]
            # Stack: (current_frag_index, sequence_of_records, set_of_used_indices)
            stack = [(i, [start_frag], {i})]

            best_chain = []
            best_chain_len = 0
            best_chain_used = set()

            while stack:
                curr_idx, curr_path, curr_used = stack.pop()
                curr_frag = curr_path[-1]  # The last real alignment in the chain

                extended = False
                for j in range(len(fragments)):
                    if j in curr_used or j in used_in_this_qname:
                        continue

                    next_frag = fragments[j]
                    expected_gap = next_frag.q_start - curr_frag.q_end

                    # Fragments must be sequentially downstream on the query
                    if expected_gap < -50:
                        continue

                    # Ask the graph if these two fragments physically connect
                    paths = self._find_bounded_paths(
                        curr_frag.t_name, curr_frag.strand,
                        next_frag.t_name, next_frag.strand,
                        expected_gap, tolerance=2000
                    )

                    if paths:
                        path_lengths = np.array([p['length'] for p in paths])
                        bottleneck_depths = np.array([p['min_depth'] for p in paths])
                        source_depth = self.contig_depths.get(curr_frag.t_name, 1.0)

                        # CRITICAL FIX: Safe, absolute normalization for negative gaps
                        norm_factor = np.abs(expected_gap) + 50
                        len_penalty = np.maximum(1.0 - (np.abs(path_lengths - expected_gap) / norm_factor), 0)

                        depth_fraction = bottleneck_depths / source_depth
                        scores = len_penalty * depth_fraction

                        best_p_idx = np.argmax(scores)
                        if scores[best_p_idx] > 0.05:  # Biological winner

                            winning_contigs = paths[best_p_idx]['contigs']
                            extension = self._build_stitching_payload(curr_frag, next_frag, winning_contigs)

                            # Extend the path with the synthetic nodes + next_frag and recurse!
                            new_path = curr_path + extension[1:]
                            stack.append((j, new_path, curr_used | {j}))
                            extended = True

                # If we can't extend this branch further, evaluate its total coverage
                if not extended:
                    chain_cov = curr_path[-1].q_end - curr_path[0].q_start
                    if chain_cov > best_chain_len:
                        best_chain = curr_path
                        best_chain_len = chain_cov
                        best_chain_used = curr_used

            # If the DFS successfully chained multiple fragments together
            if len(best_chain_used) > 1:
                resolved_paths.append(best_chain)
                used_in_this_qname.update(best_chain_used)

                # Mark these specific records as "consumed" by the stitcher
                for f in best_chain:
                    if f.idx != -1:  # Ignore the synthetic nodes
                        used_records.add((f.t_name, f.idx))

    # 3. Rebuild cleaned alignments (The Safety Net)
    cleaned_alignments = {}
    for contig_id, batch in alignments.items():
        # Keep everything EXCEPT the fragments that were successfully stitched!
        mask = np.zeros(len(batch), dtype=bool)
        for i in range(len(batch)):
            if (contig_id, i) not in used_records:
                mask[i] = True

        intact_batch = batch.filter(mask)
        if len(intact_batch) > 0:
            cleaned_alignments[contig_id] = intact_batch

    return cleaned_alignments, resolved_paths

traverse

traverse(start_node: str, exit_strand: Strand, hops_needed: int) -> list[tuple[str, int, IntervalBatch]]

Uses a breadth-first search to traverse the assembly graph.

Finds neighboring contigs and projects their annotated features into the coordinate space of the starting contig.

Parameters:

Name Type Description Default
start_node str

Starting contig ID.

required
exit_strand Strand

Strand to exit from.

required
hops_needed int

Maximum number of features (intervals) to find.

required

Returns:

Name Type Description
list list[tuple[str, int, IntervalBatch]]

(contig_id, hop_index, projected_IntervalBatch)

Source code in src/eris/graph.py
def traverse(self, start_node: str, exit_strand: Strand, hops_needed: int) -> list[tuple[str, int, IntervalBatch]]:
    """
    Uses a breadth-first search to traverse the assembly graph.

    Finds neighboring contigs and projects their annotated features into
    the coordinate space of the starting contig.

    Args:
        start_node: Starting contig ID.
        exit_strand: Strand to exit from.
        hops_needed: Maximum number of features (intervals) to find.

    Returns:
        list: (contig_id, hop_index, projected_IntervalBatch)
    """
    queue = [(start_node, exit_strand, hops_needed, 1, 0)]
    projected_results = []
    visited_edges = {(start_node, exit_strand)}

    while queue:
        curr_ctg, curr_exit, rem_hops, node_depth, shift = queue.pop(0)
        if rem_hops <= 0: continue

        for edge in self._graph.get_neighbors(curr_ctg):
            if edge.u_strand != curr_exit: continue

            v = edge.v
            if (v, edge.v_strand) in visited_edges: continue
            visited_edges.add((v, edge.v_strand))

            if v in self.features:
                n_ints = self.features[v]
                raw_indices = np.arange(0, min(len(n_ints), rem_hops)) if edge.v_strand == Strand.FORWARD \
                    else np.arange(max(0, len(n_ints) - rem_hops), len(n_ints))[::-1]

                valid_indices = [i for i in raw_indices if
                                 (v, n_ints.original_indices[i]) not in self._visited_nodes]
                if valid_indices:
                    for i in valid_indices:
                        self._visited_nodes.add((v, n_ints.original_indices[i]))

                    batch = n_ints.filter(valid_indices)
                    new_shift = shift + self.contig_lengths[curr_ctg]
                    flip_len = self.contig_lengths[v] if edge.v_strand == Strand.REVERSE else None

                    projected_batch = batch.project(shift=new_shift, flip_length=flip_len)
                    projected_results.append((v, node_depth, projected_batch))

                    found_count = len(valid_indices)
                    rem_hops -= found_count

            if rem_hops > 0:
                queue.append((v, edge.v_strand, rem_hops, node_depth + 1, shift + self.contig_lengths[curr_ctg]))

    return projected_results

eris.io

Module to handle query (contigs) and target (features) IO.

GenomeAssembly dataclass

Container for a genome assembly, including contigs and their graph topology.

Handles FASTA and GFA formats, with support for transparent decompression.

Example

assembly = GenomeAssembly.from_file("assembly.gfa.gz") for contig_id, seq in assembly: print(contig_id, len(seq))

Source code in src/eris/io.py
@dataclass(slots=True, frozen=True)
class GenomeAssembly:
    """
    Container for a genome assembly, including contigs and their graph topology.

    Handles FASTA and GFA formats, with support for transparent decompression.

    Example:
        >>> assembly = GenomeAssembly.from_file("assembly.gfa.gz")
        >>> for contig_id, seq in assembly:
        >>>     print(contig_id, len(seq))
    """
    _SEQUENCE_FILE_REGEX = re_compile(
        r'\.('
        r'(?P<fasta>f(asta|a|na|fn|as|aa))|'
        r'(?P<gfa>gfa)|'
        r')\.?(?P<compression>(gz|bz2|xz))?$'
    )
    _OPENERS = {'gz': gzopen, 'bz2': bzopen, 'xz': lzopen}
    id: str
    contigs: dict[str, Seq]
    edges: list[Edge]
    contig_depths: dict[str, float]
    contig_lengths: dict[str, int]

    def __len__(self):
        """Total number of base pairs in the assembly."""
        return sum(len(i) for i in self.contigs.values())

    def __iter__(self) -> Iterator[tuple[str, Seq]]:
        """Iterate over contig IDs and sequences."""
        return iter(self.contigs.items())

    def __str__(self):
        return self.id

    def __getitem__(self, item: str) -> 'Seq':
        """Access a contig sequence by its ID."""
        return self.contigs[item]

    @classmethod
    def from_file(cls, file: Union[str, Path]):
        """
        Load an assembly from a FASTA or GFA file.

        Args:
            file: Path to the file. Supports .gz, .bz2, and .xz compression.
        """
        file = Path(file) # type: Path
        if not (m := cls._SEQUENCE_FILE_REGEX.search(file.name)):
            raise NotImplementedError(f'Unsupported format: {file}')
        reader = FastaIterator if m.group('fasta') else GfaReader
        with cls._OPENERS.get(m.group('compression'), open)(file, mode='rt') as handle:
            return cls.from_stream(handle, reader, file.name.rstrip(m.group()))

    @classmethod
    def from_stream(cls, handle: IO[str], reader, id_: str = None):
        """Load an assembly from an open file stream using the specified reader."""
        contigs, edges, depths, lengths = {}, [], {}, {}
        for record in reader(handle):
            if isinstance(record, SeqRecord):
                contigs[record.id] = record.seq
                depths[record.id] = record.annotations.get('DP', record.annotations.get('depth', 1.0))
                lengths[record.id] =  len(record)
            elif isinstance(record, Edge):
                edges.append(record)
        return cls(id_ or handle.name, contigs, edges, depths, lengths)

__getitem__

__getitem__(item: str) -> Seq

Access a contig sequence by its ID.

Source code in src/eris/io.py
def __getitem__(self, item: str) -> 'Seq':
    """Access a contig sequence by its ID."""
    return self.contigs[item]

__iter__

__iter__() -> Iterator[tuple[str, Seq]]

Iterate over contig IDs and sequences.

Source code in src/eris/io.py
def __iter__(self) -> Iterator[tuple[str, Seq]]:
    """Iterate over contig IDs and sequences."""
    return iter(self.contigs.items())

__len__

__len__()

Total number of base pairs in the assembly.

Source code in src/eris/io.py
def __len__(self):
    """Total number of base pairs in the assembly."""
    return sum(len(i) for i in self.contigs.values())

from_file classmethod

from_file(file: Union[str, Path])

Load an assembly from a FASTA or GFA file.

Parameters:

Name Type Description Default
file Union[str, Path]

Path to the file. Supports .gz, .bz2, and .xz compression.

required
Source code in src/eris/io.py
@classmethod
def from_file(cls, file: Union[str, Path]):
    """
    Load an assembly from a FASTA or GFA file.

    Args:
        file: Path to the file. Supports .gz, .bz2, and .xz compression.
    """
    file = Path(file) # type: Path
    if not (m := cls._SEQUENCE_FILE_REGEX.search(file.name)):
        raise NotImplementedError(f'Unsupported format: {file}')
    reader = FastaIterator if m.group('fasta') else GfaReader
    with cls._OPENERS.get(m.group('compression'), open)(file, mode='rt') as handle:
        return cls.from_stream(handle, reader, file.name.rstrip(m.group()))

from_stream classmethod

from_stream(handle: IO[str], reader, id_: str = None)

Load an assembly from an open file stream using the specified reader.

Source code in src/eris/io.py
@classmethod
def from_stream(cls, handle: IO[str], reader, id_: str = None):
    """Load an assembly from an open file stream using the specified reader."""
    contigs, edges, depths, lengths = {}, [], {}, {}
    for record in reader(handle):
        if isinstance(record, SeqRecord):
            contigs[record.id] = record.seq
            depths[record.id] = record.annotations.get('DP', record.annotations.get('depth', 1.0))
            lengths[record.id] =  len(record)
        elif isinstance(record, Edge):
            edges.append(record)
    return cls(id_ or handle.name, contigs, edges, depths, lengths)

GfaReader

Bases: _Handle

Reader for Graphical Fragment Assembly (GFA) files.

Parses Segment (S) and Link (L) lines into SeqRecord and Edge objects.

Source code in src/eris/io.py
class GfaReader(_Handle):
    """
    Reader for Graphical Fragment Assembly (GFA) files.

    Parses Segment (S) and Link (L) lines into SeqRecord and Edge objects.
    """

    @staticmethod
    def _parse_segment(parts: list[str]):
        name = parts[0]
        seq = parts[1]
        tags = {}
        for item in parts[2:]:
            tag, typ, val = item.split(':', maxsplit=2)
            if typ == 'f':
                val = float(val)
            elif typ == 'i':
                val = int(val)
            tags[tag] = val
        return SeqRecord(seq=Seq(seq), id=name, name=name, annotations=tags)

    @staticmethod
    def _parse_link(parts: list[str]):
        u = parts[0]
        u_strand = Strand(parts[1])
        v = parts[2]
        v_strand = Strand(parts[3])
        cigar = Cigar(parts[4])
        overlap = next((n for op, n, _, _, _ in cigar if op == 'M'), 0)
        return Edge(u, u_strand, v, v_strand, overlap)

    @classmethod
    def _parse_line(cls, line):
        if line.startswith('S\t'):
            return cls._parse_segment(line[2:].rstrip().split('\t'))
        elif line.startswith('L\t'):
            return cls._parse_link(line[2:].rstrip().split('\t'))
        else:
            return None

    def __next__(self):
        while True:  # Will naturally raise StopIteration when the handle is exhausted
            if (parsed := self._parse_line(next(self._handle))) is not None:
                return parsed

    def __iter__(self):
        for line in self._handle:
            if (parsed := self._parse_line(line)) is not None:
                yield parsed

GfaWriter

Bases: _Handle

Writer for GFA format files.

Source code in src/eris/io.py
class GfaWriter(_Handle):
    """Writer for GFA format files."""

    def write(self, item: Union[Edge, SeqRecord]) -> int:
        """Writes an Edge (Link) or SeqRecord (Segment) to the file."""
        if isinstance(item, Edge):
            return self._handle.write(f"L\t{item.u}\t{item.u_strand}\t{item.v}\t{item.u_strand}\t*\n")
        elif isinstance(item, SeqRecord):
            return self._handle.write(f"S\t{item.id}\t{item.seq}\n")
        raise TypeError(f"Unsupported type: {type(item)}")

write

write(item: Union[Edge, SeqRecord]) -> int

Writes an Edge (Link) or SeqRecord (Segment) to the file.

Source code in src/eris/io.py
def write(self, item: Union[Edge, SeqRecord]) -> int:
    """Writes an Edge (Link) or SeqRecord (Segment) to the file."""
    if isinstance(item, Edge):
        return self._handle.write(f"L\t{item.u}\t{item.u_strand}\t{item.v}\t{item.u_strand}\t*\n")
    elif isinstance(item, SeqRecord):
        return self._handle.write(f"S\t{item.id}\t{item.seq}\n")
    raise TypeError(f"Unsupported type: {type(item)}")

OutputManager

Manages all file outputs and stream lifecycles for the eris pipeline.

Handles writing of the TSV report, GFF3 annotations, protein FASTA, and locus nucleotide sequences.

Example

with OutputManager("my_sample") as out: out.write_locus_relations(locus)

Source code in src/eris/io.py
class OutputManager:
    """
    Manages all file outputs and stream lifecycles for the eris pipeline.

    Handles writing of the TSV report, GFF3 annotations, protein FASTA,
    and locus nucleotide sequences.

    Example:
        >>> with OutputManager("my_sample") as out:
        >>>     out.write_locus_relations(locus)
    """

    def __init__(self, prefix: Optional[str], write_gff: bool = True, write_faa: bool = True):
        """
        Initialize the OutputManager.

        Args:
            prefix: Filename prefix for all output files. If None, TSV is sent to stdout.
            write_gff: Whether to output an assembly-wide GFF3 file.
            write_faa: Whether to output an assembly-wide protein FASTA file.
        """
        self.prefix = prefix
        self.write_gff = write_gff
        self.write_faa = write_faa
        self._stack = ExitStack()

    def __enter__(self):
        # 1. Setup TSV Report (Stdout or File)
        if self.prefix:
            self.tsv_handle = self._stack.enter_context(open(f"{self.prefix}_report.tsv", "w"))
        else:
            self.tsv_handle = stdout

        # Write TSV Header dynamically from the dataclass
        self.tsv_handle.write(ReportRow.header())

        # 2. Setup Optional PyFGS Writers
        self.gff_writer = None
        self.faa_writer = None

        if self.prefix and self.write_gff:
            self.gff_writer = self._stack.enter_context(Gff3Writer(f"{self.prefix}_assembly.gff"))

        if self.prefix and self.write_faa:
            self.faa_writer = self._stack.enter_context(FaaWriter(f"{self.prefix}_proteins.faa"))

        # 3. Setup Locus FASTA writer
        self.locus_fasta = None
        if self.prefix:
            self.locus_fasta = self._stack.enter_context(open(f"{self.prefix}_loci.fasta", "w"))

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._stack.__exit__(exc_type, exc_val, exc_tb)

    # --- Helper methods ---

    def write_locus_relations(self, locus: 'Locus'):
        """Writes all contextual relationships (passengers, flanks) for a locus to the TSV report."""
        for relation in locus.passengers + locus.upstream_flanks + locus.downstream_flanks:
            self._write_tsv_row(locus, relation)

    def _write_tsv_row(self, locus: 'Locus', relation: 'FeatureRelation'):
        """Constructs a ReportRow dataclass and writes it to the TSV handle."""
        row = ReportRow(
            locus_id=locus.id,
            target=",".join(t.id for t in locus.targets),
            gene_id=relation.feature.id,
            context=relation.spatial.name,
            dist_bp=relation.distance_bp,
            topo_hops=relation.topological_dist,
            orientation=relation.orientation.value,
            effect=relation.effect.name,
            fractional_depth=locus.fractional_depth
        )
        self.tsv_handle.write(row.to_tsv())

    def write_global_genes(self, contig_id: str, sequence: Union[str, bytes], genes: list['Gene']):
        """Writes assembly-wide gene calls to the GFF3 and FAA files."""
        if self.gff_writer:
            self.gff_writer.write_record(genes, contig_id, sequence)
        if self.faa_writer:
            self.faa_writer.write_record(genes, contig_id)

    def write_locus_fasta(self, locus_id: str, sequence: str):
        """Writes a single locus nucleotide sequence to the loci FASTA file."""
        if self.locus_fasta:
            self.locus_fasta.write(f">{locus_id}\n{sequence}\n")

__init__

__init__(prefix: Optional[str], write_gff: bool = True, write_faa: bool = True)

Initialize the OutputManager.

Parameters:

Name Type Description Default
prefix Optional[str]

Filename prefix for all output files. If None, TSV is sent to stdout.

required
write_gff bool

Whether to output an assembly-wide GFF3 file.

True
write_faa bool

Whether to output an assembly-wide protein FASTA file.

True
Source code in src/eris/io.py
def __init__(self, prefix: Optional[str], write_gff: bool = True, write_faa: bool = True):
    """
    Initialize the OutputManager.

    Args:
        prefix: Filename prefix for all output files. If None, TSV is sent to stdout.
        write_gff: Whether to output an assembly-wide GFF3 file.
        write_faa: Whether to output an assembly-wide protein FASTA file.
    """
    self.prefix = prefix
    self.write_gff = write_gff
    self.write_faa = write_faa
    self._stack = ExitStack()

write_global_genes

write_global_genes(contig_id: str, sequence: Union[str, bytes], genes: list[Gene])

Writes assembly-wide gene calls to the GFF3 and FAA files.

Source code in src/eris/io.py
def write_global_genes(self, contig_id: str, sequence: Union[str, bytes], genes: list['Gene']):
    """Writes assembly-wide gene calls to the GFF3 and FAA files."""
    if self.gff_writer:
        self.gff_writer.write_record(genes, contig_id, sequence)
    if self.faa_writer:
        self.faa_writer.write_record(genes, contig_id)

write_locus_fasta

write_locus_fasta(locus_id: str, sequence: str)

Writes a single locus nucleotide sequence to the loci FASTA file.

Source code in src/eris/io.py
def write_locus_fasta(self, locus_id: str, sequence: str):
    """Writes a single locus nucleotide sequence to the loci FASTA file."""
    if self.locus_fasta:
        self.locus_fasta.write(f">{locus_id}\n{sequence}\n")

write_locus_relations

write_locus_relations(locus: Locus)

Writes all contextual relationships (passengers, flanks) for a locus to the TSV report.

Source code in src/eris/io.py
def write_locus_relations(self, locus: 'Locus'):
    """Writes all contextual relationships (passengers, flanks) for a locus to the TSV report."""
    for relation in locus.passengers + locus.upstream_flanks + locus.downstream_flanks:
        self._write_tsv_row(locus, relation)

ReportRow dataclass

Represents a single structural variant or passenger gene record in the eris TSV report.

Attributes:

Name Type Description
locus_id str

Unique identifier for the assembled structural variant locus.

target str

The mobile genetic element(s) or query sequences found in this locus.

gene_id str

The identifier of the contextual passenger or flanking gene.

context str

The spatial relationship (e.g., INSIDE, UPSTREAM) of the gene to the target.

dist_bp int

Distance in base pairs between the gene and the target element.

topo_hops int

Number of graph nodes traversed to find this gene (0 if on the same contig).

orientation str

Strand orientation of the gene relative to the target (same or opposite).

effect str

Biological impact of the insertion on the gene (e.g., TRUNCATED, NONE).

fractional_depth float

The relative read depth of the variant path compared to the source contig, indicating sub-clonal abundance.

Source code in src/eris/io.py
@dataclass(slots=True, frozen=True)
class ReportRow:
    """
    Represents a single structural variant or passenger gene record in the eris TSV report.

    Attributes:
        locus_id: Unique identifier for the assembled structural variant locus.
        target: The mobile genetic element(s) or query sequences found in this locus.
        gene_id: The identifier of the contextual passenger or flanking gene.
        context: The spatial relationship (e.g., INSIDE, UPSTREAM) of the gene to the target.
        dist_bp: Distance in base pairs between the gene and the target element.
        topo_hops: Number of graph nodes traversed to find this gene (0 if on the same contig).
        orientation: Strand orientation of the gene relative to the target (same or opposite).
        effect: Biological impact of the insertion on the gene (e.g., TRUNCATED, NONE).
        fractional_depth: The relative read depth of the variant path compared to the source contig, indicating sub-clonal abundance.
    """
    locus_id: str
    target: str
    gene_id: str
    context: str
    dist_bp: int
    topo_hops: int
    orientation: str
    effect: str
    fractional_depth: float

    @classmethod
    def header(cls) -> str:
        """Returns the TSV header string dynamically generated from the dataclass fields."""
        return "\t".join(cls.__annotations__.keys()) + "\n"

    def to_tsv(self) -> str:
        """Formats the row data into a tab-separated string."""
        return f"{self.locus_id}\t{self.target}\t{self.gene_id}\t{self.context}\t{self.dist_bp}\t{self.topo_hops}\t{self.orientation}\t{self.effect}\t{self.fractional_depth}\n"

header classmethod

header() -> str

Returns the TSV header string dynamically generated from the dataclass fields.

Source code in src/eris/io.py
@classmethod
def header(cls) -> str:
    """Returns the TSV header string dynamically generated from the dataclass fields."""
    return "\t".join(cls.__annotations__.keys()) + "\n"

to_tsv

to_tsv() -> str

Formats the row data into a tab-separated string.

Source code in src/eris/io.py
def to_tsv(self) -> str:
    """Formats the row data into a tab-separated string."""
    return f"{self.locus_id}\t{self.target}\t{self.gene_id}\t{self.context}\t{self.dist_bp}\t{self.topo_hops}\t{self.orientation}\t{self.effect}\t{self.fractional_depth}\n"

TargetDatabase

Manages a database of nucleotide target sequences (e.g. MGEs, ARGs).

Wraps a minimap2 (mappy) index for efficient searching.

Example

db = TargetDatabase("targets.fasta") aligner = db.aligner

Source code in src/eris/io.py
class TargetDatabase:
    """
    Manages a database of nucleotide target sequences (e.g. MGEs, ARGs).

    Wraps a minimap2 (mappy) index for efficient searching.

    Example:
        >>> db = TargetDatabase("targets.fasta")
        >>> aligner = db.aligner
    """
    __slots__ = ('path', 'feature_type', 'indexing_threads', '_aligner')

    def __init__(self, path: Union[str, Path], feature_type: FeatureType = FeatureType.CDS, indexing_threads: int = 3):
        """
        Initialize the TargetDatabase.

        Args:
            path: Path to the FASTA or MMI file.
            feature_type: Classification for these targets.
            indexing_threads: Threads to use for on-the-fly indexing.
        """
        self.path = Path(path)
        self.feature_type = feature_type
        self.indexing_threads = indexing_threads  # Only needed if not already indexed
        self._aligner = None

        if not self.path.exists():
            raise FileNotFoundError(f"Target database not found: {self.path}")

    @property
    def aligner(self) -> Aligner:
        """Lazy-loaded mappy.Aligner instance."""
        if self._aligner is None:
            self._aligner = Aligner(fn_idx_in=str(self.path), n_threads=self.indexing_threads)
            if not self._aligner:
                raise ValueError(f"Minimap2 failed to load database: {self.path}")
        return self._aligner

aligner property

aligner: Aligner

Lazy-loaded mappy.Aligner instance.

__init__

__init__(path: Union[str, Path], feature_type: FeatureType = FeatureType.CDS, indexing_threads: int = 3)

Initialize the TargetDatabase.

Parameters:

Name Type Description Default
path Union[str, Path]

Path to the FASTA or MMI file.

required
feature_type FeatureType

Classification for these targets.

CDS
indexing_threads int

Threads to use for on-the-fly indexing.

3
Source code in src/eris/io.py
def __init__(self, path: Union[str, Path], feature_type: FeatureType = FeatureType.CDS, indexing_threads: int = 3):
    """
    Initialize the TargetDatabase.

    Args:
        path: Path to the FASTA or MMI file.
        feature_type: Classification for these targets.
        indexing_threads: Threads to use for on-the-fly indexing.
    """
    self.path = Path(path)
    self.feature_type = feature_type
    self.indexing_threads = indexing_threads  # Only needed if not already indexed
    self._aligner = None

    if not self.path.exists():
        raise FileNotFoundError(f"Target database not found: {self.path}")

eris.alignment

AlignmentBatch dataclass

A high-performance batch of alignments stored in NumPy arrays.

Uses a Structure-of-Arrays (SoA) layout to enable fast vectorized operations and efficient filtering of large alignment sets.

Example

batch = AlignmentBatch.from_mappy("read1", 100, alignments) filtered = batch.filter(batch.qualities > 30) scores = batch.scores

Source code in src/eris/alignment.py
@dataclass(frozen=True, slots=True)
class AlignmentBatch:
    """
    A high-performance batch of alignments stored in NumPy arrays.

    Uses a Structure-of-Arrays (SoA) layout to enable fast vectorized operations
    and efficient filtering of large alignment sets.

    Example:
        >>> batch = AlignmentBatch.from_mappy("read1", 100, alignments)
        >>> filtered = batch.filter(batch.qualities > 30)
        >>> scores = batch.scores
    """
    q_names: np.ndarray
    q_lengths: np.ndarray
    q_starts: np.ndarray
    q_ends: np.ndarray
    t_names: np.ndarray
    t_lengths: np.ndarray
    t_starts: np.ndarray
    t_ends: np.ndarray
    strands: np.ndarray
    lengths: np.ndarray
    matches: np.ndarray
    mismatches: np.ndarray
    qualities: np.ndarray
    cigars: np.ndarray

    def __len__(self) -> int:
        return len(self.q_starts)

    @property
    def scores(self) -> np.ndarray:
        """Vectorized alignment score (matches - mismatches)."""
        return self.matches - self.mismatches

    @classmethod
    def from_mappy(cls, q_name: str, q_length: int, alignments: Iterable[Alignment]) -> Self:
        """
        Create an AlignmentBatch from mappy Alignment objects.

        Args:
            q_name: Name of the query sequence.
            q_length: Length of the query sequence.
            alignments: Iterable of mappy.Alignment objects.

        Returns:
            AlignmentBatch: A new batch containing the alignments.
        """
        # OPTIMIZATION: List comprehension + zip is significantly faster than 14 .append() calls per hit
        data = [
            (q_name, q_length, h.q_st, h.q_en, h.ctg, h.ctg_len, h.r_st, h.r_en,
             h.strand, h.blen, h.mlen, h.NM, h.mapq, h.cigar_str)
            for h in alignments
        ]

        if not data:
            raise ValueError("Cannot initialize AlignmentBatch with empty alignments")

        qn, ql, qs, qe, tn, tl, ts, te, st, bl, ml, nm, mq, cg = zip(*data)

        return cls(
            q_names=np.array(qn, dtype=object),
            q_lengths=np.array(ql, dtype=np.int32),
            q_starts=np.array(qs, dtype=np.int32),
            q_ends=np.array(qe, dtype=np.int32),
            t_names=np.array(tn, dtype=object),
            t_lengths=np.array(tl, dtype=np.int32),
            t_starts=np.array(ts, dtype=np.int32),
            t_ends=np.array(te, dtype=np.int32),
            strands=np.array(st, dtype=np.int8),
            lengths=np.array(bl, dtype=np.int32),
            matches=np.array(ml, dtype=np.int32),
            mismatches=np.array(nm, dtype=np.int32),
            qualities=np.array(mq, dtype=np.int32),
            cigars=np.array(cg, dtype=object)
        )

    @classmethod
    def concat(cls, batches: Iterable['AlignmentBatch']) -> Self:
        """Concatenate multiple AlignmentBatch objects into one."""
        batches = list(batches)
        if not batches:
            raise ValueError("Cannot concatenate an empty iterable of batches")

        kwargs = {}
        for field_name in cls.__dataclass_fields__:
            first_val = getattr(batches[0], field_name)
            if isinstance(first_val, np.ndarray):
                kwargs[field_name] = np.concatenate([getattr(b, field_name) for b in batches])
            else:
                if any(getattr(b, field_name) != first_val for b in batches):
                    raise ValueError(f"Cannot concatenate batches with mismatched '{field_name}' values")
                kwargs[field_name] = first_val

        return cls(**kwargs)

    def filter(self, mask: np.ndarray) -> 'AlignmentBatch':
        """Return a new batch containing only elements where mask is True."""
        return AlignmentBatch(
            q_names=self.q_names[mask],
            q_lengths=self.q_lengths[mask],
            q_starts=self.q_starts[mask],
            q_ends=self.q_ends[mask],
            t_names=self.t_names[mask],
            t_lengths=self.t_lengths[mask],
            t_starts=self.t_starts[mask],
            t_ends=self.t_ends[mask],
            strands=self.strands[mask],
            lengths=self.lengths[mask],
            matches=self.matches[mask],
            mismatches=self.mismatches[mask],
            qualities=self.qualities[mask],
            cigars=self.cigars[mask]
        )

    def filter_out(self, mask: np.ndarray) -> 'AlignmentBatch':
        """Returns a new batch excluding the masked items."""
        return self.filter(~mask)

    def cull_overlaps(self, max_overlap_fraction: float = 0.1) -> 'AlignmentBatch':
        """
        Greedily culls overlapping alignments on the target.

        Prioritizes alignments with higher match scores. Useful for resolving
        competing alignments for the same query region.

        Args:
            max_overlap_fraction: Maximum allowed overlap fraction before culling.
        """
        n = len(self)
        if n < 2: return self
        kept_mask = np.zeros(n, dtype=bool)
        kept_intervals = defaultdict(list)
        for idx in np.argsort(self.scores, kind='stable')[::-1]:
            t_name = self.t_names[idx]
            s, e = self.t_starts[idx], self.t_ends[idx]
            length = e - s
            if length <= 0: continue
            kept = kept_intervals[t_name]

            overlap_found = False
            for ks, ke in kept:
                overlap = min(e, ke) - max(s, ks)
                if overlap > 0 and (overlap / length) > max_overlap_fraction:
                    overlap_found = True
                    break
            if overlap_found: continue

            kept.append((s, e))
            kept_mask[idx] = True
        return self.filter(kept_mask)

    def swap_sides(self) -> 'AlignmentBatch':
        """
        Swaps query and target roles in the alignment records.

        This is used when query sequences (contigs) are being treated as targets
        and vice-versa, which is common in reciprocal mapping.
        """
        return AlignmentBatch(
            q_names=self.t_names,
            q_lengths=self.t_lengths,
            q_starts=self.t_starts,
            q_ends=self.t_ends,
            t_names=self.q_names,
            t_lengths=self.q_lengths,
            t_starts=self.q_starts,
            t_ends=self.q_ends,
            strands=self.strands,
            lengths=self.lengths,
            matches=self.matches,
            mismatches=self.mismatches,
            qualities=self.qualities,
            cigars=np.array([c.translate(Cigar._SWAP_MAP) for c in self.cigars], dtype=object)
        )

    def split(self, by_query: bool = False) -> Iterable[tuple[str, 'AlignmentBatch']]:
        """Splits a batch into separate batches by target or query name."""
        key_array = self.q_names if by_query else self.t_names
        for key in np.unique(key_array):
            yield key, self.filter(key_array == key)

    def to_intervals(self, by_query: bool = False) -> IntervalBatch:
        """
        Converts the batch into a high-performance IntervalBatch.

        Args:
            by_query: If True, use query coordinates. Otherwise, use target coordinates.
        """
        starts = self.q_starts if by_query else self.t_starts
        ends = self.q_ends if by_query else self.t_ends

        return IntervalBatch(
            starts=starts.copy(),
            ends=ends.copy(),
            strands=self.strands.copy(),
            # CRITICAL: Ensures we can map relational queries back to this alignment record!
            original_indices=np.arange(len(self), dtype=np.int32)
        )

    def get_record(self, idx: int) -> AlignmentRecord:
        """Retrieve a single AlignmentRecord by its batch index."""
        if idx < 0 or idx >= len(self):
            raise IndexError("Batch index out of range")

        return AlignmentRecord(
            idx=idx,
            q_name=self.q_names[idx],
            q_length=self.q_lengths[idx],
            q_start=self.q_starts[idx],
            q_end=self.q_ends[idx],
            t_name=self.t_names[idx],
            t_length=self.t_lengths[idx],
            t_start=self.t_starts[idx],
            t_end=self.t_ends[idx],
            strand=self.strands[idx],
            length=self.lengths[idx],
            match=self.matches[idx],
            mismatch=self.mismatches[idx],
            quality=self.qualities[idx],
            cigar=self.cigars[idx]
        )

scores property

scores: ndarray

Vectorized alignment score (matches - mismatches).

concat classmethod

concat(batches: Iterable[AlignmentBatch]) -> Self

Concatenate multiple AlignmentBatch objects into one.

Source code in src/eris/alignment.py
@classmethod
def concat(cls, batches: Iterable['AlignmentBatch']) -> Self:
    """Concatenate multiple AlignmentBatch objects into one."""
    batches = list(batches)
    if not batches:
        raise ValueError("Cannot concatenate an empty iterable of batches")

    kwargs = {}
    for field_name in cls.__dataclass_fields__:
        first_val = getattr(batches[0], field_name)
        if isinstance(first_val, np.ndarray):
            kwargs[field_name] = np.concatenate([getattr(b, field_name) for b in batches])
        else:
            if any(getattr(b, field_name) != first_val for b in batches):
                raise ValueError(f"Cannot concatenate batches with mismatched '{field_name}' values")
            kwargs[field_name] = first_val

    return cls(**kwargs)

cull_overlaps

cull_overlaps(max_overlap_fraction: float = 0.1) -> AlignmentBatch

Greedily culls overlapping alignments on the target.

Prioritizes alignments with higher match scores. Useful for resolving competing alignments for the same query region.

Parameters:

Name Type Description Default
max_overlap_fraction float

Maximum allowed overlap fraction before culling.

0.1
Source code in src/eris/alignment.py
def cull_overlaps(self, max_overlap_fraction: float = 0.1) -> 'AlignmentBatch':
    """
    Greedily culls overlapping alignments on the target.

    Prioritizes alignments with higher match scores. Useful for resolving
    competing alignments for the same query region.

    Args:
        max_overlap_fraction: Maximum allowed overlap fraction before culling.
    """
    n = len(self)
    if n < 2: return self
    kept_mask = np.zeros(n, dtype=bool)
    kept_intervals = defaultdict(list)
    for idx in np.argsort(self.scores, kind='stable')[::-1]:
        t_name = self.t_names[idx]
        s, e = self.t_starts[idx], self.t_ends[idx]
        length = e - s
        if length <= 0: continue
        kept = kept_intervals[t_name]

        overlap_found = False
        for ks, ke in kept:
            overlap = min(e, ke) - max(s, ks)
            if overlap > 0 and (overlap / length) > max_overlap_fraction:
                overlap_found = True
                break
        if overlap_found: continue

        kept.append((s, e))
        kept_mask[idx] = True
    return self.filter(kept_mask)

filter

filter(mask: ndarray) -> AlignmentBatch

Return a new batch containing only elements where mask is True.

Source code in src/eris/alignment.py
def filter(self, mask: np.ndarray) -> 'AlignmentBatch':
    """Return a new batch containing only elements where mask is True."""
    return AlignmentBatch(
        q_names=self.q_names[mask],
        q_lengths=self.q_lengths[mask],
        q_starts=self.q_starts[mask],
        q_ends=self.q_ends[mask],
        t_names=self.t_names[mask],
        t_lengths=self.t_lengths[mask],
        t_starts=self.t_starts[mask],
        t_ends=self.t_ends[mask],
        strands=self.strands[mask],
        lengths=self.lengths[mask],
        matches=self.matches[mask],
        mismatches=self.mismatches[mask],
        qualities=self.qualities[mask],
        cigars=self.cigars[mask]
    )

filter_out

filter_out(mask: ndarray) -> AlignmentBatch

Returns a new batch excluding the masked items.

Source code in src/eris/alignment.py
def filter_out(self, mask: np.ndarray) -> 'AlignmentBatch':
    """Returns a new batch excluding the masked items."""
    return self.filter(~mask)

from_mappy classmethod

from_mappy(q_name: str, q_length: int, alignments: Iterable[Alignment]) -> Self

Create an AlignmentBatch from mappy Alignment objects.

Parameters:

Name Type Description Default
q_name str

Name of the query sequence.

required
q_length int

Length of the query sequence.

required
alignments Iterable[Alignment]

Iterable of mappy.Alignment objects.

required

Returns:

Name Type Description
AlignmentBatch Self

A new batch containing the alignments.

Source code in src/eris/alignment.py
@classmethod
def from_mappy(cls, q_name: str, q_length: int, alignments: Iterable[Alignment]) -> Self:
    """
    Create an AlignmentBatch from mappy Alignment objects.

    Args:
        q_name: Name of the query sequence.
        q_length: Length of the query sequence.
        alignments: Iterable of mappy.Alignment objects.

    Returns:
        AlignmentBatch: A new batch containing the alignments.
    """
    # OPTIMIZATION: List comprehension + zip is significantly faster than 14 .append() calls per hit
    data = [
        (q_name, q_length, h.q_st, h.q_en, h.ctg, h.ctg_len, h.r_st, h.r_en,
         h.strand, h.blen, h.mlen, h.NM, h.mapq, h.cigar_str)
        for h in alignments
    ]

    if not data:
        raise ValueError("Cannot initialize AlignmentBatch with empty alignments")

    qn, ql, qs, qe, tn, tl, ts, te, st, bl, ml, nm, mq, cg = zip(*data)

    return cls(
        q_names=np.array(qn, dtype=object),
        q_lengths=np.array(ql, dtype=np.int32),
        q_starts=np.array(qs, dtype=np.int32),
        q_ends=np.array(qe, dtype=np.int32),
        t_names=np.array(tn, dtype=object),
        t_lengths=np.array(tl, dtype=np.int32),
        t_starts=np.array(ts, dtype=np.int32),
        t_ends=np.array(te, dtype=np.int32),
        strands=np.array(st, dtype=np.int8),
        lengths=np.array(bl, dtype=np.int32),
        matches=np.array(ml, dtype=np.int32),
        mismatches=np.array(nm, dtype=np.int32),
        qualities=np.array(mq, dtype=np.int32),
        cigars=np.array(cg, dtype=object)
    )

get_record

get_record(idx: int) -> AlignmentRecord

Retrieve a single AlignmentRecord by its batch index.

Source code in src/eris/alignment.py
def get_record(self, idx: int) -> AlignmentRecord:
    """Retrieve a single AlignmentRecord by its batch index."""
    if idx < 0 or idx >= len(self):
        raise IndexError("Batch index out of range")

    return AlignmentRecord(
        idx=idx,
        q_name=self.q_names[idx],
        q_length=self.q_lengths[idx],
        q_start=self.q_starts[idx],
        q_end=self.q_ends[idx],
        t_name=self.t_names[idx],
        t_length=self.t_lengths[idx],
        t_start=self.t_starts[idx],
        t_end=self.t_ends[idx],
        strand=self.strands[idx],
        length=self.lengths[idx],
        match=self.matches[idx],
        mismatch=self.mismatches[idx],
        quality=self.qualities[idx],
        cigar=self.cigars[idx]
    )

split

split(by_query: bool = False) -> Iterable[tuple[str, AlignmentBatch]]

Splits a batch into separate batches by target or query name.

Source code in src/eris/alignment.py
def split(self, by_query: bool = False) -> Iterable[tuple[str, 'AlignmentBatch']]:
    """Splits a batch into separate batches by target or query name."""
    key_array = self.q_names if by_query else self.t_names
    for key in np.unique(key_array):
        yield key, self.filter(key_array == key)

swap_sides

swap_sides() -> AlignmentBatch

Swaps query and target roles in the alignment records.

This is used when query sequences (contigs) are being treated as targets and vice-versa, which is common in reciprocal mapping.

Source code in src/eris/alignment.py
def swap_sides(self) -> 'AlignmentBatch':
    """
    Swaps query and target roles in the alignment records.

    This is used when query sequences (contigs) are being treated as targets
    and vice-versa, which is common in reciprocal mapping.
    """
    return AlignmentBatch(
        q_names=self.t_names,
        q_lengths=self.t_lengths,
        q_starts=self.t_starts,
        q_ends=self.t_ends,
        t_names=self.q_names,
        t_lengths=self.q_lengths,
        t_starts=self.q_starts,
        t_ends=self.q_ends,
        strands=self.strands,
        lengths=self.lengths,
        matches=self.matches,
        mismatches=self.mismatches,
        qualities=self.qualities,
        cigars=np.array([c.translate(Cigar._SWAP_MAP) for c in self.cigars], dtype=object)
    )

to_intervals

to_intervals(by_query: bool = False) -> IntervalBatch

Converts the batch into a high-performance IntervalBatch.

Parameters:

Name Type Description Default
by_query bool

If True, use query coordinates. Otherwise, use target coordinates.

False
Source code in src/eris/alignment.py
def to_intervals(self, by_query: bool = False) -> IntervalBatch:
    """
    Converts the batch into a high-performance IntervalBatch.

    Args:
        by_query: If True, use query coordinates. Otherwise, use target coordinates.
    """
    starts = self.q_starts if by_query else self.t_starts
    ends = self.q_ends if by_query else self.t_ends

    return IntervalBatch(
        starts=starts.copy(),
        ends=ends.copy(),
        strands=self.strands.copy(),
        # CRITICAL: Ensures we can map relational queries back to this alignment record!
        original_indices=np.arange(len(self), dtype=np.int32)
    )

AlignmentRecord dataclass

A lightweight, self-aware view of a single alignment record.

Attributes:

Name Type Description
idx int

Original index within the source AlignmentBatch.

q_name str

Query sequence name.

q_length int

Total length of the query sequence.

q_start int

Start position on the query.

q_end int

End position on the query.

t_name str

Target (reference) sequence name.

t_length int

Total length of the target sequence.

t_start int

Start position on the target.

t_end int

End position on the target.

strand Strand

Alignment orientation (Strand.FORWARD or Strand.REVERSE).

length int

Total alignment block length.

match int

Number of matching bases.

mismatch int

Number of mismatches (NM tag).

quality int

Mapping quality (MAPQ).

cigar str

CIGAR string.

Source code in src/eris/alignment.py
@dataclass(slots=True, frozen=True)
class AlignmentRecord:
    """
    A lightweight, self-aware view of a single alignment record.

    Attributes:
        idx: Original index within the source AlignmentBatch.
        q_name: Query sequence name.
        q_length: Total length of the query sequence.
        q_start: Start position on the query.
        q_end: End position on the query.
        t_name: Target (reference) sequence name.
        t_length: Total length of the target sequence.
        t_start: Start position on the target.
        t_end: End position on the target.
        strand: Alignment orientation (Strand.FORWARD or Strand.REVERSE).
        length: Total alignment block length.
        match: Number of matching bases.
        mismatch: Number of mismatches (NM tag).
        quality: Mapping quality (MAPQ).
        cigar: CIGAR string.
    """
    idx: int
    q_name: str
    q_length: int
    q_start: int
    q_end: int
    t_name: str
    t_length: int
    t_start: int
    t_end: int
    strand: Strand
    length: int
    match: int
    mismatch: int
    quality: int
    cigar: str  # Kept as str to match the array type, can be wrapped in Cigar() if needed

    @property
    def is_partial(self) -> bool:
        """True if the alignment covers less than 90% of the query sequence."""
        return self.length < (self.q_length * 0.9)

    @property
    def hangs_5p(self) -> bool:
        """True if the alignment starts within 15bp of the target start."""
        return self.t_start <= 15

    @property
    def hangs_3p(self) -> bool:
        """True if the alignment ends within 15bp of the target end."""
        return (self.t_length - self.t_end) <= 15

hangs_3p property

hangs_3p: bool

True if the alignment ends within 15bp of the target end.

hangs_5p property

hangs_5p: bool

True if the alignment starts within 15bp of the target start.

is_partial property

is_partial: bool

True if the alignment covers less than 90% of the query sequence.

Cigar

Represents a CIGAR (Compact Idiosyncratic Gapped Alignment Report) string.

This class provides tools for parsing and iterating over CIGAR operations, calculating consumed lengths on query and target sequences.

Example

cigar = Cigar("10M2I5M") list(cigar) [('M', 10, 10, 10, 10), ('I', 2, 12, 10, 10), ('M', 5, 17, 15, 15)]

Source code in src/eris/alignment.py
class Cigar:
    """
    Represents a CIGAR (Compact Idiosyncratic Gapped Alignment Report) string.

    This class provides tools for parsing and iterating over CIGAR operations,
    calculating consumed lengths on query and target sequences.

    Example:
        >>> cigar = Cigar("10M2I5M")
        >>> list(cigar)
        [('M', 10, 10, 10, 10), ('I', 2, 12, 10, 10), ('M', 5, 17, 15, 15)]
    """
    _OPS = {
        'M': (True, True),
        'I': (True, False),
        'D': (False, True),
        'N': (False, True),
        'S': (True, False),
        'H': (False, False),
        'P': (False, False),
        '=': (True, True),
        'X': (True, True),
        'B': (False, False),
    }
    _REGEX = re_compile(r'(\d+)([MIDNSHP=XB])')
    _SWAP_MAP = str.maketrans('ID', 'DI')

    __slots__ = ('_data',)

    def __init__(self, data: str):
        """
        Initialize a Cigar object.

        Args:
            data: The CIGAR string (e.g., "100M2I10D").
        """
        self._data = data

    def __str__(self) -> str:
        return self._data

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self._data!r})"

    def __iter__(self) -> Generator[tuple[str, int, int, int, int], None, None]:
        """
        Iterates over the CIGAR string and yields parsed operations.

        Yields:
            tuple: (op, count, query_consumed, target_consumed, aln_consumed)
                - op: The CIGAR operation character (e.g., 'M', 'I', 'D').
                - count: The number of bases for this operation.
                - query_consumed: Cumulative bases consumed in the query.
                - target_consumed: Cumulative bases consumed in the target.
                - aln_consumed: Cumulative aligned (matching/mismatching) bases.
        """
        q_len, t_len, aln_len = 0, 0, 0
        for match in self._REGEX.finditer(self._data):
            n = int(match.group(1))
            op = match.group(2)
            consume_query, consume_target = self._OPS[op]

            if consume_query: q_len += n
            if consume_target: t_len += n
            if consume_query and consume_target: aln_len += n

            yield op, n, q_len, t_len, aln_len

__init__

__init__(data: str)

Initialize a Cigar object.

Parameters:

Name Type Description Default
data str

The CIGAR string (e.g., "100M2I10D").

required
Source code in src/eris/alignment.py
def __init__(self, data: str):
    """
    Initialize a Cigar object.

    Args:
        data: The CIGAR string (e.g., "100M2I10D").
    """
    self._data = data

__iter__

__iter__() -> Generator[tuple[str, int, int, int, int], None, None]

Iterates over the CIGAR string and yields parsed operations.

Yields:

Name Type Description
tuple tuple[str, int, int, int, int]

(op, count, query_consumed, target_consumed, aln_consumed) - op: The CIGAR operation character (e.g., 'M', 'I', 'D'). - count: The number of bases for this operation. - query_consumed: Cumulative bases consumed in the query. - target_consumed: Cumulative bases consumed in the target. - aln_consumed: Cumulative aligned (matching/mismatching) bases.

Source code in src/eris/alignment.py
def __iter__(self) -> Generator[tuple[str, int, int, int, int], None, None]:
    """
    Iterates over the CIGAR string and yields parsed operations.

    Yields:
        tuple: (op, count, query_consumed, target_consumed, aln_consumed)
            - op: The CIGAR operation character (e.g., 'M', 'I', 'D').
            - count: The number of bases for this operation.
            - query_consumed: Cumulative bases consumed in the query.
            - target_consumed: Cumulative bases consumed in the target.
            - aln_consumed: Cumulative aligned (matching/mismatching) bases.
    """
    q_len, t_len, aln_len = 0, 0, 0
    for match in self._REGEX.finditer(self._data):
        n = int(match.group(1))
        op = match.group(2)
        consume_query, consume_target = self._OPS[op]

        if consume_query: q_len += n
        if consume_target: t_len += n
        if consume_query and consume_target: aln_len += n

        yield op, n, q_len, t_len, aln_len

eris.interval

Genomic interval representation with strand and context, plus batched interval operations.

Interval

Bases: NamedTuple

A single genomic interval defined by start, end, and strand.

Uses 0-based coordinate system (start inclusive, end exclusive).

Example

itv = Interval(10, 20, Strand.FORWARD) 15 in itv True itv.shift(5) Interval(start=15, end=25, strand=)

Source code in src/eris/interval.py
class Interval(NamedTuple):
    """
    A single genomic interval defined by start, end, and strand.

    Uses 0-based coordinate system (start inclusive, end exclusive).

    Example:
        >>> itv = Interval(10, 20, Strand.FORWARD)
        >>> 15 in itv
        True
        >>> itv.shift(5)
        Interval(start=15, end=25, strand=<Strand.FORWARD: 1>)
    """
    start: int
    end: int
    strand: Strand = Strand.UNSTRANDED

    def __contains__(self, item: IntervalLike) -> bool:
        """Check if an coordinate or another interval is fully contained within this one."""
        if isinstance(item, int): return self.start <= item < self.end
        item = Interval.from_item(item)
        return self.start <= item.start and self.end >= item.end

    def __add__(self, other: IntervalLike) -> Interval:
        """Returns the minimal bounding interval covering both self and other."""
        other = Interval.from_item(other)
        new_strand = self.strand if self.strand == other.strand else 0
        return Interval(min(self.start, other.start), max(self.end, other.end), new_strand)

    def __radd__(self, other: IntervalLike) -> Interval:
        return self.__add__(other)

    def shift(self, x: int, y: int = None) -> Interval:
        """
        Shift the interval by a fixed distance.

        Args:
            x: Distance to shift the start.
            y: Optional distance to shift the end. If None, same as x.
        """
        return Interval(self.start + x, self.end + (y if y is not None else x), self.strand)

    def reverse_complement(self, length: int = None) -> Interval:
        """
        Returns the interval coordinates on the opposite strand.

        Args:
            length: The length of the parent sequence (e.g. contig).
        """
        if length is None: length = self.end
        return Interval(length - self.end, length - self.start, self.strand * -1)

    def relate(self, other: IntervalLike) -> Context:
        """
        Calculate the spatial relationship between this interval and another.

        Returns a Context enum value (e.g. UPSTREAM, INSIDE, OVERLAPPING).
        """
        other = Interval.from_item(other)
        return Context(_core_relate(self.start, self.end, self.strand, other.start, other.end))

    @classmethod
    def from_match(cls, item: Match, strand: Strand = Strand.UNSTRANDED) -> Interval:
        """Create from a regex Match object."""
        return cls(item.start(), item.end(), strand)

    @classmethod
    def from_int(cls, item: int, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval:
        """Create a 1bp interval from an integer."""
        if item < 0 and length is not None: item += length
        return cls(item, item + 1, strand)

    @classmethod
    def from_slice(cls, item: slice, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval:
        """Create from a Python slice object."""
        start, stop, step = item.start, item.stop, item.step
        if start is None: start = 0
        if stop is None and length is not None: stop = length
        if stop is None: raise ValueError("Cannot create Interval from slice with None stop without 'length'")
        if step == -1: return cls(stop + 1, start + 1, strand)
        return cls(start, stop, strand)

    @classmethod
    def from_location(cls, item: SimpleLocation) -> Self:
        """Create from a Biopython SimpleLocation."""
        return cls(item.start, item.end, item.strand)

    @classmethod
    def from_gene(cls, item: Gene) -> Self:
        """Create from a PyFGS Gene object."""
        return cls(item.start, item.end, Strand(item.strand))

    @classmethod
    def from_item(cls, item: IntervalLike, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Self:
        """Universal coercion from various objects to an Interval."""
        if isinstance(item, cls): return item
        if interval := getattr(item, 'interval', None): return interval
        if isinstance(item, Match): return cls.from_match(item, strand)
        if isinstance(item, int): return cls.from_int(item, strand, length)
        if isinstance(item, slice): return cls.from_slice(item, strand, length)
        if isinstance(item, SimpleLocation): return cls.from_location(item)
        if isinstance(item, Gene): return cls.from_gene(item)
        raise TypeError(f"Cannot coerce {type(item)} to Interval")

__add__

__add__(other: IntervalLike) -> Interval

Returns the minimal bounding interval covering both self and other.

Source code in src/eris/interval.py
def __add__(self, other: IntervalLike) -> Interval:
    """Returns the minimal bounding interval covering both self and other."""
    other = Interval.from_item(other)
    new_strand = self.strand if self.strand == other.strand else 0
    return Interval(min(self.start, other.start), max(self.end, other.end), new_strand)

__contains__

__contains__(item: IntervalLike) -> bool

Check if an coordinate or another interval is fully contained within this one.

Source code in src/eris/interval.py
def __contains__(self, item: IntervalLike) -> bool:
    """Check if an coordinate or another interval is fully contained within this one."""
    if isinstance(item, int): return self.start <= item < self.end
    item = Interval.from_item(item)
    return self.start <= item.start and self.end >= item.end

from_gene classmethod

from_gene(item: Gene) -> Self

Create from a PyFGS Gene object.

Source code in src/eris/interval.py
@classmethod
def from_gene(cls, item: Gene) -> Self:
    """Create from a PyFGS Gene object."""
    return cls(item.start, item.end, Strand(item.strand))

from_int classmethod

from_int(item: int, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval

Create a 1bp interval from an integer.

Source code in src/eris/interval.py
@classmethod
def from_int(cls, item: int, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval:
    """Create a 1bp interval from an integer."""
    if item < 0 and length is not None: item += length
    return cls(item, item + 1, strand)

from_item classmethod

from_item(item: IntervalLike, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Self

Universal coercion from various objects to an Interval.

Source code in src/eris/interval.py
@classmethod
def from_item(cls, item: IntervalLike, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Self:
    """Universal coercion from various objects to an Interval."""
    if isinstance(item, cls): return item
    if interval := getattr(item, 'interval', None): return interval
    if isinstance(item, Match): return cls.from_match(item, strand)
    if isinstance(item, int): return cls.from_int(item, strand, length)
    if isinstance(item, slice): return cls.from_slice(item, strand, length)
    if isinstance(item, SimpleLocation): return cls.from_location(item)
    if isinstance(item, Gene): return cls.from_gene(item)
    raise TypeError(f"Cannot coerce {type(item)} to Interval")

from_location classmethod

from_location(item: SimpleLocation) -> Self

Create from a Biopython SimpleLocation.

Source code in src/eris/interval.py
@classmethod
def from_location(cls, item: SimpleLocation) -> Self:
    """Create from a Biopython SimpleLocation."""
    return cls(item.start, item.end, item.strand)

from_match classmethod

from_match(item: Match, strand: Strand = Strand.UNSTRANDED) -> Interval

Create from a regex Match object.

Source code in src/eris/interval.py
@classmethod
def from_match(cls, item: Match, strand: Strand = Strand.UNSTRANDED) -> Interval:
    """Create from a regex Match object."""
    return cls(item.start(), item.end(), strand)

from_slice classmethod

from_slice(item: slice, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval

Create from a Python slice object.

Source code in src/eris/interval.py
@classmethod
def from_slice(cls, item: slice, strand: Strand = Strand.UNSTRANDED, length: int = None) -> Interval:
    """Create from a Python slice object."""
    start, stop, step = item.start, item.stop, item.step
    if start is None: start = 0
    if stop is None and length is not None: stop = length
    if stop is None: raise ValueError("Cannot create Interval from slice with None stop without 'length'")
    if step == -1: return cls(stop + 1, start + 1, strand)
    return cls(start, stop, strand)

relate

relate(other: IntervalLike) -> Context

Calculate the spatial relationship between this interval and another.

Returns a Context enum value (e.g. UPSTREAM, INSIDE, OVERLAPPING).

Source code in src/eris/interval.py
def relate(self, other: IntervalLike) -> Context:
    """
    Calculate the spatial relationship between this interval and another.

    Returns a Context enum value (e.g. UPSTREAM, INSIDE, OVERLAPPING).
    """
    other = Interval.from_item(other)
    return Context(_core_relate(self.start, self.end, self.strand, other.start, other.end))

reverse_complement

reverse_complement(length: int = None) -> Interval

Returns the interval coordinates on the opposite strand.

Parameters:

Name Type Description Default
length int

The length of the parent sequence (e.g. contig).

None
Source code in src/eris/interval.py
def reverse_complement(self, length: int = None) -> Interval:
    """
    Returns the interval coordinates on the opposite strand.

    Args:
        length: The length of the parent sequence (e.g. contig).
    """
    if length is None: length = self.end
    return Interval(length - self.end, length - self.start, self.strand * -1)

shift

shift(x: int, y: int = None) -> Interval

Shift the interval by a fixed distance.

Parameters:

Name Type Description Default
x int

Distance to shift the start.

required
y int

Optional distance to shift the end. If None, same as x.

None
Source code in src/eris/interval.py
def shift(self, x: int, y: int = None) -> Interval:
    """
    Shift the interval by a fixed distance.

    Args:
        x: Distance to shift the start.
        y: Optional distance to shift the end. If None, same as x.
    """
    return Interval(self.start + x, self.end + (y if y is not None else x), self.strand)

IntervalBatch dataclass

High-performance batch of genomic intervals, powered by NumPy.

Uses strict Structure-of-Arrays (SoA) layout with automatic dtype enforcement. Optimized for spatial queries (overlap, nearest-neighbor) and vectorized coordinate transformations.

Example

batch = IntervalBatch.from_intervals([Interval(0, 10), Interval(20, 30)]) hits = batch.query(5, 25) # Returns indices of overlapping intervals merged = batch.merge(tolerance=5)

Source code in src/eris/interval.py
@dataclass(frozen=True, slots=True)
class IntervalBatch:
    """
    High-performance batch of genomic intervals, powered by NumPy.

    Uses strict Structure-of-Arrays (SoA) layout with automatic dtype enforcement.
    Optimized for spatial queries (overlap, nearest-neighbor) and vectorized
    coordinate transformations.

    Example:
        >>> batch = IntervalBatch.from_intervals([Interval(0, 10), Interval(20, 30)])
        >>> hits = batch.query(5, 25)  # Returns indices of overlapping intervals
        >>> merged = batch.merge(tolerance=5)
    """
    starts: np.ndarray
    ends: np.ndarray
    strands: np.ndarray
    original_indices: Optional[np.ndarray] = None

    def __post_init__(self):
        if self.original_indices is None:
            object.__setattr__(self, 'original_indices', np.arange(len(self.starts), dtype=np.int32))

    @classmethod
    def empty(cls) -> Self:
        """Create an empty IntervalBatch."""
        return cls(
            np.empty(0, dtype=np.int32),
            np.empty(0, dtype=np.int32),
            np.empty(0, dtype=np.int8),
            np.empty(0, dtype=np.int32)
        )

    @classmethod
    def from_intervals(cls, intervals: Iterable[Interval]) -> Self:
        """Create a batch from an iterable of Interval objects."""
        # OPTIMIZATION: Fast C-level list comprehension + zip extraction
        data = [(i.start, i.end, i.strand) for i in intervals]
        if not data: return cls.empty()

        s, e, st = zip(*data)
        return cls(
            np.array(s, dtype=np.int32),
            np.array(e, dtype=np.int32),
            np.array(st, dtype=np.int8)
        )

    def max_len(self) -> int:
        """Returns the length of the longest interval in the batch."""
        return np.max(self.ends - self.starts) if len(self) > 0 else 0

    def __len__(self) -> int:
        return len(self.starts)

    def __getitem__(self, item):
        """
        Access intervals by index.

        If index is an integer, returns a single Interval object.
        If index is a slice or mask, returns a new IntervalBatch.
        """
        if isinstance(item, (int, np.integer)):
            if item < 0: item += len(self)
            if item < 0 or item >= len(self): raise IndexError("Batch index out of range")
            return Interval(self.starts[item], self.ends[item], self.strands[item])

        return IntervalBatch(
            self.starts[item],
            self.ends[item],
            self.strands[item],
            self.original_indices[item]
        )

    def copy(self):
        """Create a deep copy of the batch arrays."""
        return IntervalBatch(
            self.starts.copy(),
            self.ends.copy(),
            self.strands.copy(),
            self.original_indices.copy()
        )

    @classmethod
    def concat(cls, batches: Iterable[Self]) -> Self:
        """Concatenate multiple IntervalBatch objects."""
        batches = list(batches)
        if not batches: raise ValueError("Cannot concatenate empty list of batches")
        return cls(
            np.concatenate([b.starts for b in batches]),
            np.concatenate([b.ends for b in batches]),
            np.concatenate([b.strands for b in batches]),
            np.concatenate([b.original_indices for b in batches])
        )

    def sort(self) -> IntervalBatch:
        """Returns a new cleanly sorted IntervalBatch (by start then end)."""
        if len(self) < 2 or _is_sorted_kernel(self.starts, self.ends):
            return self

        order = np.lexsort((self.ends, self.starts))
        return IntervalBatch(
            self.starts[order],
            self.ends[order],
            self.strands[order],
            self.original_indices[order]
        )

    def filter(self, mask: IntervalLike) -> IntervalBatch:
        """Return a new batch containing only intervals matched by the mask."""
        if isinstance(mask, (slice, int, np.integer)):
            if isinstance(mask, (int, np.integer)): mask = [mask]
            return self[mask]
        return self[np.asarray(mask)]

    @property
    def centers(self) -> np.ndarray:
        """Vectorized calculation of interval midpoints."""
        return (self.starts + self.ends) / 2

    @property
    def lengths(self) -> np.ndarray:
        """Vectorized calculation of interval lengths."""
        return self.ends - self.starts

    def query(self, start: int, end: int) -> np.ndarray:
        """
        Find indices of intervals that overlap with the given range.

        Args:
            start: Query range start.
            end: Query range end.

        Returns:
            np.ndarray: Indices of overlapping intervals.
        """
        if len(self) == 0: return np.empty(0, dtype=np.int32)
        return _query_kernel(self.starts, self.ends, start, end, self.max_len())

    def merge(self, tolerance: int = 0) -> IntervalBatch:
        """
        Merge overlapping or adjacent intervals into single bounding boxes.

        Args:
            tolerance: Max gap between intervals to consider them adjacent.
        """
        if len(self) == 0:
            return self
        out = _merge_kernel(self.starts, self.ends, self.strands, tolerance)
        return type(self)(out[0], out[1], out[2])

    def project(self, shift: int, flip_length: Optional[int] = None) -> IntervalBatch:
        """
        Apply a coordinate transformation to all intervals in the batch.

        Args:
            shift: Distance to add to all coordinates.
            flip_length: If provided, coordinates are mirrored within this length
                (e.g. for projecting onto the reverse strand of a contig).
        """
        if flip_length is not None:
            new_starts = flip_length - self.ends
            new_ends = flip_length - self.starts
            new_strands = self.strands * -1
        else:
            new_starts = self.starts.copy()
            new_ends = self.ends.copy()
            new_strands = self.strands.copy()

        new_starts += shift
        new_ends += shift

        return IntervalBatch(
            starts=new_starts,
            ends=new_ends,
            strands=new_strands,
            original_indices=self.original_indices.copy()
        )

centers property

centers: ndarray

Vectorized calculation of interval midpoints.

lengths property

lengths: ndarray

Vectorized calculation of interval lengths.

__getitem__

__getitem__(item)

Access intervals by index.

If index is an integer, returns a single Interval object. If index is a slice or mask, returns a new IntervalBatch.

Source code in src/eris/interval.py
def __getitem__(self, item):
    """
    Access intervals by index.

    If index is an integer, returns a single Interval object.
    If index is a slice or mask, returns a new IntervalBatch.
    """
    if isinstance(item, (int, np.integer)):
        if item < 0: item += len(self)
        if item < 0 or item >= len(self): raise IndexError("Batch index out of range")
        return Interval(self.starts[item], self.ends[item], self.strands[item])

    return IntervalBatch(
        self.starts[item],
        self.ends[item],
        self.strands[item],
        self.original_indices[item]
    )

concat classmethod

concat(batches: Iterable[Self]) -> Self

Concatenate multiple IntervalBatch objects.

Source code in src/eris/interval.py
@classmethod
def concat(cls, batches: Iterable[Self]) -> Self:
    """Concatenate multiple IntervalBatch objects."""
    batches = list(batches)
    if not batches: raise ValueError("Cannot concatenate empty list of batches")
    return cls(
        np.concatenate([b.starts for b in batches]),
        np.concatenate([b.ends for b in batches]),
        np.concatenate([b.strands for b in batches]),
        np.concatenate([b.original_indices for b in batches])
    )

copy

copy()

Create a deep copy of the batch arrays.

Source code in src/eris/interval.py
def copy(self):
    """Create a deep copy of the batch arrays."""
    return IntervalBatch(
        self.starts.copy(),
        self.ends.copy(),
        self.strands.copy(),
        self.original_indices.copy()
    )

empty classmethod

empty() -> Self

Create an empty IntervalBatch.

Source code in src/eris/interval.py
@classmethod
def empty(cls) -> Self:
    """Create an empty IntervalBatch."""
    return cls(
        np.empty(0, dtype=np.int32),
        np.empty(0, dtype=np.int32),
        np.empty(0, dtype=np.int8),
        np.empty(0, dtype=np.int32)
    )

filter

filter(mask: IntervalLike) -> IntervalBatch

Return a new batch containing only intervals matched by the mask.

Source code in src/eris/interval.py
def filter(self, mask: IntervalLike) -> IntervalBatch:
    """Return a new batch containing only intervals matched by the mask."""
    if isinstance(mask, (slice, int, np.integer)):
        if isinstance(mask, (int, np.integer)): mask = [mask]
        return self[mask]
    return self[np.asarray(mask)]

from_intervals classmethod

from_intervals(intervals: Iterable[Interval]) -> Self

Create a batch from an iterable of Interval objects.

Source code in src/eris/interval.py
@classmethod
def from_intervals(cls, intervals: Iterable[Interval]) -> Self:
    """Create a batch from an iterable of Interval objects."""
    # OPTIMIZATION: Fast C-level list comprehension + zip extraction
    data = [(i.start, i.end, i.strand) for i in intervals]
    if not data: return cls.empty()

    s, e, st = zip(*data)
    return cls(
        np.array(s, dtype=np.int32),
        np.array(e, dtype=np.int32),
        np.array(st, dtype=np.int8)
    )

max_len

max_len() -> int

Returns the length of the longest interval in the batch.

Source code in src/eris/interval.py
def max_len(self) -> int:
    """Returns the length of the longest interval in the batch."""
    return np.max(self.ends - self.starts) if len(self) > 0 else 0

merge

merge(tolerance: int = 0) -> IntervalBatch

Merge overlapping or adjacent intervals into single bounding boxes.

Parameters:

Name Type Description Default
tolerance int

Max gap between intervals to consider them adjacent.

0
Source code in src/eris/interval.py
def merge(self, tolerance: int = 0) -> IntervalBatch:
    """
    Merge overlapping or adjacent intervals into single bounding boxes.

    Args:
        tolerance: Max gap between intervals to consider them adjacent.
    """
    if len(self) == 0:
        return self
    out = _merge_kernel(self.starts, self.ends, self.strands, tolerance)
    return type(self)(out[0], out[1], out[2])

project

project(shift: int, flip_length: Optional[int] = None) -> IntervalBatch

Apply a coordinate transformation to all intervals in the batch.

Parameters:

Name Type Description Default
shift int

Distance to add to all coordinates.

required
flip_length Optional[int]

If provided, coordinates are mirrored within this length (e.g. for projecting onto the reverse strand of a contig).

None
Source code in src/eris/interval.py
def project(self, shift: int, flip_length: Optional[int] = None) -> IntervalBatch:
    """
    Apply a coordinate transformation to all intervals in the batch.

    Args:
        shift: Distance to add to all coordinates.
        flip_length: If provided, coordinates are mirrored within this length
            (e.g. for projecting onto the reverse strand of a contig).
    """
    if flip_length is not None:
        new_starts = flip_length - self.ends
        new_ends = flip_length - self.starts
        new_strands = self.strands * -1
    else:
        new_starts = self.starts.copy()
        new_ends = self.ends.copy()
        new_strands = self.strands.copy()

    new_starts += shift
    new_ends += shift

    return IntervalBatch(
        starts=new_starts,
        ends=new_ends,
        strands=new_strands,
        original_indices=self.original_indices.copy()
    )

query

query(start: int, end: int) -> np.ndarray

Find indices of intervals that overlap with the given range.

Parameters:

Name Type Description Default
start int

Query range start.

required
end int

Query range end.

required

Returns:

Type Description
ndarray

np.ndarray: Indices of overlapping intervals.

Source code in src/eris/interval.py
def query(self, start: int, end: int) -> np.ndarray:
    """
    Find indices of intervals that overlap with the given range.

    Args:
        start: Query range start.
        end: Query range end.

    Returns:
        np.ndarray: Indices of overlapping intervals.
    """
    if len(self) == 0: return np.empty(0, dtype=np.int32)
    return _query_kernel(self.starts, self.ends, start, end, self.max_len())

sort

sort() -> IntervalBatch

Returns a new cleanly sorted IntervalBatch (by start then end).

Source code in src/eris/interval.py
def sort(self) -> IntervalBatch:
    """Returns a new cleanly sorted IntervalBatch (by start then end)."""
    if len(self) < 2 or _is_sorted_kernel(self.starts, self.ends):
        return self

    order = np.lexsort((self.ends, self.starts))
    return IntervalBatch(
        self.starts[order],
        self.ends[order],
        self.strands[order],
        self.original_indices[order]
    )

eris.constants

Constants shared across modules

Context

Bases: IntEnum

Spatial relationship between two genomic intervals.

Used to describe the position of a 'passenger' or 'flank' gene relative to a primary target (e.g. a mobile element insertion site).

Source code in src/eris/constants.py
class Context(IntEnum):
    """
    Spatial relationship between two genomic intervals.

    Used to describe the position of a 'passenger' or 'flank' gene relative
    to a primary target (e.g. a mobile element insertion site).
    """
    UPSTREAM = auto()
    DOWNSTREAM = auto()
    INSIDE = auto()
    OVERLAPPING = auto()
    OVERLAPPING_START = auto()
    OVERLAPPING_END = auto()

Effect

Bases: Flag

Biological impact of a structural variant on a genomic feature.

Can be combined using bitwise OR (|) to represent multiple concurrent effects.

Source code in src/eris/constants.py
class Effect(Flag):
    """
    Biological impact of a structural variant on a genomic feature.

    Can be combined using bitwise OR (|) to represent multiple concurrent effects.
    """
    NONE = auto()
    UPREGULATED = auto()
    TRUNCATED = auto()
    DISRUPTED = auto()

FeatureType

Bases: StrEnum

Genomic feature types commonly encountered in genome annotation.

Source code in src/eris/constants.py
class FeatureType(StrEnum):
    """Genomic feature types commonly encountered in genome annotation."""
    CDS = "CDS"
    MOBILE_ELEMENT = "mobile_element"
    REGULATORY = "regulatory"
    REPEAT_REGION = "repeat_region"

Orientation

Bases: StrEnum

The relative strand orientation between two genomic features.

Source code in src/eris/constants.py
class Orientation(StrEnum):
    """The relative strand orientation between two genomic features."""
    SAME = "same strand"
    OPPOSITE = "opposite strand"
    NONE = "-"

Strand

Bases: IntEnum

Integer representation of genomic strand orientation.

Supports conversion from common string formats (+, -, 1, -1).

Source code in src/eris/constants.py
class Strand(IntEnum):
    """
    Integer representation of genomic strand orientation.

    Supports conversion from common string formats (+, -, 1, -1).
    """
    FORWARD = 1
    REVERSE = -1
    UNSTRANDED = 0

    @classmethod
    def _missing_(cls, value):
        if isinstance(value, bytes):
            value = value.decode('ascii')
        if isinstance(value, str):
            if value == '+' or value == '1' or value == '+1':
                return Strand.FORWARD
            elif value == '-' or value == '-1':
                return Strand.REVERSE
        return Strand.UNSTRANDED

    def __str__(self):
        if self == Strand.FORWARD: return '+'
        if self == Strand.REVERSE: return '-'
        return '.'