Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions prometheus_client/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ def remove(self, *labelvalues: Any) -> None:
warnings.warn(
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand All @@ -226,6 +230,10 @@ def remove_by_labels(self, labels: dict[str, str]) -> None:
"Removal of labels has not been implemented in multi-process mode yet.",
UserWarning
)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Removal of labels has not been implemented in redis mode yet.",
UserWarning)

if not self._labelnames:
raise ValueError('No label names were set when constructing %s' % self)
Expand Down Expand Up @@ -258,6 +266,10 @@ def clear(self) -> None:
warnings.warn(
"Clearing labels has not been implemented in multi-process mode yet",
UserWarning)
if 'PROMETHEUS_REDIS_URL' in os.environ:
warnings.warn(
"Clearing of labels has not been implemented in redis mode yet.",
UserWarning)
with self._lock:
self._metrics = {}

Expand Down
93 changes: 93 additions & 0 deletions prometheus_client/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import os
from datetime import timedelta
from threading import Event, Thread
from typing import Any
from urllib.parse import urlsplit

from redis import Redis

# For testing, a pool of otherwise anonymous FakeRedis instances are made
# available by ID
_fake_redis_pool: dict[int, Redis] = {}


def redis_client() -> Redis:
    """
    Create a redis client for PROMETHEUS_REDIS_URL.

    Configure the redis database via a URL in PROMETHEUS_REDIS_URL of the form
    redis://localhost:6379/0

    A ``fakeredis://`` scheme selects a shared, process-local FakeRedis
    instance from ``_fake_redis_pool`` keyed by db number (testing only).

    Raises:
        KeyError: if PROMETHEUS_REDIS_URL is not set.
        ValueError: if the URL is malformed or uses an unsupported scheme.
    """
    url = os.environ["PROMETHEUS_REDIS_URL"]
    parsed_url = urlsplit(url)
    # Validate explicitly rather than with `assert`: asserts are stripped
    # under `python -O`, which would silently accept malformed URLs.
    if not (parsed_url.path.startswith("/") and parsed_url.path[1:].isdigit()):
        raise ValueError(
            "PROMETHEUS_REDIS_URL must end in a numeric database path, "
            "e.g. redis://localhost:6379/0; got %r" % url)
    port = parsed_url.port or 6379
    db = int(parsed_url.path[1:])

    if parsed_url.scheme == "fakeredis":
        from fakeredis import FakeRedis

        # Reuse one FakeRedis per db so separate clients observe shared state.
        if db not in _fake_redis_pool:
            _fake_redis_pool[db] = FakeRedis()
        return _fake_redis_pool[db]

    if parsed_url.scheme != "redis":
        raise ValueError(
            "Unsupported scheme in PROMETHEUS_REDIS_URL: %r" % url)
    if not parsed_url.hostname:
        raise ValueError(
            "Missing hostname in PROMETHEUS_REDIS_URL: %r" % url)
    return Redis(host=parsed_url.hostname, port=port, db=db)


# For each process identifier, a list of keys that should be kept from expiring
_live_metrics: dict[str, set[str]] = {}


def _key_expiry() -> timedelta:
"""Return the configured expiry for multiprocess keys."""
return timedelta(seconds=int(os.environ.get("PROMETHEUS_REDIS_REFRESH_TTL", 20)))


class KeepMetricsAliveThread(Thread):
    """A daemon thread that keeps metrics from expiring as long as we live."""

    # Set by mark_process_dead() to request the thread exit at next wakeup.
    stop: Event
    # Process identifier whose keys in _live_metrics this thread refreshes.
    identifier: str

    def __init__(self, identifier: str, *args: Any, **kwargs: Any) -> None:
        self.stop = Event()
        self.identifier = identifier
        super().__init__(*args, **kwargs)

    def loop_wait(self, delay: float) -> bool:
        """Sleep up to *delay* seconds; return True if a stop was requested."""
        return self.stop.wait(delay)

    def run(self) -> None:
        delay = float(os.environ.get("PROMETHEUS_REDIS_REFRESH_FREQUENCY", 10))
        expiry = _key_expiry()
        client = redis_client()
        while not self.loop_wait(delay):
            # Snapshot the key set before iterating: the owning process may
            # register new keys (via _keep_key_from_expiring) concurrently,
            # and mutating a set during iteration raises RuntimeError.
            # .get() guards against the identifier having been removed.
            for key in list(_live_metrics.get(self.identifier, ())):
                client.expire(key, expiry)


_daemon_threads: dict[str, KeepMetricsAliveThread] = {}


def _keep_key_from_expiring(identifier: str, key: str) -> None:
    """Stop key for process identifier from expiring as long as we are alive."""
    # Register the key first so the refresher sees it on its first pass.
    keys = _live_metrics.setdefault(identifier, set())
    keys.add(key)
    # Lazily start one refresher thread per process identifier.
    if identifier not in _daemon_threads:
        refresher = KeepMetricsAliveThread(identifier=identifier, daemon=True)
        refresher.start()
        _daemon_threads[identifier] = refresher


def mark_process_dead(identifier: str | int) -> None:
    """Immediately expire all live* metrics for process identifier."""
    ident = str(identifier)

    # Shut down the keep-alive thread so it stops refreshing the keys.
    worker = _daemon_threads.pop(ident, None)
    if worker is not None:
        worker.stop.set()
        worker.join()

    # Delete any keys the process still owned; nothing to do if none remain.
    keys = _live_metrics.pop(ident, None)
    if keys:
        redis_client().delete(*keys)
