Skip to content

cli

Module for managing the CLI layer on top of the API; also contains the CLI entry point under main().

ProgressBar

A CLI progress bar that wraps an iterable and calls a function for each item.

This class acts as an iterator, yielding results from the callable while printing a dynamic progress bar to stderr.

Parameters:

Name Type Description Default
iterable Iterable

An iterable of items to process.

required
callable Callable

A function to call for each item from the iterable.

required
desc str

A description to display before the progress bar.

'Processing items'
bar_length int

The character length of the progress bar.

40
bar_character str

The character used to fill the progress bar.

'█'
silence bool

Suppress all progress bar output; items are still processed and their results yielded.

False
Source code in eris/cli.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
class ProgressBar:
    """
    A CLI progress bar that wraps an iterable and calls a function for each item.

    This class acts as an iterator, yielding results from the callable while
    printing a dynamic progress bar to stderr. The iterable is consumed eagerly
    at construction time so that the total number of items is known up front.

    Args:
        iterable (Iterable): An iterable of items to process.
        callable (Callable): A function to call for each item from the iterable.
        desc (str): A description to display before the progress bar.
        bar_length (int): The character length of the progress bar.
        bar_character (str): The character used to fill the progress bar.
        silence (bool): Silence the bar
    """
    def __init__(self, iterable: Iterable, callable: Callable[[Any], Any], desc: str = "Processing items",
                 bar_length: int = 40, bar_character: str = '█', silence: bool = False):
        # NOTE: assert statements are stripped when Python runs with -O, so this
        # check is only active in non-optimised runs.
        assert len(bar_character) == 1, "Bar character must be a single character"
        # Eagerly consume the iterable to get a total count for the progress bar.
        self.items = list(iterable)
        self.total = len(self.items)
        self.callable = callable
        self.desc = desc
        self.bar_length = bar_length
        self.bar_character = bar_character
        self.silence = silence
        # These stay None until a pass over the items is started.
        self._iterator: Union[Iterable, None] = None
        self.start_time: Union[float, None] = None
        self._processed_count: int = 0

    def _format_time(self, seconds: float) -> str:
        """Formats seconds into a HH:MM:SS string; invalid or negative input gives 00:00:00."""
        if not isinstance(seconds, (int, float)) or seconds < 0:
            return "00:00:00"
        m, s = divmod(seconds, 60)
        h, m = divmod(m, 60)
        return f"{int(h):02d}:{int(m):02d}:{int(s):02d}"

    def __iter__(self):
        """Start (or restart) a pass over the stored items and return self."""
        self._iterator = iter(self.items)
        self.start_time = time.time()
        self._processed_count = 0
        if not self.silence:
            self._update_progress()  # Display the initial (0%) bar
        return self

    def __next__(self):
        """Process the next item with the callable and return its result."""
        if self._iterator is None:
            # next() was called without a prior iter(): start a fresh pass so
            # that a bare next(bar) works instead of failing on next(None).
            iter(self)

        # The for-loop protocol will handle the StopIteration from next()
        item = next(self._iterator)

        # The core of the wrapper: call the provided callable for one item.
        result = self.callable(item)
        self._processed_count += 1
        if not self.silence:
            self._update_progress()
        return result

    def _update_progress(self):
        """Calculates and prints the progress bar to stderr."""
        if self.total == 0:
            return  # Nothing to show, and it avoids dividing by zero below

        percent_complete = self._processed_count / self.total
        filled_length = int(self.bar_length * percent_complete)
        bar = self.bar_character * filled_length + '-' * (self.bar_length - filled_length)

        elapsed_time = time.time() - self.start_time

        # Calculate Estimated Time of Arrival (ETA)
        if self._processed_count > 0:
            avg_time_per_item = elapsed_time / self._processed_count
            remaining_items = self.total - self._processed_count
            eta = avg_time_per_item * remaining_items
        else:
            eta = float('inf')  # No item finished yet, so no rate to extrapolate from

        # Format time strings
        elapsed_str = self._format_time(elapsed_time)
        eta_str = self._format_time(eta) if eta != float('inf') else '??:??:??'

        # Use carriage return '\r' to stay on the same line
        progress_line = (f'\r{self.desc}: {int(percent_complete * 100):>3}%|{bar}| '
                         f'{self._processed_count}/{self.total} '
                         f'[{elapsed_str}<{eta_str}]')

        stderr.write(progress_line)

        # When the loop is finished, print a newline to move to the next line
        if self._processed_count == self.total:
            stderr.write('\n')

        stderr.flush()

    def __len__(self):
        """Return the total number of items captured at construction."""
        return self.total

ResultWriter

A class to handle the writing of results to multiple types of files; to be used with the CLI.

