Skip to content

Commit 8b31ce3

Browse files
feat(durable-buffer): Enhance durable_buffer observability with utilization and dropped/expired per-signal metrics
- Added a new storage_utilization Gauge (f64) metric to track persistent storage capacity. - Added per-signal dropped/expired metrics (dropped_log_records, dropped_spans, dropped_metric_datapoints, expired_log_records, expired_spans, expired_metric_datapoints) utilizing Quiver's native per-slot atomic tracking. - Appended new metrics to the end of DurableBufferMetrics to prevent index shifting. - Renamed recompute_queued_counters to recompute_metrics to align with the expanded scope. - Added changelog entry and unit tests verifying the metrics functionality.
1 parent f8cd17f commit 8b31ce3

8 files changed

Lines changed: 491 additions & 37 deletions

File tree

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Use this template to create a new changelog entry.
2+
# Save this file to the `.chloggen/` directory in the repository root.
3+
# Example: `.chloggen/my-feature.yaml`
4+
5+
# (Required) - Type of change. Allowed values are:
6+
# breaking: A change that breaks backwards compatibility.
7+
# deprecation: A change that deprecates a feature or component.
8+
# new_component: A change that introduces a new component.
9+
# enhancement: A change that introduces a new feature or enhancement.
10+
# bug_fix: A change that fixes a bug.
11+
change_type: enhancement
12+
13+
# (Required) - The component(s) affected by this change.
14+
# See the `config.yaml` file in this directory for a list of allowed components.
15+
component: pipeline
16+
17+
# (Required) - A brief note explaining the change.
18+
# Example: "Add support for OTLP 1.0"
19+
note: Enhance durable_buffer observability with storage utilization and per-signal data loss metrics.
20+
21+
# (Required) - Issue number(s) addressed by this change.
22+
# Example: [123, 456]
23+
issues: [3117]
24+
25+
# (Optional) - User(s) who contributed to this change.
26+
# Omit if this change was authored by one of the project's core maintainers.
27+
# Note that this only applies to the top-level CHANGELOG, since we use
28+
# GitHub's auto-generated changelog for releases.
29+
# Example: [user1, user2]
30+
# subtext: "Thanks to @user1 and @user2 for this contribution!"
31+
subtext:

rust/otap-dataflow/crates/core-nodes/src/processors/durable_buffer_processor/bundle_adapter.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ const fn to_slot_id(_signal_type: SignalType, payload_type: ArrowPayloadType) ->
8282
}
8383

8484
/// Convert signal type to OTLP slot ID (for opaque binary storage)
85-
const fn to_otlp_slot_id(signal_type: SignalType) -> SlotId {
85+
pub(crate) const fn to_otlp_slot_id(signal_type: SignalType) -> SlotId {
8686
SlotId::new(match signal_type {
8787
SignalType::Logs => otlp_slots::OTLP_LOGS,
8888
SignalType::Traces => otlp_slots::OTLP_TRACES,
@@ -113,6 +113,11 @@ pub(crate) fn signal_type_from_slot_id(slot: SlotId) -> Option<SignalType> {
113113
from_slot_id(slot).map(|(st, _)| st)
114114
}
115115

116+
pub(crate) const ARROW_LOGS_SLOT: SlotId = SlotId::new(30);
117+
pub(crate) const ARROW_TRACES_SLOT: SlotId = SlotId::new(40);
118+
pub(crate) const ARROW_METRICS_SLOT_1: SlotId = SlotId::new(10);
119+
pub(crate) const ARROW_METRICS_SLOT_2: SlotId = SlotId::new(11);
120+
116121
/// Convert a slot ID back to payload type only (Arrow format only).
117122
///
118123
/// Returns the `ArrowPayloadType` for the given slot, or `None` if the slot
@@ -1007,7 +1012,7 @@ mod tests {
10071012
// HashMap iteration order is not guaranteed, but all slots should be from same signal
10081013
#[rustfmt::skip]
10091014
let records = OtapArrowRecords::Traces(traces!(
1010-
(Spans,
1015+
(Spans,
10111016
("id", UInt16, vec![0u16, 1])),
10121017
(SpanEvents,
10131018
("id", UInt32, vec![0u32, 1]),

0 commit comments

Comments
 (0)