Compliance that doesn't run in real time isn't compliance.
Once your agents start touching real data (customer profiles in a SQL database, invoices in a document store, internal Slack archives in a vector index), the question stops being "can the agent do this?" and becomes "was this lawful, right now, with proof?" That second question is what regulators ask. It's what auditors ask. It's what your security team asks the morning after an incident. The only honest answer is one your system produces mechanically, with provenance, in microseconds.
Most teams treat data oversight as a policy document, an annual review, and a checklist. That model works when humans make every decision. It breaks when agents issue thousands of data-access decisions per minute. The only enforcement that actually enforces an agent system is enforcement that runs in line with every action, fast enough not to break the loop, with hard rejection and complete lineage. The layer that does this has a real name in distributed systems: the control plane. It sits in front of every data touch and makes runtime decisions. The data plane moves bytes; the control plane decides whether the bytes were allowed to move.
What follows is the practical build-out: real-time data classification, policy decision points that run in microseconds, lineage tracking across structured and unstructured sources, privacy-preserving retrieval, multi-store right-to-erasure, residency routing, and the audit trail regulators actually want to see. Working code for every piece.
The real-time budget (this is the design constraint)
Before any specific mechanism, the most important constraint: every control-plane check has a strict latency budget. An agent loop that takes thirty seconds because every retrieval calls out to a slow policy service is unusable, and worse, when latency rises engineers turn the checks off. The budget below is what production systems actually hit:
| Stage | Latency budget | How it stays fast |
|---|---|---|
| Classify request (what is the user asking for?) | < 5 ms | Pre-computed labels on indexed data; no LLM call |
| Policy decision (is this allowed?) | < 1 ms | Pull policies into in-memory rule list; no network |
| Retrieval with classification filter | < 50 ms | Filter at index level, not post-hoc |
| PII redaction on retrieved text | < 20 ms | Compiled regex; ML detection only on flagged passages |
| Audit log write | < 1 ms blocking | Async write to durable storage; agent doesn't wait |
| Total per data-access decision | < 80 ms | Fits inside any reasonable agent loop |
Hit these budgets or your system gets bypassed in production. Every piece of code below is designed to fit within them.
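One way to keep these budgets honest is to measure every stage in line and surface breaches to monitoring rather than failing the request. A minimal sketch: the budget values mirror the table above, while the `budgeted` helper and the `breaches` sink are illustrative assumptions, not a prescribed API.

```python
import time
from contextlib import contextmanager

# Per-stage budgets in milliseconds, mirroring the table above.
BUDGETS_MS = {
    "classify": 5.0,
    "policy": 1.0,
    "retrieve": 50.0,
    "redact": 20.0,
    "audit": 1.0,
}

breaches: list[tuple[str, float]] = []  # (stage, elapsed_ms), fed to monitoring

@contextmanager
def budgeted(stage: str):
    """Time a stage; record a breach instead of failing the request.
    The agent loop keeps running, but the breach lands on a dashboard
    before anyone is tempted to turn the checks off."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        if elapsed_ms > BUDGETS_MS.get(stage, float("inf")):
            breaches.append((stage, elapsed_ms))

# Usage inside the gate:
with budgeted("policy"):
    decision = True  # placeholder for engine.evaluate(req)
```

The design choice worth copying is that a budget breach is an observability event, not an exception: the request still completes, but the regression is visible the day it ships.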
The architecture: a Policy Decision Point in front of every data touch
Here's the shape that works. Every data-access path in your agent system, whether a SQL query, a vector retrieval, a document fetch, or an external API call, goes through a Policy Decision Point (PDP). The PDP is a deterministic function: given (subject, action, resource, context), it returns allow or deny in microseconds. The agent never talks to a data store directly; it talks to a Compliance Gate that wraps every store.
┌──────────┐ request ┌──────────────────────┐
│ │ ─────────────► │ │
│ Agent │ │ Compliance Gate │
│ │ ◄───────────── │ (PDP + classifier │
└──────────┘ filtered │ + redactor + audit)│
results └──────┬───────┬───────┘
│ │
┌────────┴──┐ ┌──┴────────┐ ┌──────┐
│ SQL DB │ │ Vector │ │ Docs │
│ structured│ │ store │ │ store│
└───────────┘ └───────────┘ └──────┘
Three things make this work in real time. Policies live in memory on every agent worker, not in a remote service; updates ship via change feed. Classification labels are precomputed on data at ingest time, not at query time. The PDP itself is a pure function with no I/O, evaluable in well under a millisecond.
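The "policies in memory, updates via change feed" point can be sketched concretely. Here the change feed is stood in by an in-process queue (a real deployment would subscribe to a database change stream or a config service; that transport is an assumption of this sketch). The key mechanic is that workers swap whole immutable bundles, never patch rules in place.

```python
import queue
import threading

class PolicyCache:
    """Holds the active policy bundle in memory on each agent worker.
    Reads touch a single attribute; updates swap the whole tuple at once,
    so an evaluation never sees a half-applied bundle."""
    def __init__(self, rules):
        self._rules = tuple(rules)   # immutable snapshot

    @property
    def rules(self):
        return self._rules

    def swap(self, new_rules):
        self._rules = tuple(new_rules)

def change_feed_listener(cache: PolicyCache, feed: "queue.Queue"):
    """Background thread: each feed item is a complete new rule bundle.
    Shipping full bundles (not diffs) keeps the swap atomic and makes
    rollback a matter of re-publishing the previous bundle."""
    while True:
        bundle = feed.get()
        if bundle is None:        # shutdown sentinel
            break
        cache.swap(bundle)
```

Per-request evaluation reads `cache.rules` with no lock and no network call, which is what keeps the policy decision inside its sub-millisecond budget.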
The Policy Decision Point
A policy engine evaluates rules of the form: given who is asking, what they're trying to do, what they're trying to access, and the context of the request, allow or deny. Production systems usually use Open Policy Agent (OPA) with policies written in Rego. The same logic in pure Python, fast enough to run in line with every retrieval, looks like this:
from dataclasses import dataclass
from typing import Callable
@dataclass(frozen=True)
class AccessRequest:
subject: dict # {"agent_id": ..., "role": ..., "tenant": ...}
action: str # "read", "join", "aggregate", "export"
resource: dict # {"type": "row|doc|chunk", "classification": ..., "tags": ...}
context: dict # {"region": "EU", "purpose": ..., "user_consent": True}
@dataclass(frozen=True)
class Decision:
allow: bool
reason: str
obligations: tuple = () # actions to perform if allowing (e.g., "redact_pii")
class PolicyEngine:
"""In-memory policy decision point. Sub-millisecond evaluation.
Rules are evaluated in order; the first non-None decision wins.
Default is deny: a missing decision means refuse access."""
def __init__(self):
self.rules: list[Callable[[AccessRequest], Decision | None]] = []
def add_rule(self, rule: Callable):
self.rules.append(rule)
def evaluate(self, req: AccessRequest) -> Decision:
for rule in self.rules:
decision = rule(req)
if decision is not None:
return decision
return Decision(allow=False, reason="default_deny")
# Example policy stack for a customer-support system
def restricted_data_blocks_unless_consented(req):
"""Restricted-class data requires explicit user consent in context."""
if req.resource.get("classification") == "restricted":
if not req.context.get("user_consent"):
return Decision(False, "restricted:no_consent")
return None
def eu_residency_blocks_cross_region(req):
"""EU-tagged data may not be read from non-EU regions."""
if "eu_residency" in req.resource.get("tags", []):
if req.context.get("region") != "EU":
return Decision(False, "residency:eu_only")
return None
def tenant_isolation(req):
"""Agents may only access data belonging to their own tenant."""
if req.subject.get("tenant") != req.resource.get("tenant"):
return Decision(False, "tenant:cross_tenant_blocked")
return None
def pii_requires_redaction(req):
"""Reading PII data is allowed but emits a redaction obligation."""
if "pii" in req.resource.get("tags", []) and req.action == "read":
return Decision(True, "pii:read_with_redaction", obligations=("redact_pii",))
return None
def support_role_baseline_allow(req):
"""Support agents reading public/internal data: allow."""
if req.subject.get("role") == "support" and req.action == "read":
if req.resource.get("classification") in ("public", "internal"):
return Decision(True, "support:read_allowed")
return None
# Wire them up: order matters. Hard denials first, then conditional allows.
engine = PolicyEngine()
engine.add_rule(tenant_isolation)
engine.add_rule(eu_residency_blocks_cross_region)
engine.add_rule(restricted_data_blocks_unless_consented)
engine.add_rule(pii_requires_redaction)
engine.add_rule(support_role_baseline_allow)
What this gives you in practice. The evaluate() call is pure Python with no I/O, so it runs in roughly 5-50 microseconds even on a busy server. Rules are ordered so hard denials fire before conditional allows, which means a single denial short-circuits the whole evaluation. The default-deny tail catches any request that no rule matches: silence does not mean permission. Obligations let an allow carry conditions, and the gate is required to honor them: if a decision says obligations=("redact_pii",), the gate must run the redactor before returning the data.
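To see first-match-wins and obligations end to end, here is a compact run. The classes and rules are trimmed copies of the ones above, and the request values are invented for illustration.

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class AccessRequest:
    subject: dict
    action: str
    resource: dict
    context: dict

@dataclass(frozen=True)
class Decision:
    allow: bool
    reason: str
    obligations: tuple = ()

def evaluate(rules, req) -> Decision:
    # First non-None decision wins; silence means deny.
    for rule in rules:
        d = rule(req)
        if d is not None:
            return d
    return Decision(False, "default_deny")

def tenant_isolation(req):
    if req.subject["tenant"] != req.resource["tenant"]:
        return Decision(False, "tenant:cross_tenant_blocked")
    return None

def pii_requires_redaction(req):
    if "pii" in req.resource["tags"] and req.action == "read":
        return Decision(True, "pii:read_with_redaction", obligations=("redact_pii",))
    return None

rules = [tenant_isolation, pii_requires_redaction]

# A support agent reading a PII-tagged record in its own tenant:
req = AccessRequest(
    subject={"agent_id": "a1", "role": "support", "tenant": "acme"},
    action="read",
    resource={"classification": "internal", "tags": ["pii"], "tenant": "acme"},
    context={"region": "EU", "purpose": "ticket_reply"},
)
decision = evaluate(rules, req)
# Allowed, but carrying a redact_pii obligation the gate must honor.
```

Swap the tenant in the resource and the same request short-circuits at the first rule with a hard denial, before the PII rule is ever consulted.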
Real-time data classification
The PDP only works if the data arriving at it has been classified. You cannot ask the LLM to classify data at query time: that's slow, expensive, and non-deterministic. Classification happens at ingest time, with a small, fast classifier, and the labels become metadata on every row, document, and vector chunk. The label set should be small and orthogonal:
- Classification level (public / internal / confidential / restricted): the broadest gate.
- Sensitivity tags (pii, phi, financial, credentials): drive obligations like redaction.
- Residency tags (eu_residency, us_only, no_export): drive geographic routing.
- Retention tags (delete_after_30d, legal_hold): drive lifecycle policies.
- Consent tags (consent_required, consent_revoked): drive per-subject access.
import re
from dataclasses import dataclass, field
@dataclass
class ClassificationResult:
classification: str # 'public' | 'internal' | 'confidential' | 'restricted'
tags: set # {'pii', 'eu_residency', ...}
confidence: float = 1.0
matched_rules: list = field(default_factory=list)
class DataClassifier:
"""Fast rule-based classifier. Runs at ingest time, results stored as metadata.
For unstructured text, regex + named-entity patterns; for structured data,
column-level rules. Falls back to an ML detector only when rules are unsure."""
# Compiled once at class load time so each call is cheap
PII_PATTERNS = {
"email": re.compile(r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b'),
"ssn_us": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
"phone_us": re.compile(r'\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b'),
"credit_card": re.compile(r'\b(?:\d[ -]*?){13,16}\b'),
"ip_addr": re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b'),
}
PHI_KEYWORDS = frozenset({
"diagnosis", "prescription", "treatment", "medical record",
"patient id", "icd-10",
})
CREDENTIAL_PATTERNS = {
"aws_key": re.compile(r'AKIA[0-9A-Z]{16}'),
"private_key": re.compile(r'-----BEGIN (?:RSA |EC |DSA )?PRIVATE KEY-----'),
"bearer_token": re.compile(r'\bey[A-Za-z0-9_-]{20,}\.[A-Za-z0-9_-]+\.[A-Za-z0-9_-]+'),
}
def classify_text(self, text: str, source_hints: dict = None) -> ClassificationResult:
tags = set()
matched = []
source_hints = source_hints or {}
# Carry forward any hints from the source itself
# (e.g., source_hints={"region": "eu-west-1"} -> eu_residency tag)
if source_hints.get("region", "").startswith("eu-"):
tags.add("eu_residency")
# Credential patterns: highest sensitivity, classify as restricted
for name, pat in self.CREDENTIAL_PATTERNS.items():
if pat.search(text):
tags.update({"credentials", "pii"})
matched.append(f"credential:{name}")
return ClassificationResult("restricted", tags, 1.0, matched)
# PII patterns
pii_hits = 0
for name, pat in self.PII_PATTERNS.items():
if pat.search(text):
pii_hits += 1
matched.append(f"pii:{name}")
if pii_hits:
tags.add("pii")
# PHI: keyword-based detection (would be ML in production)
lower = text.lower()
if any(kw in lower for kw in self.PHI_KEYWORDS):
tags.add("phi")
matched.append("phi:keyword")
# Decide classification level from tags
if "phi" in tags or pii_hits >= 2:
level = "confidential"
elif "pii" in tags:
level = "internal"
else:
level = source_hints.get("default_classification", "public")
return ClassificationResult(level, tags, 0.95, matched)
A few details worth pulling out. Patterns are compiled at class load time: re-compiling regex per call is the single biggest perf trap in Python text processing. The result includes matched_rules, so when an auditor asks "why was this row classified restricted?", the answer is mechanical, not a guess. Source hints carry forward: if data came from an EU region, the residency tag is set automatically. The classifier returns confidence; in production, hits below 0.9 trigger an async ML detector for review, while the deterministic path stays fast.
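The async ML fallback for low-confidence hits can be sketched as a queue between the fast path and a worker pool. Everything here is an assumption about shape, not a prescribed design: `classify_fast` and `ml_detect` are stand-ins for the rule-based classifier above and a slow ML detector, and the write-back of upgraded labels is elided.

```python
import queue

REVIEW_THRESHOLD = 0.9
review_queue: "queue.Queue" = queue.Queue()

def classify_with_fallback(classify_fast, text: str) -> dict:
    """Deterministic rules run in line; anything under the confidence
    threshold is queued for an out-of-band ML pass. The caller gets the
    fast answer immediately; the stored label may be upgraded later."""
    result = classify_fast(text)
    if result["confidence"] < REVIEW_THRESHOLD:
        review_queue.put((text, result))      # picked up by a worker pool
    return result

def review_worker(ml_detect):
    """Background worker: re-classify queued items with the slow detector.
    Persisting the upgraded label as new metadata is omitted here."""
    while True:
        item = review_queue.get()
        if item is None:                      # shutdown sentinel
            break
        text, fast_result = item
        upgraded = ml_detect(text)            # slow, accurate pass
        # ... write `upgraded` back as the record's label ...
```

The important property is that the slow detector never sits on the retrieval path: the agent loop only ever pays for the deterministic sweep.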
Lineage tracking across structured and unstructured sources
For audit, "the agent said X" is unprovable. What you need is "the agent said X based on rows 1234 and 1238 from the customer table, document doc_5512 version 7, and chunk 42 of doc_8801." That's lineage: the chain of every data touch that contributed to a response. It must travel with the response automatically; if it relies on the agent to remember to record it, it will be missing exactly when you need it.
from contextvars import ContextVar
from dataclasses import dataclass, field
from time import time
import uuid
@dataclass(frozen=True)
class DataTouch:
source: str # 'sql:customers' | 'docs:tickets' | 'vec:engineering_kb'
record_id: str # row PK, doc id, or chunk id
version: str # for time-travel: snapshot or commit hash
classification: str
tags: tuple
purpose: str # business reason logged for this touch
timestamp: float
@dataclass
class LineageContext:
request_id: str
agent_id: str
user_id: str | None
touches: list = field(default_factory=list)
def record(self, touch: DataTouch) -> None:
self.touches.append(touch)
def summarize(self) -> dict:
return {
"request_id": self.request_id,
"agent_id": self.agent_id,
"user_id": self.user_id,
"sources_touched": sorted({t.source for t in self.touches}),
"records": [(t.source, t.record_id, t.version) for t in self.touches],
"max_classification": max(
(t.classification for t in self.touches),
key=lambda c: ["public", "internal", "confidential", "restricted"].index(c),
default="public",
),
"all_tags": sorted({tag for t in self.touches for tag in t.tags}),
}
# A context variable lets every layer of the agent system see the active
# lineage without having to thread it through every function call.
_active_lineage: ContextVar[LineageContext | None] = ContextVar("lineage", default=None)
def begin_request(agent_id: str, user_id: str | None) -> LineageContext:
ctx = LineageContext(
request_id=str(uuid.uuid4()),
agent_id=agent_id,
user_id=user_id,
)
_active_lineage.set(ctx)
return ctx
def record_touch(source, record_id, version, classification, tags, purpose):
"""Called by every data store wrapper. Cannot be skipped without removing
the wrapper, which is detectable in code review."""
ctx = _active_lineage.get()
if ctx is None: return
ctx.record(DataTouch(
source=source, record_id=record_id, version=version,
classification=classification, tags=tuple(tags),
purpose=purpose, timestamp=time(),
))
The use of a ContextVar matters. It propagates automatically through async tasks and threads, so every wrapped store (the SQL adapter, the vector retriever, the document fetcher) can call record_touch() without any explicit plumbing. Lineage stays attached to the request even when the agent fans out work across multiple specialists. At response time, summarize() produces a record you can attach to the response itself, log to the audit chain from the Trust chapter, or hand to a regulator.
The Compliance Gate: classification-aware retrieval
The gate is what wraps every store. It receives a request, evaluates policy, filters the store query by classification and tenant, runs any obligations the policy attached, and records lineage. The agent never bypasses this; the store object itself is private behind the gate.
class ComplianceGate:
"""Wraps a data store with policy + classification filter + redaction + audit."""
def __init__(self, store, policy: PolicyEngine, redactor, audit):
self._store = store
self.policy = policy
self.redactor = redactor
self.audit = audit
def retrieve(self, query, subject, context, source_name) -> list:
# 1. Issue the request to the index, passing classification filters that
# eliminate forbidden classes BEFORE results leave the store. This is
# safer than filtering after retrieval (a leaky pipeline can't leak
# what was never returned) and faster (less data crosses the wire).
candidate_classes = self._allowed_classes_for(subject, context)
candidates = self._store.search(
query=query,
tenant=subject["tenant"],
allowed_classifications=candidate_classes,
top_k=20,
)
# 2. Per-record policy check. Cheap because classification is precomputed.
results = []
for rec in candidates:
req = AccessRequest(
subject=subject,
action="read",
resource={
"type": rec.kind,
"classification": rec.classification,
"tags": rec.tags,
"tenant": rec.tenant,
},
context=context,
)
decision = self.policy.evaluate(req)
if not decision.allow:
self.audit.write(subject["agent_id"], "data.denied", {
"source": source_name, "record": rec.id, "reason": decision.reason,
})
continue
# 3. Honor obligations attached to the decision (e.g., redact PII).
content = rec.content
if "redact_pii" in decision.obligations:
content = self.redactor.redact(content)
# 4. Record lineage. Always, automatically, no agent involvement.
record_touch(
source=source_name, record_id=rec.id, version=rec.version,
classification=rec.classification, tags=rec.tags,
purpose=context.get("purpose", "unspecified"),
)
results.append(rec.with_content(content))
self.audit.write(subject["agent_id"], "data.retrieved", {
"source": source_name, "count": len(results),
"classifications": sorted({r.classification for r in results}),
})
return results
def _allowed_classes_for(self, subject, context):
"""Quickly compute which classification levels the subject can possibly
see, so the index filter can drop the rest. Saves bandwidth and
avoids round-trips for forbidden data."""
levels = ["public", "internal", "confidential", "restricted"]
for i, lvl in enumerate(levels):
req = AccessRequest(
subject=subject, action="read",
resource={"classification": lvl, "tags": [], "tenant": subject["tenant"]},
context=context,
)
if not self.policy.evaluate(req).allow:
return levels[:i]
return levels
The most important architectural decision here: classification filtering happens at the index level, not after retrieval. A vector store that returns 100 chunks and then has 80 dropped is leaking metadata about what exists; a vector store that returns 20 because the filter pushed down is the correct shape. The same applies to SQL (use row-level security) and document stores (use ACL-aware queries). Post-retrieval filtering is a fallback, not the primary defense.
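What "filter at the index level" means for a SQL store, concretely: the tenant and the classification ceiling become part of the query itself, so forbidden rows never leave the database. A sqlite sketch; the table and column names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE customers (
        id INTEGER PRIMARY KEY,
        tenant TEXT,
        classification TEXT,
        name TEXT
    )
""")
conn.executemany(
    "INSERT INTO customers (tenant, classification, name) VALUES (?, ?, ?)",
    [
        ("acme", "internal", "Ada"),
        ("acme", "restricted", "Grace"),   # must never reach the agent
        ("umbrella", "internal", "Eve"),   # wrong tenant
    ],
)

def search(conn, tenant: str, allowed_classifications: list[str]):
    """Pushdown: the classification ceiling is part of the WHERE clause,
    so the store never returns rows the policy would have dropped."""
    placeholders = ",".join("?" for _ in allowed_classifications)
    rows = conn.execute(
        f"SELECT name FROM customers "
        f"WHERE tenant = ? AND classification IN ({placeholders})",
        [tenant, *allowed_classifications],
    )
    return [r[0] for r in rows]

names = search(conn, "acme", ["public", "internal"])
# Only "Ada": the restricted row and the other tenant's row were
# filtered inside the store, not after retrieval.
```

In production the same shape is enforced by the database itself via row-level security, so even a buggy query builder cannot widen the result set.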
PII redaction with bounded latency
When the policy says "allow with redaction," the gate runs the redactor on the retrieved text. The same regex set used for classification doubles as the redaction set, but in production you want a faster path: classify once at ingest, store a "redaction map" alongside the document (just a list of character ranges), and at retrieval time apply the precomputed redaction without re-running the regex.
class PIIRedactor:
"""Redact PII tokens in a string. Fast path uses precomputed ranges;
slow path falls back to regex when the ranges aren't available."""
REPLACEMENTS = {
"email": "[EMAIL]",
"ssn_us": "[SSN]",
"phone_us": "[PHONE]",
"credit_card": "[CARD]",
"ip_addr": "[IP]",
}
def __init__(self, classifier: DataClassifier):
self.classifier = classifier
def redact(self, text: str, precomputed_ranges: list = None) -> str:
if precomputed_ranges:
# Fast path: just apply the ranges. O(n) in length of text.
return self._apply_ranges(text, precomputed_ranges)
# Slow path: regex sweep. Still O(n) per pattern, but with constant overhead.
out = text
for name, pat in self.classifier.PII_PATTERNS.items():
out = pat.sub(self.REPLACEMENTS.get(name, "[REDACTED]"), out)
for name, pat in self.classifier.CREDENTIAL_PATTERNS.items():
out = pat.sub("[CREDENTIAL]", out)
return out
def _apply_ranges(self, text, ranges):
# ranges = [(start, end, kind), ...] sorted by start, non-overlapping
out = []
cursor = 0
for start, end, kind in ranges:
out.append(text[cursor:start])
out.append(self.REPLACEMENTS.get(kind, "[REDACTED]"))
cursor = end
out.append(text[cursor:])
return "".join(out)
For 1KB of text with 20 PII tokens, the precomputed-ranges path runs in well under a millisecond. The regex fallback runs in 5-15ms depending on text length. Both fit the budget; the precomputed path is what you ship for hot retrieval paths.
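The redaction map itself is produced at ingest with the same pattern set. A sketch of that ingest-side step, with the pattern dict trimmed to two entries for brevity; the output shape matches what `_apply_ranges` above expects.

```python
import re

PII_PATTERNS = {
    "email": re.compile(r'\b[\w.+-]+@[\w-]+\.[\w.-]+\b'),
    "ssn_us": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
}

def compute_redaction_ranges(text: str) -> list[tuple[int, int, str]]:
    """Run the regex sweep once at ingest and store the spans as metadata.
    At retrieval time, applying the ranges is O(n) with no regex cost."""
    spans = []
    for kind, pat in PII_PATTERNS.items():
        for m in pat.finditer(text):
            spans.append((m.start(), m.end(), kind))
    # Sort by start and drop overlaps so the fast-path applier can assume
    # sorted, non-overlapping ranges.
    spans.sort()
    merged = []
    for span in spans:
        if merged and span[0] < merged[-1][1]:
            continue
        merged.append(span)
    return merged

ranges = compute_redaction_ranges("Reach Ada at ada@example.com, SSN 123-45-6789.")
# ranges hold the email and SSN spans, ready to store alongside the doc.
```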
Right-to-erasure across heterogeneous stores (GDPR Article 17)
When a customer asks to be forgotten, you have to delete their data from every store the agent system knows about, and you have to do it in a coordinated way that doesn't leave dangling references. This is the hardest practical compliance problem in agent systems, because the data is not just in one database: it's also in vector embeddings, in cached responses, in document indexes, in the audit log itself.
The pattern that works is a coordinator that issues an erasure transaction across all stores, collects deletion proofs from each, and writes a single signed erasure record. If any store fails to delete, the transaction is marked partial and retried; the record is not closed until every store has confirmed.
from __future__ import annotations  # lets ErasableStore reference DeletionProof below
from dataclasses import dataclass, field
from time import time
from typing import Protocol
class ErasableStore(Protocol):
name: str
def find_subject_records(self, subject_id: str) -> list[str]: ...
def delete_records(self, record_ids: list[str]) -> DeletionProof: ...
@dataclass(frozen=True)
class DeletionProof:
store: str
record_count: int
record_ids_hash: str # SHA-256 of sorted record IDs (for audit, not lookup)
completed_at: float
@dataclass
class ErasureRequest:
subject_id: str # the user being erased
requested_by: str # operator or self-service ID
legal_basis: str # 'gdpr_article_17' | 'ccpa_dsr_delete' | ...
reason: str # free-text from the request
initiated_at: float
@dataclass
class ErasureRecord:
request: ErasureRequest
proofs: list = field(default_factory=list)
failures: dict = field(default_factory=dict)
def complete(self) -> bool:
return len(self.failures) == 0 and len(self.proofs) > 0
class ErasureCoordinator:
def __init__(self, stores: list[ErasableStore], audit, max_retries: int = 3):
self.stores = stores
self.audit = audit
self.max_retries = max_retries
def erase(self, request: ErasureRequest) -> ErasureRecord:
record = ErasureRecord(request=request)
self.audit.write("system", "erasure.initiated", {
"subject": request.subject_id, "basis": request.legal_basis,
})
for store in self.stores:
self._erase_one_store(store, request, record)
# Re-run with retries for any failures
for attempt in range(self.max_retries):
if not record.failures:
break
failed_stores = [s for s in self.stores if s.name in record.failures]
for store in failed_stores:
record.failures.pop(store.name, None)
self._erase_one_store(store, request, record)
if record.complete():
self.audit.write("system", "erasure.completed", {
"subject": request.subject_id,
"stores": [p.store for p in record.proofs],
"total_records": sum(p.record_count for p in record.proofs),
})
else:
self.audit.write("system", "erasure.partial", {
"subject": request.subject_id,
"failed_stores": list(record.failures.keys()),
})
return record
def _erase_one_store(self, store, request, record):
try:
ids = store.find_subject_records(request.subject_id)
if not ids:
record.proofs.append(DeletionProof(store.name, 0, "", time()))
return
proof = store.delete_records(ids)
record.proofs.append(proof)
except Exception as e:
record.failures[store.name] = str(e)
Two practical points worth knowing. The vector store is the trickiest piece: deleting a row in SQL is one operation, but deleting from a vector index often requires re-indexing the surrounding chunks if you're using compressed indexes (PQ, IVF). Most modern vector databases (Pinecone, Weaviate, Qdrant) have native delete-by-metadata, but verify yours does before you promise erasure timelines. The audit log itself is treated specially: GDPR allows retaining audit records of the erasure even after the data is gone, since the audit serves a different lawful basis (compliance proof). Don't delete the erasure record when you delete the subject; that defeats the entire point.
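A minimal ErasableStore over a plain dict shows what a deletion proof looks like in practice. The DictStore is purely illustrative, and DeletionProof is redeclared so the snippet runs standalone; the hashing mirrors the record_ids_hash field above.

```python
import hashlib
from dataclasses import dataclass
from time import time

@dataclass(frozen=True)
class DeletionProof:
    store: str
    record_count: int
    record_ids_hash: str
    completed_at: float

class DictStore:
    """In-memory stand-in for a real store, satisfying the ErasableStore
    protocol used by the coordinator."""
    def __init__(self, name: str, rows: dict):
        self.name = name
        self._rows = rows              # id -> {"subject_id": ..., ...}

    def find_subject_records(self, subject_id: str) -> list[str]:
        return [rid for rid, row in self._rows.items()
                if row.get("subject_id") == subject_id]

    def delete_records(self, record_ids: list[str]) -> DeletionProof:
        for rid in record_ids:
            del self._rows[rid]        # a KeyError surfaces as a coordinator failure
        ids_hash = hashlib.sha256(
            "\n".join(sorted(record_ids)).encode()
        ).hexdigest()
        return DeletionProof(self.name, len(record_ids), ids_hash, time())

store = DictStore("sql:customers", {
    "r1": {"subject_id": "u42", "email": "u42@example.com"},
    "r2": {"subject_id": "u99", "email": "u99@example.com"},
})
proof = store.delete_records(store.find_subject_records("u42"))
# r1 is gone; r2 belongs to a different subject and survives.
```

Note that the proof carries a hash of the deleted IDs, not the IDs themselves: the erasure record must not become a new copy of the data it attests was destroyed.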
Data residency: routing without leaks
EU data must stay in the EU. An agent that routes a query to a US-hosted LLM violates GDPR even if the agent itself is EU-hosted, because the prompt going to the LLM contained the EU data. The fix is a residency-aware router that picks the model endpoint based on data tags in the request.
class ResidencyRouter:
"""Picks an LLM endpoint based on the residency tags carried by the request.
Default-deny: if no endpoint is found that satisfies all required residencies,
raise rather than silently routing somewhere safe-ish."""
def __init__(self, endpoints: list[dict]):
# endpoints = [{"url": ..., "regions": {"EU"}, "model": "..."}, ...]
self.endpoints = endpoints
def route(self, required_residencies: set, model_class: str) -> dict:
for ep in self.endpoints:
if ep["model"] != model_class: continue
# Endpoint is acceptable iff its regions are a SUPERSET of
# the required residency tags.
if required_residencies.issubset(ep["regions"]):
return ep
raise RuntimeError(
f"no endpoint satisfies residency {required_residencies}"
f" for model {model_class}"
)
# Use it like this in the agent's call site:
required = {"EU"} if "eu_residency" in lineage_tags else set()
endpoint = router.route(required, model_class="chat-large")
response = call_llm(endpoint["url"], prompt)
The default-deny behavior is the important part. If the router cannot find an EU endpoint and the data carries an eu_residency tag, the right answer is to fail loudly, not to route to a US endpoint with a comment that says "TODO: add EU support." Failing loudly puts the issue on the on-call engineer's desk; failing silently puts it on the regulator's desk eighteen months later.
Common ways this goes wrong
| Failure mode | What goes wrong | Mitigation |
|---|---|---|
| Linkage attacks | Two queries that are individually compliant produce identifiable data when joined (e.g., anonymized health records + zip code = re-identification) | Track aggregation in the lineage; deny joins that cross sensitivity classes; differential privacy on aggregates |
| Cache poisoning | A response cached for one user gets served to another with different permissions | Cache key includes (user, tenant, classification ceiling); never key on prompt alone |
| Embedding leakage | Vector embeddings of PII can be reversed to recover the source text (membership inference attacks) | Don't embed restricted data; use separate indexes per classification; rotate embedding models |
| Prompt as exfiltration channel | Compromised agent embeds restricted data in its outbound API calls (e.g., as part of a search query string) | Egress filter on every external call; classify outbound payloads before they leave |
| Audit pipeline failure | The audit writes are lost during outages, silently breaking the chain | Audit pipeline independent of agent infrastructure; back-pressure rather than silent drop |
| Over-broad consent | "User consented" gets attached to every request indiscriminately, including ones the user didn't actually consent to | Consent scoped to specific purposes and resources; consent record verifiable independently of the agent |
| Erasure incompleteness | Subject is deleted from primary store but lives on in caches, embeddings, backups, or downstream copies | Coordinator with explicit completion criteria; vector index re-build; cache invalidation by subject ID |
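The cache-poisoning row deserves a concrete shape: the cache key must bind a response to the permissions it was produced under, never to the prompt alone. A sketch; the ceiling values are illustrative.

```python
import hashlib

def cache_key(prompt: str, user_id: str, tenant: str,
              classification_ceiling: str) -> str:
    """A response produced under one permission set must never be served
    under another, so every permission-relevant input joins the key.
    The unit separator avoids ambiguity between concatenated fields."""
    material = "\x1f".join([prompt, user_id, tenant, classification_ceiling])
    return hashlib.sha256(material.encode()).hexdigest()

# Same prompt, different classification ceiling -> different cache entries:
k_support = cache_key("summarize account", "u1", "acme", "internal")
k_admin = cache_key("summarize account", "u2", "acme", "confidential")
```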
The supervision-to-autonomy gradient
The standard mental model for agent oversight is a binary: either a human is in the loop, or the agent is autonomous. The binary is wrong. Real systems already operate on a gradient, but informally and inconsistently. Making it explicit lets you treat trust as something agents earn rather than something granted up front.
Four operating points cover almost all useful cases. A fifth is worth knowing about for high-throughput environments.
Supervised. The agent proposes; the human disposes. No action ships without an explicit approval click. This is the right mode for any agent in its first month of deployment, any agent acting on data the team has not yet labeled, and any agent whose actions are irreversible. Latency is high; trust accumulation is fast because every disagreement is captured.
Reviewed. The agent acts and the human reviews the action within a short SLA, with the option to roll back. This works for actions whose effects are reversible inside a useful window. Most internal-tooling agents belong here. The review step is the trust-building loop: when reviewers agree with 99 of 100 actions, the agent earns the right to move up.
Sampled. Most actions ship with no review; a sampled fraction (say 5%) is reviewed asynchronously. This is the working mode for high-volume agents that have demonstrated reliability. The sample rate is a knob you tune downward as confidence grows and upward when something looks off in production.
Autonomous. The agent acts without human review of individual actions. Trust is maintained through aggregate metrics: error rate, drift detection, and periodic re-evaluation against held-out cases. Reserved for narrow domains where the agent has a long track record at sampled level and where individual failures are tolerable in a way that aggregate failure is not.
Escrowed. Useful in batch-style environments. The agent is permitted to act, but its actions are held in escrow and released by a human reviewer in batches. This unifies sampled and reviewed for cases where human review is the throughput bottleneck and the actions are independent, so batched approval is feasible.
The earning-up mechanic is what makes this work as a system rather than a static taxonomy. Promotion criteria are context-specific. An agent that has demonstrated competence in 1,000 supervised invoice-processing cases earns the right to operate in reviewed mode for invoice processing, not for refund decisions. The trust state is per-domain, granular, and auditable. Every promotion or demotion is a recorded event in the same audit log that records every action.
A practical implementation: the policy decision point checks not just whether an action is permitted, but at what supervision level the agent currently operates for this action's domain. The supervision level joins the trust score, the capability token, and the policy bundle as inputs to the access decision. An agent operating at "supervised" level for refunds and "autonomous" for read-only queries on the same workflow is the normal case, not the exception.
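A sketch of the per-domain trust state described above. The level names follow the gradient in this section; the demote-to-bottom behavior is a design assumption of this sketch (trust earned a level at a time, lost all at once), not something the text prescribes.

```python
from dataclasses import dataclass, field
from time import time

LEVELS = ["supervised", "reviewed", "sampled", "autonomous"]

@dataclass
class SupervisionState:
    """Per-(agent, domain) supervision level. Promotions and demotions are
    recorded events, destined for the same audit log as every action."""
    levels: dict = field(default_factory=dict)   # (agent_id, domain) -> level
    events: list = field(default_factory=list)

    def level_for(self, agent_id: str, domain: str) -> str:
        # Default is the most restrictive mode: unproven means supervised.
        return self.levels.get((agent_id, domain), "supervised")

    def promote(self, agent_id: str, domain: str, evidence: str) -> None:
        current = self.level_for(agent_id, domain)
        idx = LEVELS.index(current)
        if idx + 1 < len(LEVELS):
            self.levels[(agent_id, domain)] = LEVELS[idx + 1]
            self.events.append((time(), agent_id, domain, "promote", evidence))

    def demote(self, agent_id: str, domain: str, evidence: str) -> None:
        # Assumed policy: demotion always lands back at full supervision.
        self.levels[(agent_id, domain)] = "supervised"
        self.events.append((time(), agent_id, domain, "demote", evidence))

state = SupervisionState()
state.promote("agent-7", "invoice_processing",
              "1000 supervised cases, 0 disagreements")
# agent-7 is now "reviewed" for invoices, still "supervised" for refunds.
```

The PDP then reads `level_for(agent_id, domain)` as one more input alongside the classification and tenant checks.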
Practical advice
- Default-deny everywhere. Missing rules don't allow; missing classifications don't allow; missing endpoint matches don't route. Loud failures are how you find the gaps before regulators do.
- Classify at ingest, not at query time. Real-time classification on every retrieval kills your latency budget and produces inconsistent results. Pre-compute labels; store them as metadata; treat label edits as a privileged operation.
- Filter at the index level, not after retrieval. A store that returns forbidden records and then strips them is a leaky pipeline. The index itself should never produce results outside the subject's classification ceiling.
- Make policies in-memory and version-controlled. Policies are code: review them in Git, deploy them with the application, roll them back like any other change. Hot-reloading via change feed is fine; remote evaluation per request is not.
- Track lineage with a context variable. Threading lineage through every function manually means it gets dropped exactly when something goes wrong. ContextVar propagates automatically through async tasks and is forgotten only if you remove the wrapper, which is detectable in code review.
- Run the erasure path quarterly. Pick a synthetic subject; trigger a full erasure; verify that downstream consumers actually delete (vector index, caches, backups, docs). Untested compliance flows fail silently the moment they're needed.
- Audit the audit log itself. Hash-chain it, write to a separate trust domain, sign each entry. The audit log is the last line of defense and the first thing a sophisticated attacker tries to corrupt.
- Treat structured and unstructured the same way. A SQL row with PII and a Slack message with PII are the same kind of risk. The classification, the policy, the redactor, the audit are the same; only the store wrapper changes.
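The hash-chaining from the audit-log bullet fits in a few lines: each entry commits to the previous entry's hash, so rewriting any entry breaks verification from that point forward. Signing and the separate trust domain are omitted; this is a sketch of the chaining mechanic only.

```python
import hashlib
import json

class HashChainedLog:
    """Append-only log where entry N commits to the hash of entry N-1.
    Tampering with any entry invalidates every later hash."""
    GENESIS = "0" * 64

    def __init__(self):
        self.entries: list[dict] = []

    def write(self, actor: str, event: str, payload: dict) -> None:
        prev = self.entries[-1]["hash"] if self.entries else self.GENESIS
        body = json.dumps(
            {"actor": actor, "event": event, "payload": payload, "prev": prev},
            sort_keys=True,
        )
        self.entries.append({
            "body": body,
            "hash": hashlib.sha256(body.encode()).hexdigest(),
        })

    def verify(self) -> bool:
        prev = self.GENESIS
        for e in self.entries:
            if json.loads(e["body"])["prev"] != prev:
                return False
            if hashlib.sha256(e["body"].encode()).hexdigest() != e["hash"]:
                return False
            prev = e["hash"]
        return True
```

Verification walks the chain from genesis, so a clean `verify()` attests to the integrity of the entire history, not just the latest entry.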