Skip to content

Commit

Permalink
Generalize and docstring.
Browse files Browse the repository at this point in the history
  • Loading branch information
preritdas authored Mar 12, 2023
1 parent f618540 commit 63bd85e
Showing 1 changed file with 25 additions and 18 deletions.
43 changes: 25 additions & 18 deletions mypytoolkit/threadtools.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
"""Tools for working with threads."""
from typing import Callable
from typing import Callable, Any, List, Tuple


def list_process(operation: Callable, items: list) -> list:
def list_process(operation: Callable, items: List) -> List:
"""
Uses threading to efficiently generate a list of dashed summaries instead of
generating them sequentially.
Uses threading to efficiently run `operation` on `items` individually and then
return a list of results in the same order as `items`. Best for I/O operations
like file processing, API calls, etc.
The `operation` must be an operation that takes a single string parameter and
returns a single string. This was built to bulk process window summaries and window
title generation.
Args:
operation (Callable): Must be a function that takes a single parameter, being
an individual element in `items`.
items (list): A list of elements that will each be passed to `operation`.
Returns:
list: List of return values from calling `operation` on each item in `items`.
The order is retained, meaning the order of elements in the returned list
is the same order as `items` from parameters.
Raises:
Exception: If any call to `operation` raises an exception, this function
re-raises it, just as would happen if the operations had been run
sequentially in a single thread.
"""
result_queue = queue.Queue()
exceptions: list[Exception] = []

@tools.exponential_backoff(
retries=5,
base_delay=1,
exceptions=(openai.OpenAIError,),
exception_string_match="rate limit"
)
exceptions: List[Exception] = []

def store_result(text: str, index: int, result_queue: queue.Queue):
"""Summarizes the text and adds it to the queue."""
"""Runs the operation and stores the result in a queue."""
try:
result = operation(text)
except Exception as e:
Expand All @@ -32,7 +39,7 @@ def store_result(text: str, index: int, result_queue: queue.Queue):

threads = [
threading.Thread(target=store_result, args=(text, index, result_queue))
for index, text in enumerate(texts)
for index, item in enumerate(items)
]

# Start the threads
Expand All @@ -48,6 +55,6 @@ def store_result(text: str, index: int, result_queue: queue.Queue):
raise exceptions[0]

# Parse raw results
raw_results: list[tuple[int, str]] = [result_queue.get() for _ in threads]
raw_results: List[Tuple[int, Any]] = [result_queue.get() for _ in threads]
raw_results.sort(key=lambda tup: tup[0]) # sort by index
return [result[1] for result in raw_results] # discard indexes

0 comments on commit 63bd85e

Please sign in to comment.