Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -1930,6 +1930,28 @@ def on_up(self, host):
have_future = False
futures = set()
try:
# Guard against stale on_up destroying a healthy pool.
# Case 1: Host was replaced in metadata (different object, same endpoint).
current_host = self.metadata.get_host(host.endpoint)
if current_host is not None and current_host is not host and current_host.is_up:
log.debug("Host %s has been replaced by %s which is already up; "
"skipping stale on_up handling", host, current_host)
with host.lock:
host._currently_handling_node_up = False
return
Comment thread
bitpathfinder marked this conversation as resolved.

# Case 2: All sessions already have a healthy pool for this host.
sessions = tuple(self.sessions)
if sessions and all(
session._pools.get(host) and not session._pools[host].is_shutdown
for session in sessions):
log.debug("Host %s already has healthy pools in all sessions; "
"skipping on_up pool teardown/rebuild", host)
with host.lock:
host.set_up()
host._currently_handling_node_up = False
return

log.info("Host %s may be up; will prepare queries and open connection pool", host)

reconnector = host.get_and_set_reconnection_handler(None)
Expand Down
99 changes: 99 additions & 0 deletions tests/unit/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,3 +719,102 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self):
)

patched_logger.warning.assert_not_called()


class TestOnUpStaleHost(unittest.TestCase):
    """
    Tests for on_up() not destroying a healthy pool when called with a stale
    host reference after a replace-with-same-IP.
    """

    def _make_cluster(self, sessions=None):
        """Create a minimal Cluster object without connecting.

        ``object.__new__`` bypasses ``Cluster.__init__``, so the only instance
        attributes present are the ones assigned here.
        """
        from threading import Lock
        cluster = object.__new__(Cluster)
        cluster.is_shutdown = False
        cluster.metadata = Mock()
        cluster.sessions = sessions or set()
        cluster.profile_manager = Mock()
        cluster.control_connection = Mock()
        cluster._listeners = set()
        cluster._listener_lock = Lock()
        return cluster

    def _make_host(self, endpoint=None):
        """Create a Host that is marked down and not currently in on_up().

        :param endpoint: endpoint to reuse (e.g. to model a replacement node
            on the same address); defaults to a fresh 127.0.0.1 endpoint.
        """
        from cassandra.connection import DefaultEndPoint
        if endpoint is None:
            endpoint = DefaultEndPoint('127.0.0.1')
        host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4())
        host.is_up = False
        host._currently_handling_node_up = False
        return host

    def test_on_up_skips_when_host_replaced_in_metadata(self):
        """
        If a NEW_NODE event already replaced the old host with a new one
        (same endpoint, different host_id), on_up(old_host) should bail out.
        """
        old_host = self._make_host()
        new_host = self._make_host(endpoint=old_host.endpoint)
        new_host.is_up = True

        # A session is registered so that a regression (falling through to
        # the normal teardown/rebuild path) is observable on the mock.
        mock_session = Mock()
        mock_session._pools = {}

        cluster = self._make_cluster(sessions={mock_session})
        cluster.metadata.get_host = Mock(return_value=new_host)

        cluster.on_up(old_host)

        self.assertFalse(old_host._currently_handling_node_up)
        # The stale host must not be marked up, and no pool may be touched.
        self.assertFalse(old_host.is_up)
        mock_session.remove_pool.assert_not_called()
        mock_session.add_or_renew_pool.assert_not_called()

    def test_on_up_skips_when_all_sessions_have_healthy_pool(self):
        """
        If on_add already created a healthy pool in all sessions, a subsequent
        on_up should not tear them down and rebuild.
        """
        host = self._make_host()

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        mock_session = Mock()
        mock_session._pools = {host: mock_pool}

        cluster = self._make_cluster(sessions={mock_session})
        cluster.metadata.get_host = Mock(return_value=host)

        cluster.on_up(host)

        # Neither half of teardown/rebuild may run on the healthy pool.
        mock_session.remove_pool.assert_not_called()
        mock_session.add_or_renew_pool.assert_not_called()
        self.assertTrue(host.is_up)
        self.assertFalse(host._currently_handling_node_up)

    def test_on_up_proceeds_when_some_sessions_missing_pool(self):
        """
        If only some sessions have a pool, on_up should proceed normally
        (not short-circuit) so all sessions get reconciled.
        """
        host = self._make_host()

        mock_pool = Mock()
        mock_pool.is_shutdown = False
        session_with_pool = Mock()
        session_with_pool._pools = {host: mock_pool}

        session_without_pool = Mock()
        session_without_pool._pools = {}
        session_without_pool.add_or_renew_pool = Mock(return_value=None)

        cluster = self._make_cluster(sessions={session_with_pool, session_without_pool})
        cluster.metadata.get_host = Mock(return_value=host)
        cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED)

        cluster.on_up(host)

        # Should have proceeded to remove_pool (normal path)
        session_with_pool.remove_pool.assert_called_once_with(host)
        session_without_pool.remove_pool.assert_called_once_with(host)
Loading