Source code in eris/cli.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class ResultWriter:
    """
    A class to handle the writing of results to multiple types of files; to be used with the CLI.

    Each output is a ``(format, target)`` pair. String and Path targets are
    stored as paths and receive one file per result; ``'-'``/``'stdout'`` and
    IO objects are stored as handles and receive every result in sequence.
    """
    def __init__(self, *outputs: tuple[str, Union[str, Path, IO]], suffix: str = _SUFFIX,
                 no_tsv_header: bool = False, tsv_header: str = '#'):
        """
        :param outputs: ``(format, target)`` pairs; a target of None is skipped
        :param suffix: Suffix to append to output filenames
        :param no_tsv_header: Suppress header in TSV output (default: False)
        :param tsv_header: Text written once before the first TSV result sent to a handle
        :raises TypeError: If a target is not a string, Path or IO object
        :raises ValueError: If no usable output was given
        """
        self._files = {}
        self._handles = {}
        self.suffix: str = suffix
        self.tsv_header: str = tsv_header
        # Starting out as "already written" is how the header gets suppressed.
        self._header_written: bool = no_tsv_header
        for fmt, argument in outputs:
            if argument is None:
                continue
            # Check the type before the '-'/'stdout' test so an unhashable
            # argument reaches the descriptive TypeError below.
            if isinstance(argument, str):
                if argument in {'-', 'stdout'}:
                    self._handles[fmt] = stdout
                else:
                    self._files[fmt] = Path(argument)
            elif isinstance(argument, Path):
                self._files[fmt] = argument
            elif isinstance(argument, IOBase):
                self._handles[fmt] = argument
            else:
                raise TypeError(f"format={fmt!r} {argument=} must be a string, Path or IO, not {type(argument)}")

        if not self._files and not self._handles:
            raise ValueError("No outputs specified")

    def __len__(self):
        """Return the number of registered outputs (paths plus handles)."""
        return len(self._files) + len(self._handles)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        # Avoid __del__ for resource management; __exit__ is preferred. If it is called, try to clean up.
        self.close()

    def close(self):
        """Close the handles that aren't stdout or stderr"""
        for handle in self._handles.values():
            # Not every IO object has a name (e.g. io.StringIO), so look it up
            # defensively; nameless handles are closed like any other.
            if getattr(handle, 'name', None) not in {'<stdout>', '<stderr>'}:  # These handles cannot be opened or closed
                handle.close()

    def _write(self, fmt: str, out: Union[str, Path, IO], result, open_mode: str) -> int:
        """
        Write one result to one output
        :param fmt: Format spec passed to ``format(result, fmt)``; also the file extension
        :param out: A Path (one file is written per result) or an open handle
        :param result: The result to serialise
        :param open_mode: The mode used to open per-result files
        :return: The value returned by the final ``write`` call
        """
        # Dispatch on the target itself rather than on the format name, so a
        # format registered both as a handle and as a path is routed correctly.
        if isinstance(out, Path):
            with open(out / f'{result.genome_id}{self.suffix}.{fmt}', mode=open_mode) as handle:
                return handle.write(format(result, fmt))
        # Logic for non-repeating TSV header
        if fmt == 'tsv' and not self._header_written:
            out.write(self.tsv_header)
            self._header_written = True
        return out.write(format(result, fmt))

    def write(self, result: Union['Result', None], open_mode: str = 'wt'):
        """
        Write the typing result to files or file handles
        :param result: A Result instance or None; None is ignored
        :param open_mode: The mode to open the files in
        """
        if result is None:
            return None
        for fmt, out in chain(self._handles.items(), self._files.items()):
            self._write(fmt, out, result, open_mode)

__init__(*outputs, suffix=_SUFFIX, no_tsv_header=False, tsv_header='#')

Parameters: `suffix` – suffix to append to output filenames. `no_tsv_header` – suppress the header in TSV output (default: False).

Source code in eris/cli.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def __init__(self, *outputs: tuple[str, Union[str, Path, IO]], suffix: str = _SUFFIX,
             no_tsv_header: bool = False, tsv_header: str = '#'):
    """
    :param outputs: ``(format, target)`` pairs; a target of None is skipped
    :param suffix: Suffix to append to output filenames
    :param no_tsv_header: Suppress header in TSV output (default: False)
    :param tsv_header: Stored as ``self.tsv_header``
    :raises TypeError: If a target is not a string, Path or IO object
    :raises ValueError: If no usable output was given
    """
    self._files = {}
    self._handles = {}
    self.suffix: str = suffix
    self.tsv_header: str = tsv_header
    # Starting out as "already written" is how the header gets suppressed.
    self._header_written: bool = no_tsv_header
    for fmt, argument in outputs:
        if argument is None:
            continue
        # Check the type before the '-'/'stdout' test so an unhashable
        # argument reaches the descriptive TypeError below.
        if isinstance(argument, str):
            if argument in {'-', 'stdout'}:
                self._handles[fmt] = stdout
            else:
                self._files[fmt] = Path(argument)
        elif isinstance(argument, Path):
            self._files[fmt] = argument
        elif isinstance(argument, IOBase):
            self._handles[fmt] = argument
        else:
            raise TypeError(f"format={fmt!r} {argument=} must be a string, Path or IO, not {type(argument)}")

    if not self._files and not self._handles:
        raise ValueError("No outputs specified")

close()

Close the handles that aren't stdout or stderr

Source code in eris/cli.py
71
72
73
74
75
def close(self):
    """Close the handles that aren't stdout or stderr"""
    for handle in self._handles.values():
        # Not every IO object has a name (e.g. io.StringIO), so look it up
        # defensively; nameless handles are closed like any other.
        if getattr(handle, 'name', None) not in {'<stdout>', '<stderr>'}:  # These handles cannot be opened or closed
            handle.close()

write(result, open_mode='wt')

Write the typing result to files or file handles. Parameters: `result` – a Result instance or None. `open_mode` – the mode to open the files in.

Source code in eris/cli.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def write(self, result: Union['Result', None], open_mode: str = 'wt'):
    """
    Send one result to every registered output, handles first and then files
    :param result: A Result instance; None is ignored and nothing is written
    :param open_mode: The mode passed on for opening per-result files
    """
    if result is not None:
        destinations = chain(self._handles.items(), self._files.items())
        for output_format, target in destinations:
            self._write(output_format, target, result, open_mode)