Doing some more stuff
jakep-allenai committed Nov 7, 2024
1 parent 923231e commit 57186c7
Showing 1 changed file with 14 additions and 2 deletions.
16 changes: 14 additions & 2 deletions pdelfin/beakerpipeline.py
@@ -7,6 +7,7 @@
 import time
 import subprocess
 import atexit
+import hashlib

 from tqdm import tqdm

@@ -25,6 +26,13 @@
 pdf_s3 = boto3.client('s3')


+def compute_workgroup_sha1(work_group: list[str]) -> str:
+    sha1 = hashlib.sha1()
+    # Ensure consistent ordering by sorting the list
+    for pdf in sorted(work_group):
+        sha1.update(pdf.encode('utf-8'))
+    return sha1.hexdigest()
+
 if __name__ == '__main__':
     parser = argparse.ArgumentParser(description='Manager for running millions of PDFs through a batch inference pipeline')
     parser.add_argument('workspace', help='The S3 path where work will be done e.g., s3://bucket/prefix/')
@@ -52,6 +60,8 @@
         pdf_session = boto3.Session(profile_name=args.pdf_profile)
         pdf_s3 = pdf_session.client("s3")

+    index_file_s3_path = os.path.join(args.workspace, "pdf_index_list.csv.zstd")
+
     # Check list of pdfs and that it matches what's in the workspace
     if args.pdfs:
         if args.pdfs.startswith("s3://"):
@@ -67,7 +77,6 @@
         all_pdfs = set(all_pdfs)
         logger.info(f"Found {len(all_pdfs):,} total pdf paths")

-        index_file_s3_path = os.path.join(args.workspace, "pdf_index_list.csv.zstd")
         existing_lines = download_zstd_csv(workspace_s3, index_file_s3_path)

         # Parse existing work items into groups
@@ -143,9 +152,12 @@ def signal_handler(sig, frame):
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)

-    # TODO
+    # Read in the work queue from s3
+    work_queue = download_zstd_csv(workspace_s3, index_file_s3_path)
+    work_queue = {compute_workgroup_sha1(pdfs): pdfs for pdfs in work_queue}
+
     # Read in the done items from the s3 workspace
     done_work_items = expand_s3_glob(workspace_s3, f"{args.workspace}/dolma_documents/*.jsonl")

     # TODO
     # Spawn up to N workers to do:
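Note on the new helper: compute_workgroup_sha1 gives each work group a stable ID that does not depend on the order in which the PDF paths are listed, which is what lets the work queue above be keyed by hash and compared against finished output. A minimal standalone sketch of that property (the bucket and file names here are hypothetical):

    import hashlib

    def compute_workgroup_sha1(work_group: list[str]) -> str:
        sha1 = hashlib.sha1()
        # Sort first so the digest is independent of input order
        for pdf in sorted(work_group):
            sha1.update(pdf.encode('utf-8'))
        return sha1.hexdigest()

    # The same set of paths hashes to the same group ID in any order
    a = compute_workgroup_sha1(["s3://bucket/a.pdf", "s3://bucket/b.pdf"])
    b = compute_workgroup_sha1(["s3://bucket/b.pdf", "s3://bucket/a.pdf"])
    assert a == b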
