Fixing a few stats things
jakep-allenai committed Nov 18, 2024
1 parent e499413 commit f287f24
Showing 2 changed files with 36 additions and 20 deletions.
54 changes: 35 additions & 19 deletions pdelfin/beakerpipeline.py
@@ -25,7 +25,7 @@
from functools import partial
from dataclasses import dataclass
from typing import Optional, Tuple, List, Dict, Set
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed

from pdelfin.s3_queue import S3WorkQueue, WorkItem
from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
@@ -567,10 +567,7 @@ def submit_beaker_job(args):
print(f"Experiment URL: https://beaker.org/ex/{experiment_data.id}")


def print_stats(args):
import concurrent.futures
from tqdm import tqdm

def print_stats(args):
# Get total work items and completed items
index_file_s3_path = os.path.join(args.workspace, "work_index_list.csv.zstd")
output_glob = os.path.join(args.workspace, "results", "*.jsonl")
@@ -592,39 +589,61 @@ def process_output_file(s3_path):
doc_count = 0
total_input_tokens = 0
total_output_tokens = 0
total_pages = 0
processed_paths = set()

for line in data.decode('utf-8').splitlines():
if line.strip():
doc = json.loads(line)
doc_count += 1
total_input_tokens += doc["metadata"]["total-input-tokens"]
total_output_tokens += doc["metadata"]["total-output-tokens"]
total_pages += doc["metadata"]["pdf-total-pages"]
processed_paths.add(doc["metadata"]["Source-File"])

return doc_count, total_input_tokens, total_output_tokens
return doc_count, total_input_tokens, total_output_tokens, total_pages, processed_paths
except Exception as e:
logger.warning(f"Error processing {s3_path}: {e}")
return 0, 0, 0
return 0, 0, 0, 0, set()

print("\nProcessing output files...")
docs_total = 0
input_tokens_total = 0
output_tokens_total = 0
pages_total = 0
all_processed_paths = set()
original_paths = set()

# First collect all original PDF paths
for line in work_queue_lines:
if line.strip():
paths = line.strip().split(',')
original_paths.update(paths[1:])

with concurrent.futures.ThreadPoolExecutor() as executor:
with ThreadPoolExecutor() as executor:
futures = {executor.submit(process_output_file, item): item for item in done_work_items}

for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
doc_count, input_tokens, output_tokens = future.result()
for future in tqdm(as_completed(futures), total=len(futures)):
doc_count, input_tokens, output_tokens, pages, processed_paths = future.result()
docs_total += doc_count
input_tokens_total += input_tokens
output_tokens_total += output_tokens
pages_total += pages
all_processed_paths.update(processed_paths)

skipped_paths = original_paths - all_processed_paths

print(f"\nResults:")
print(f"Total documents processed: {docs_total:,}")
print(f"Total input tokens: {input_tokens_total:,}")
print(f"Total output tokens: {output_tokens_total:,}")
print(f"Average input tokens per doc: {input_tokens_total/max(1,docs_total):,.1f}")
print(f"Total documents skipped: {len(skipped_paths):,}")
print(f"Total pages processed: {pages_total:,}")

print(f"\nTotal output tokens: {output_tokens_total:,}")

print(f"\nAverage pages per doc: {pages_total/max(1,docs_total):,.1f}")
print(f"Average output tokens per doc: {output_tokens_total/max(1,docs_total):,.1f}")
print(f"Average output tokens per page: {output_tokens_total/max(1,pages_total):,.1f}")


async def main():
parser = argparse.ArgumentParser(description='Manager for running millions of PDFs through a batch inference pipeline')
@@ -649,7 +668,7 @@ async def main():
# Beaker/job running stuff
parser.add_argument('--beaker', action='store_true', help='Submit this job to beaker instead of running locally')
parser.add_argument('--beaker_workspace', help='Beaker workspace to submit to', default='ai2/pdelfin')
parser.add_argument('--beaker_cluster', help='Beaker clusters you want to run on', default=["ai2/jupiter-cirrascale-2", "ai2/pluto-cirrascale", "ai2/saturn-cirrascale", "ai2/augusta-google-1"])
parser.add_argument('--beaker_cluster', help='Beaker clusters you want to run on', default=["ai2/jupiter-cirrascale-2", "ai2/pluto-cirrascale", "ai2/neptune-cirrascale", "ai2/saturn-cirrascale", "ai2/augusta-google-1"])
parser.add_argument('--beaker_gpus', type=int, default=1, help="Number of gpu replicas to run")
parser.add_argument('--beaker_priority', type=str, default="normal", help="Beaker priority level for the job")
args = parser.parse_args()
@@ -771,12 +790,9 @@ async def main():
asyncio.run(main())

# TODO
# X Refactor the work queue into its own file so it's reusable and generic, and it makes temporary work files (prevent issue where if a work item is done, then it stalls because queue was just emptied)
# X Fix the queue release mechanism so that it just does a timeout, based on zero queue size only, so you don't block things
# - Add logging of failed pages and have the stats function read them
# X Add the page rotation check and mechanism
# - Fallback to different method if < 2% of pages are failed, make that configurable
# - Sglang commit a fix for the context length issue
# - pypdf fix for the 'v' error
# - Get a solid benchmark on the stream vs non stream approach
# X sglang error on s3://ai2-s2-pdfs/73ee/35e7ed5c2fb113ceba652284aaa51db7c2fc.pdf-2
# X Client error on attempt 0 for s3://ai2-s2-pdfs/e13c/9e03ce463ba53bfb15b26dbfd55c0bbc5568.pdf-1: 400, message='Bad Request',
# - Fix loading of the model checkpoints, it's so flakey now, maybe use datasets
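
For reference, a minimal standalone sketch of the skipped-document accounting this commit adds to print_stats: original PDF paths are gathered from the work index lines (assumed to be a work-item hash followed by comma-separated PDF paths, as the diff's paths[1:] implies), result documents contribute their metadata["Source-File"], and the skipped set is the difference. The names work_queue_lines and result_jsonl_blobs are hypothetical stand-ins for the data the real pipeline reads from S3.

import json

def count_skipped(work_queue_lines, result_jsonl_blobs):
    # Work index lines are assumed to look like:
    #   <work-item-hash>,<pdf-path>,<pdf-path>,...
    original_paths = set()
    for line in work_queue_lines:
        if line.strip():
            parts = line.strip().split(',')
            original_paths.update(parts[1:])

    # Results files are JSONL; each document records its source PDF
    # under metadata["Source-File"], as in process_output_file above.
    processed_paths = set()
    for blob in result_jsonl_blobs:
        for line in blob.splitlines():
            if line.strip():
                doc = json.loads(line)
                processed_paths.add(doc["metadata"]["Source-File"])

    return original_paths - processed_paths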
2 changes: 1 addition & 1 deletion pdelfin/version.py
@@ -2,7 +2,7 @@
_MINOR = "1"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "25"
_PATCH = "26"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""
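
The rest of pdelfin/version.py is not shown in this diff. As a minimal sketch, component strings like these are commonly assembled into a single version string roughly as follows; _MAJOR, VERSION_SHORT, and VERSION are assumptions, not confirmed by the diff.

# Hypothetical assembly; only _MINOR, _PATCH, and _SUFFIX appear in the diff above.
_MAJOR = "0"   # assumed value
_MINOR = "1"
_PATCH = "26"
_SUFFIX = ""

VERSION_SHORT = f"{_MAJOR}.{_MINOR}"
VERSION = f"{_MAJOR}.{_MINOR}.{_PATCH}{_SUFFIX}"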