142 changes: 142 additions & 0 deletions prometheus_client/redis_collector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import json
from collections.abc import Iterable
from typing import cast

from .metrics_core import Metric
from .redis import redis_client
from .registry import Collector, CollectorRegistry
from .samples import Sample
from .values import MULTIPROCESS_MODE_T


class RedisCollector(Collector):
    """Collector for redis mode."""

    def __init__(self, registry: CollectorRegistry | None) -> None:
        self._client = redis_client()
        if registry:
            registry.register(self)

    def _iter_values(self) -> Iterable[tuple[bytes, str]]:
        """Yield (key, value) pairs for every ``value:*`` key in redis."""
        cursor = 0
        while True:
            cursor, keys = self._client.scan(cursor=cursor, match="value:*")
            # SCAN may legitimately return an empty batch mid-iteration, and
            # MGET with zero keys is a redis protocol error — only fetch
            # when the batch is non-empty.
            if keys:
                values = self._client.mget(keys)
                for key, value in zip(keys, values):
                    # A key can expire between SCAN and MGET (live* metrics
                    # carry TTLs), in which case MGET returns None; skip it.
                    if value is not None:
                        yield key, value
            if cursor == 0:
                break

    def collect(self) -> Iterable[Metric]:
        """Assemble Metric objects from all value keys currently in redis."""
        metrics: dict[str, Metric] = {}
        histograms: set[str] = set()
        multiprocess: dict[str, MULTIPROCESS_MODE_T] = {}

        for key, value_s in self._iter_values():
            # Key layout: value:<type>:<multiprocess_mode>:<json-encoded key>
            # FIXME: Catch ValueError here, just in case?
            prefix_b, typ_b, multiprocess_mode_b, mmap_key = key.split(b":", 3)
            assert prefix_b == b"value"
            value = float(value_s)

            metric_name, name, labels, help_text = json.loads(mmap_key)

            metric = metrics.get(metric_name)
            if metric is None:
                # First sample for this metric: create it and record how it
                # must be post-processed (histogram fix-up, multiprocess
                # accumulation).
                typ = typ_b.decode()
                metric = Metric(metric_name, help_text, typ)
                metrics[metric_name] = metric

                if typ in ("histogram", "gaugehistogram"):
                    histograms.add(metric_name)

                multiprocess_mode = cast(
                    MULTIPROCESS_MODE_T, multiprocess_mode_b.decode()
                )
                if typ in ("gauge", "gaugehistogram") and multiprocess_mode:
                    multiprocess[metric_name] = multiprocess_mode

            metric.add_sample(name, labels, value)

        for name, multiprocess_mode in multiprocess.items():
            self._accumulate_multiprocess(metrics[name], multiprocess_mode)

        for name in histograms:
            self._fix_histogram(metrics[name])

        return metrics.values()

    def _accumulate_multiprocess(
        self, metric: Metric, multiprocess_mode: MULTIPROCESS_MODE_T
    ) -> None:
        """Merge metrics from multiple processes using multiprocess_mode."""
        # We deal with live/dead with Redis expiry
        if multiprocess_mode.startswith("live"):
            multiprocess_mode = cast(
                MULTIPROCESS_MODE_T, multiprocess_mode[len("live") :]
            )
        if multiprocess_mode == "all":
            return

        by_label: dict[tuple[tuple[str, ...], str], Sample] = {}

        for sample in metric.samples:
            # Merge across processes: drop the pid label and combine samples
            # that share the remaining labels.
            labels = sample.labels.copy()
            labels.pop("pid")
            key = (tuple(labels.values()), sample.name)
            value = sample.value
            if key in by_label:
                current_value = by_label[key].value
                if multiprocess_mode == "min" and value > current_value:
                    continue
                if multiprocess_mode == "max" and value < current_value:
                    continue
                if multiprocess_mode == "sum":
                    value += current_value
                if multiprocess_mode == "mostrecent":
                    raise NotImplementedError(
                        "The 'mostrecent' modes are not supported in RedisCollector"
                    )
            by_label[key] = Sample(sample.name, labels, value)

        metric.samples = list(by_label.values())

    def _fix_histogram(self, metric: Metric) -> None:
        """
        Fix-up histogram samples.

        Sort the buckets as expected by a client, and accumulate the values.
        The Histogram class is optimized to only increment the bucket that a
        value first appears in, not larger ones that would also contain it.
        """
        by_label: dict[tuple[tuple[str, ...], str], list[Sample]] = {}

        # Organize into lists of samples by label
        for sample in metric.samples:
            if "le" in sample.labels:
                labels_without_le = sample.labels.copy()
                labels_without_le.pop("le")
                key = (tuple(labels_without_le.values()), sample.name)
            else:
                key = (tuple(sample.labels.values()), sample.name)
            by_label.setdefault(key, []).append(sample)

        metric.samples = []

        for (labels, name), samples in sorted(by_label.items()):
            if name.endswith("_bucket"):
                # Sort buckets within each label; float("+Inf") sorts last.
                samples.sort(key=lambda sample: float(sample.labels["le"]))

                # Accumulate values into larger buckets
                value = 0.0
                for sample in samples:
                    value += sample.value
                    metric.samples.append(
                        Sample(sample.name, sample.labels, value)
                    )

                # The running total over all buckets is the observation
                # count; emit it without the `le` label of the last bucket.
                labels_without_le = sample.labels.copy()
                labels_without_le.pop("le")
                metric.samples.append(
                    Sample(f"{metric.name}_count", labels_without_le, value)
                )

            else:
                metric.samples.extend(samples)
Loading