Skip to content

Commit

Permalink
Can use remote s3 files, and local workspace now
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Jan 28, 2025
1 parent 50f9a6a commit 48447b6
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions olmocr/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datetime
import tempfile
import random
import shutil
import re
import glob
import torch
Expand All @@ -32,7 +33,7 @@
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from concurrent.futures.process import BrokenProcessPool

from olmocr.work_queue import S3WorkQueue, LocalWorkQueue
from olmocr.work_queue import WorkQueue, S3WorkQueue, LocalWorkQueue
from olmocr.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
from olmocr.data.renderpdf import render_pdf_to_base64png
from olmocr.filter.filter import PdfFilter, Language
Expand Down Expand Up @@ -404,7 +405,7 @@ def build_dolma_document(pdf_orig_path, page_results):
return dolma_doc


async def worker(args, work_queue: S3WorkQueue, semaphore, worker_id):
async def worker(args, work_queue: WorkQueue, semaphore, worker_id):
while True:
# Wait until allowed to proceed
await semaphore.acquire()
Expand Down Expand Up @@ -447,10 +448,13 @@ async def worker(args, work_queue: S3WorkQueue, semaphore, worker_id):
tf.flush()

# Define the output S3 path using the work_hash
output_s3_path = os.path.join(args.workspace, 'results', f'output_{work_item.hash}.jsonl')
output_final_path = os.path.join(args.workspace, 'results', f'output_{work_item.hash}.jsonl')

bucket, key = parse_s3_path(output_s3_path)
workspace_s3.upload_file(tf.name, bucket, key)
if output_final_path.startswith("s3://"):
bucket, key = parse_s3_path(output_final_path)
workspace_s3.upload_file(tf.name, bucket, key)
else:
shutil.copyfile(tf.name, output_final_path)

# Update finished token counts from successful documents
metrics.add_metrics(finished_input_tokens=sum(doc["metadata"]["total-input-tokens"] for doc in dolma_docs),
Expand Down

0 comments on commit 48447b6

Please sign in to comment.