Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Base Worker shutdown fix #54

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Base Worker shutdown fix
  • Loading branch information
p1c2u committed Nov 7, 2018
commit 20835bef21c5076a3eaed36a6af4bfe67749656f
2 changes: 1 addition & 1 deletion development.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
coverage==4.4.1
mock==1.0.1
moto==1.3.4
moto==1.3.7
nose==1.3.0
pre-commit==0.7.6
sure==1.2.2
Expand Down
1 change: 1 addition & 0 deletions pyqs/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ def __init__(self, *args, **kwargs):
def shutdown(self):
logger.info("Received shutdown signal, shutting down PID {}!".format(os.getpid()))
self.should_exit.set()
self.join()

def parent_is_alive(self):
if os.getppid() == 1:
Expand Down
113 changes: 58 additions & 55 deletions tests/test_manager_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import boto3
from mock import patch, Mock, MagicMock
from moto import mock_sqs, mock_sqs_deprecated
from nose.tools import timed

from pyqs.main import main, _main
from pyqs.worker import ManagerWorker
Expand Down Expand Up @@ -145,36 +146,6 @@ def test_master_spawns_worker_processes():
manager.stop()


@mock_sqs
@mock_sqs_deprecated
def test_master_replaces_reader_processes():
"""
Test managing process replaces reader children
"""

# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")

# Setup Manager
manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10)
manager.start()

# Get Reader PID
pid = manager.reader_children[0].pid

# Kill Reader and wait to replace
manager.reader_children[0].shutdown()
time.sleep(0.1)
manager.replace_workers()

# Check Replacement
manager.reader_children[0].pid.shouldnt.equal(pid)

# Cleanup
manager.stop()


@mock_sqs
@mock_sqs_deprecated
def test_master_counts_processes():
Expand Down Expand Up @@ -210,31 +181,63 @@ def test_master_counts_processes():

@mock_sqs
@mock_sqs_deprecated
def test_master_replaces_worker_processes():
"""
Test managing process replaces worker processes
"""
# Setup SQS Queue
conn = boto3.client('sqs', region_name='us-east-1')
conn.create_queue(QueueName="tester")

# Setup Manager
manager = ManagerWorker(queue_prefixes=["tester"], worker_concurrency=1, interval=1, batchsize=10)
manager.start()

# Get Worker PID
pid = manager.worker_children[0].pid

# Kill Worker and wait to replace
manager.worker_children[0].shutdown()
time.sleep(0.1)
manager.replace_workers()

# Check Replacement
manager.worker_children[0].pid.shouldnt.equal(pid)

# Cleanup
manager.stop()
class TestMasterWorker:

def setup(self):
# For debugging test
import sys
logger = logging.getLogger("pyqs")
logger.setLevel(logging.DEBUG)
stdout_handler = logging.StreamHandler(sys.stdout)
logger.addHandler(stdout_handler)

# Setup SQS Queue
self.conn = boto3.client('sqs', region_name='us-east-1')
self.conn.create_queue(QueueName="tester")

# Setup Manager
self.manager = ManagerWorker(
queue_prefixes=["tester"], worker_concurrency=1, interval=1,
batchsize=10,
)
self.manager.start()

def teardown(self):
self.manager.stop()

@timed(60)
def test_master_replaces_reader_processes(self):
"""
Test managing process replaces reader children
"""
# Get Reader PID
pid = self.manager.reader_children[0].pid

# Kill Reader and wait to replace
self.manager.reader_children[0].shutdown()
time.sleep(0.1)
self.manager.reader_children[0].is_alive().should.equal(False)
self.manager.replace_workers()

# Check Replacement
self.manager.reader_children[0].pid.shouldnt.equal(pid)

@timed(60)
def test_master_replaces_worker_processes(self):
"""
Test managing process replaces worker processes
"""
# Get Worker PID
pid = self.manager.worker_children[0].pid

# Kill Worker and wait to replace
self.manager.worker_children[0].shutdown()
time.sleep(0.1)
self.manager.worker_children[0].is_alive().should.equal(False)
self.manager.replace_workers()

# Check Replacement
self.manager.worker_children[0].pid.shouldnt.equal(pid)


@mock_sqs
Expand Down