Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5433,7 +5433,20 @@ def _retry(self, reuse_connection, consistency_level, host, delay):
if self._metrics is not None:
self._metrics.on_retry()
if consistency_level is not None:
self.message.consistency_level = consistency_level
# Never downgrade from serial to non-serial consistency, as that
# would break serial read (Paxos) guarantees.
if not ConsistencyLevel.is_serial(consistency_level):
original_cl = self.message.consistency_level
if ConsistencyLevel.is_serial(original_cl):
log.debug(
"Retry policy attempted to downgrade serial consistency %s to %s; "
"keeping original consistency level.",
ConsistencyLevel.value_to_name.get(original_cl, original_cl),
ConsistencyLevel.value_to_name.get(consistency_level, consistency_level))
else:
self.message.consistency_level = consistency_level
else:
self.message.consistency_level = consistency_level
Comment on lines +5438 to +5449
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is too over complicated, can you make it a single if-else please

Copy link
Copy Markdown
Author

@mykaul mykaul May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do. Actually AI had a single if and I thought it be better to split.


# don't retry on the event loop thread
self.session.cluster.scheduler.schedule(delay, self._retry_task, reuse_connection, host)
Expand Down
2 changes: 1 addition & 1 deletion cassandra/policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ def make_query_plan(self, working_keyspace=None, query=None):
else:
replicas = self._cluster_metadata.get_replicas(keyspace, query.routing_key)

Comment thread
sylwiaszunejko marked this conversation as resolved.
if self.shuffle_replicas and not query.is_lwt():
if self.shuffle_replicas and not query.is_lwt() and not ConsistencyLevel.is_serial(query.consistency_level):
shuffle(replicas)

def yield_in_order(hosts):
Expand Down
58 changes: 58 additions & 0 deletions tests/unit/test_policies.py
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,35 @@ def _assert_shuffle(self, patched_shuffle, cluster, keyspace, routing_key):
assert patched_shuffle.call_count == 1


@patch('cassandra.policies.shuffle')
def test_no_shuffle_for_serial_consistency(self, patched_shuffle):
"""
Test to validate that replicas are not shuffled when the statement
has SERIAL or LOCAL_SERIAL consistency level, since such statements
should be routed like LWT requests.
@jira_ticket PYTHON-1394
@expected_result shuffle should not be called for serial consistency

@test_category policy
"""
for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL):
for cluster in (self._prepare_cluster_with_vnodes(), self._prepare_cluster_with_tablets()):
patched_shuffle.reset_mock()
hosts = cluster.metadata.all_hosts()
child_policy = Mock()
child_policy.make_query_plan.return_value = hosts
child_policy.distance.return_value = HostDistance.LOCAL

policy = TokenAwarePolicy(child_policy, shuffle_replicas=True)
policy.populate(cluster, hosts)

query = Statement(routing_key='routing_key')
query.consistency_level = cl
list(policy.make_query_plan('keyspace', query))
assert patched_shuffle.call_count == 0, \
"shuffle should not be called for consistency level %s" % cl


class ConvictionPolicyTest(unittest.TestCase):
def test_not_implemented(self):
"""
Expand Down Expand Up @@ -1389,6 +1418,35 @@ def test_unavailable(self):
assert retry == RetryPolicy.RETRY
assert consistency == ConsistencyLevel.ONE

def test_serial_consistency_not_downgraded(self):
"""
Test that SERIAL/LOCAL_SERIAL consistency is never downgraded
to a non-serial consistency level by the retry policy.
@jira_ticket PYTHON-1394
@expected_result retry policy should rethrow or retry on next host
without downgrading serial consistency

@test_category policy
"""
policy = DowngradingConsistencyRetryPolicy()

for cl in (ConsistencyLevel.SERIAL, ConsistencyLevel.LOCAL_SERIAL):
# on_read_timeout should rethrow for serial consistency
retry, consistency = policy.on_read_timeout(
query=None, consistency=cl, required_responses=3,
received_responses=1, data_retrieved=True, retry_num=0)
assert retry == RetryPolicy.RETHROW, \
"Expected RETHROW for serial consistency %s on read timeout" % cl
assert consistency is None

# on_unavailable should retry on next host without downgrading
retry, consistency = policy.on_unavailable(
query=None, consistency=cl, required_replicas=3,
alive_replicas=1, retry_num=0)
assert retry == RetryPolicy.RETRY_NEXT_HOST, \
"Expected RETRY_NEXT_HOST for serial consistency %s on unavailable" % cl
assert consistency is None


class ExponentialRetryPolicyTest(unittest.TestCase):
def test_calculate_backoff(self):
Expand Down
Loading