Putting aside redis
jakep-allenai committed Nov 7, 2024
1 parent 3d6be3c commit faf8659
Showing 1 changed file with 119 additions and 31 deletions.
pdelfin/beakerpipeline.py
@@ -6,6 +6,7 @@
 import time
 import tempfile
 import redis
+import redis.exceptions
 import random
 import boto3
 import atexit
@@ -15,26 +16,86 @@
 workspace_s3 = boto3.client('s3')
 pdf_s3 = boto3.client('s3')

-def populate_queue_if_empty(queue, s3_glob_path):
-    # Check if the queue is empty, and if so, populate it with work items
+LOCK_KEY = "queue_populating"
+LOCK_TIMEOUT = 30  # seconds
+
+def populate_queue_if_empty(queue, s3_glob_path, redis_client):
+    """
+    Check if the queue is empty. If it is, attempt to acquire a lock to populate it.
+    Only one worker should populate the queue at a time.
+    """
     if queue.llen("work_queue") == 0:
-        paths = expand_s3_glob(pdf_s3, s3_glob_path)
-        for path in paths:
-            queue.rpush("work_queue", path)
-        print("Queue populated with initial work items.")
+        # Attempt to acquire the lock
+        lock_acquired = redis_client.set(LOCK_KEY, "locked", nx=True, ex=LOCK_TIMEOUT)
+        if lock_acquired:
+            print("Acquired lock to populate the queue.")
+            try:
+                paths = expand_s3_glob(pdf_s3, s3_glob_path)
+                if not paths:
+                    print("No paths found to populate the queue.")
+                    return
+                for path in paths:
+                    queue.rpush("work_queue", path)
+                print("Queue populated with initial work items.")
+            except Exception as e:
+                print(f"Error populating queue: {e}")
+                # Optionally, handle retry logic or alerting here
+            finally:
+                # Release the lock
+                redis_client.delete(LOCK_KEY)
+                print("Released lock after populating the queue.")
+        else:
+            print("Another worker is populating the queue. Waiting for it to complete.")
+            # Optionally, wait until the queue is populated
+            wait_for_queue_population(queue)

+def wait_for_queue_population(queue, wait_time=5, max_wait=60):
+    """
+    Wait until the queue is populated by another worker.
+    """
+    elapsed = 0
+    while elapsed < max_wait:
+        queue_length = queue.llen("work_queue")
+        if queue_length > 0:
+            print("Queue has been populated by another worker.")
+            return
+        print(f"Waiting for queue to be populated... ({elapsed + wait_time}/{max_wait} seconds)")
+        time.sleep(wait_time)
+        elapsed += wait_time
+    print("Timeout waiting for queue to be populated.")
+    sys.exit(1)
+
 def process(item):
     # Simulate a short processing delay
     print(f"Processing item: {item}")
-    time.sleep(random.randint(1, 3))
+    time.sleep(0.5)
     print(f"Completed processing item: {item}")
+
+def get_redis_client(sentinel, master_name, leader_ip, leader_port, max_wait=60):
+    """
+    Obtain a Redis client using Sentinel, with retry logic.
+    """
+    elapsed = 0
+    wait_interval = 1  # seconds
+    while elapsed < max_wait:
+        try:
+            r = sentinel.master_for(master_name, socket_timeout=0.1, decode_responses=True)
+            r.ping()
+            print(f"Connected to Redis master at {leader_ip}:{leader_port}")
+            return r
+        except redis.exceptions.ConnectionError:
+            print(f"Attempt {elapsed + 1}: Unable to connect to Redis master at {leader_ip}:{leader_port}. Retrying in {wait_interval} second(s)...")
+            time.sleep(wait_interval)
+            elapsed += wait_interval
+    print(f"Failed to connect to Redis master at {leader_ip}:{leader_port} after {max_wait} seconds. Exiting.")
+    sys.exit(1)

 def main():
     parser = argparse.ArgumentParser(description='Set up Redis Sentinel-based worker queue.')
     parser.add_argument('--leader-ip', help='IP address of the initial leader node')
     parser.add_argument('--leader-port', type=int, default=6379, help='Port of the initial leader node')
     parser.add_argument('--replica', type=int, required=True, help='Replica number (0 to N-1)')
-    parser.add_argument('--add-pdfs', required=True, help='S3 glob path for work items')
+    parser.add_argument('--add-pdfs', help='S3 glob path for work items')

     args = parser.parse_args()

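Aside: the new populate path takes the lock with SET NX EX but releases it with a bare DELETE, so a worker that holds the lock longer than LOCK_TIMEOUT could delete a lock that another worker has since acquired. A common hardening, sketched below with a hypothetical helper that is not part of this commit, stores a unique token and releases via an atomic compare-and-delete:

import uuid

# Lua script: delete the lock key only if it still holds our token.
RELEASE_LOCK = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('del', KEYS[1])
else
    return 0
end
"""

def populate_with_safe_lock(redis_client, populate):
    token = str(uuid.uuid4())
    # nx=True: set only if absent; ex: auto-expire so a crashed holder
    # cannot block other workers forever.
    if redis_client.set(LOCK_KEY, token, nx=True, ex=LOCK_TIMEOUT):
        try:
            populate()
        finally:
            # Atomic compare-and-delete; a plain DELETE could remove a
            # lock acquired by another worker after ours expired.
            redis_client.eval(RELEASE_LOCK, 1, LOCK_KEY, token)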
@@ -44,8 +105,7 @@ def main():
     base_sentinel_port = 26379

     redis_port = base_redis_port + replica_number
-    # Set sentinel_port to be the same on all nodes
-    sentinel_port = base_sentinel_port
+    sentinel_port = base_sentinel_port + replica_number

     if replica_number == 0:
         leader_ip = args.leader_ip if args.leader_ip else '127.0.0.1'
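Note: each replica now runs its own Sentinel on a distinct port. For example, replica 2 gets sentinel_port = 26379 + 2 = 26381, just as the Redis data port is offset from its base (defined in the lines collapsed above).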
@@ -61,6 +121,8 @@
     redis_conf_path = os.path.join(temp_dir, 'redis.conf')
     sentinel_conf_path = os.path.join(temp_dir, 'sentinel.conf')

+    print("Redis config path:", redis_conf_path)
+
     with open(redis_conf_path, 'w') as f:
         f.write(f'port {redis_port}\n')
         f.write(f'dbfilename dump-{replica_number}.rdb\n')
@@ -73,7 +135,7 @@
         f.write(f'replicaof {leader_ip} {leader_port}\n')

     master_name = 'mymaster'
-    quorum = 2
+    quorum = 1

     with open(sentinel_conf_path, 'w') as f:
         f.write(f'port {sentinel_port}\n')
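The remaining sentinel.conf directives are collapsed in this view. For reference only, a minimal Sentinel config for this topology would presumably use the standard directives, something like the following (the exact values written by the hidden lines may differ):

port 26379
sentinel monitor mymaster 127.0.0.1 6379 1
sentinel down-after-milliseconds mymaster 5000
sentinel failover-timeout mymaster 10000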
@@ -88,16 +150,23 @@

     # Register atexit function to guarantee process termination
     def terminate_processes():
+        print("Terminating child processes...")
         redis_process.terminate()
         sentinel_process.terminate()
-        redis_process.wait()  # Ensures subprocess is cleaned up
-        sentinel_process.wait()
+        try:
+            redis_process.wait(timeout=5)
+            sentinel_process.wait(timeout=5)
+        except subprocess.TimeoutExpired:
+            print("Forcing termination of child processes.")
+            redis_process.kill()
+            sentinel_process.kill()
+        print("Child processes terminated.")

     atexit.register(terminate_processes)

+    # Also handle signal-based termination
     def handle_signal(signum, frame):
+        print(f"Received signal {signum}. Terminating processes...")
         terminate_processes()
         sys.exit(0)

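The registration of handle_signal is collapsed above; given the (signum, frame) signature, it is presumably the standard wiring via the signal module, along the lines of:

import signal
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)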
@@ -109,28 +178,47 @@ def handle_signal(signum, frame):
     # Use Sentinel to connect to the master
     from redis.sentinel import Sentinel
     sentinel = Sentinel([('127.0.0.1', sentinel_port)], socket_timeout=0.1)
-    r = sentinel.master_for(master_name, socket_timeout=0.1, decode_responses=True)

-    # Populate the work queue if this is the leader (replica 0)
-    if replica_number == 0:
-        populate_queue_if_empty(r, args.add_pdfs)
+    # Initial connection to Redis master
+    redis_client = get_redis_client(sentinel, master_name, leader_ip, leader_port)
+
+    # Populate the work queue if it's empty, using a distributed lock
+    populate_queue_if_empty(redis_client, args.add_pdfs, redis_client)
+
     try:
         while True:
-            # Try to get an item from the queue with a 1-minute timeout for processing
-            work_item = r.brpoplpush("work_queue", "processing_queue", 60)
-            if work_item:
-                try:
-                    process(work_item)
-                    # Remove from the processing queue if processed successfully
-                    r.lrem("processing_queue", 1, work_item)
-                except Exception as e:
-                    print(f"Error processing {work_item}: {e}")
-                    # If an error occurs, let it be requeued after timeout
-
-            queue_length = r.llen("work_queue")
-            print(f"Total work items in queue: {queue_length}")
-            time.sleep(1)
+            try:
+                # Try to get an item from the queue with a 1-minute timeout for processing
+                work_item = redis_client.brpoplpush("work_queue", "processing_queue", 60)
+                if work_item:
+                    try:
+                        process(work_item)
+                        # Remove from the processing queue if processed successfully
+                        redis_client.lrem("processing_queue", 1, work_item)
+                    except Exception as e:
+                        print(f"Error processing {work_item}: {e}")
+                        # On error, the item stays on processing_queue; a recovery step must requeue it (see the note after this diff)

+                queue_length = redis_client.llen("work_queue")
+                print(f"Total work items in queue: {queue_length}")
+
+                time.sleep(0.1)
+
+            except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError):
+                print("Lost connection to Redis. Attempting to reconnect using Sentinel...")
+                # Attempt to reconnect using Sentinel
+                while True:
+                    try:
+                        redis_client = get_redis_client(sentinel, master_name, leader_ip, leader_port)
+                        print("Reconnected to Redis master.")
+                        break  # Exit the reconnection loop and resume work
+                    except redis.exceptions.ConnectionError:
+                        print("Reconnection failed. Retrying in 5 seconds...")
+                        time.sleep(5)
+            except Exception as e:
+                print(f"Unexpected error: {e}")
+                handle_signal(None, None)

     except KeyboardInterrupt:
         handle_signal(None, None)
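Note: BRPOPLPUSH only moves items into processing_queue; nothing in this commit moves abandoned items back, so "requeued" implies a recovery step that lives elsewhere. A minimal sweep, sketched here as a hypothetical helper that is not in the commit, could run before the main loop:

def requeue_stale_items(redis_client):
    # Move every item stranded in processing_queue back onto work_queue.
    while True:
        item = redis_client.rpoplpush("processing_queue", "work_queue")
        if item is None:
            break
        print(f"Requeued stale item: {item}")

The caveat is that such a sweep cannot tell abandoned items from ones another worker is still processing, so it is only safe to run when no other worker is active, for example by the lock holder in populate_queue_if_empty.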
