Service
The HTTP + async-queue layer — a FastAPI app factory with dependency injection, the RQ task queue behind a TaskQueue Protocol, worker-owned ingestion jobs, the FAQ short-circuit cache, and ServiceConfig (env RAGSPINE_*).
The service domain (src/ragspine/service/) wraps the engine in an HTTP API and an async
ingestion pipeline. It owns five things: ServiceConfig (env-driven), the FastAPI app, the
RQ task queue, worker-owned ingestion jobs, and the FAQ short-circuit cache.
One invariant governs the whole layer:
The FAQ cache sits in front of the anti-fabrication guard — so it must conservatively exclude anything a vetted short-circuit could get wrong: structured-numeric / competitor / real-time / expired / disabled / RESTRICTED questions never short-circuit. See FAQ short-circuit.
Layout
ServiceConfig
config.py assembles a frozen ServiceConfig from the environment via
ServiceConfig.from_env(). Every value is optional; the defaults run the offline,
deterministic path with no API key. The full variable table lives in
Configuration; the most-used:
| Field | Env var | Default |
|---|---|---|
| db_path | RAGSPINE_DB_PATH | data/fact_metric.db |
| chunk_db_path | RAGSPINE_CHUNK_DB_PATH | None |
| provider_type | RAGSPINE_PROVIDER | mock (accepts mock or anthropic) |
| redis_url | RAGSPINE_REDIS_URL | redis://localhost:6379/0 |
| faq_source | RAGSPINE_FAQ_SOURCE | None (→ empty cache) |
| allowed_upload_root | RAGSPINE_ALLOWED_UPLOAD_ROOT | None |
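To make the env-driven pattern concrete, here is a minimal sketch of a frozen config built from RAGSPINE_* variables with the defaults listed in the table. It is not the actual config.py: the class name ServiceConfigSketch and the explicit env parameter are assumptions added for illustration and testability.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ServiceConfigSketch:
    """Hypothetical stand-in for ServiceConfig; fields mirror the table."""

    db_path: str = "data/fact_metric.db"
    chunk_db_path: Optional[str] = None
    provider_type: str = "mock"
    redis_url: str = "redis://localhost:6379/0"
    faq_source: Optional[str] = None
    allowed_upload_root: Optional[str] = None

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "ServiceConfigSketch":
        # `env` is an assumption for this sketch; it falls back to os.environ.
        env = os.environ if env is None else env
        defaults = cls()
        return cls(
            db_path=env.get("RAGSPINE_DB_PATH", defaults.db_path),
            chunk_db_path=env.get("RAGSPINE_CHUNK_DB_PATH"),
            provider_type=env.get("RAGSPINE_PROVIDER", defaults.provider_type),
            redis_url=env.get("RAGSPINE_REDIS_URL", defaults.redis_url),
            faq_source=env.get("RAGSPINE_FAQ_SOURCE"),
            allowed_upload_root=env.get("RAGSPINE_ALLOWED_UPLOAD_ROOT"),
        )
```

With an empty environment every field takes its default, which corresponds to the offline path described above; setting a single variable changes only that field.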
config.py also provides build_provider(config) (mock → MockProvider, anthropic →
AnthropicProvider) and the context managers open_fact_store(config) and
open_narrative_retriever(config, provider). It also provides validate_ingest_path(path, config, *, suffixes), which enforces containment under allowed_upload_root (rejecting path traversal)
and a suffix whitelist, raising PathNotAllowedError on violation.
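The containment and suffix checks can be sketched as follows. This is an illustrative approximation, not the real validate_ingest_path: it takes the root directly instead of a ServiceConfig, the function name is hypothetical, and skipping the containment check when no root is configured is an assumption.

```python
from pathlib import Path
from typing import Iterable, Optional


class PathNotAllowedError(ValueError):
    """Stand-in for the error named in the text; the base class is an assumption."""


def validate_ingest_path_sketch(
    path: str, allowed_root: Optional[str], *, suffixes: Iterable[str]
) -> Path:
    # Resolve first so that ".." segments cannot hide a traversal.
    resolved = Path(path).resolve()
    if resolved.suffix.lower() not in {s.lower() for s in suffixes}:
        raise PathNotAllowedError(f"suffix not allowed: {resolved.suffix!r}")
    if allowed_root is not None:
        root = Path(allowed_root).resolve()
        try:
            # relative_to raises ValueError when `resolved` is outside `root`.
            resolved.relative_to(root)
        except ValueError:
            raise PathNotAllowedError("path escapes the allowed upload root") from None
    return resolved
```

Resolving before comparing is the important step: a string prefix check on the raw path would accept something like /srv/uploads/../etc/x.pdf.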
FastAPI app — factory + dependency injection
api/app.py exposes an app factory:
```python
def create_app(
    config: ServiceConfig | None = None,
    *,
    provider: LLMProvider | None = None,
    queue: TaskQueue | None = None,
    faq_cache: FAQCache | None = None,
) -> FastAPI: ...
```

Anything not passed is built from config (ServiceConfig.from_env(), build_provider,
RQQueue(config.redis_url), and FAQCache.from_file(config.faq_source) or
FAQCache.empty()). The assembled instances are stored on app.state, and api/dependencies.py
exposes them as FastAPI dependencies (get_config, get_provider, get_queue,
get_faq_cache) — overridable via app.dependency_overrides in tests. routes.py wires
them as Annotated aliases (ConfigDep, ProviderDep, QueueDep, FAQCacheDep).
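The "build only what was not injected" pattern, together with the override lookup used in tests, can be illustrated without the framework. The sketch below is a plain-Python analogue, not FastAPI's actual mechanism: create_app_sketch, resolve, and the placeholder string values are all hypothetical.

```python
from types import SimpleNamespace
from typing import Any, Callable, Optional


def create_app_sketch(
    config: Optional[dict] = None,
    *,
    provider: Any = None,
    queue: Any = None,
    faq_cache: Any = None,
) -> SimpleNamespace:
    # Each collaborator is built from config only when the caller did not pass one.
    config = config if config is not None else {"provider_type": "mock", "faq_source": None}
    if provider is None:
        provider = f"{config['provider_type']}-provider"  # placeholder for build_provider(config)
    if queue is None:
        queue = "rq-queue"  # placeholder for RQQueue(config.redis_url)
    if faq_cache is None:
        faq_cache = {}  # placeholder for FAQCache.from_file(...) or FAQCache.empty()
    state = SimpleNamespace(config=config, provider=provider, queue=queue, faq_cache=faq_cache)
    return SimpleNamespace(state=state, dependency_overrides={})


def get_provider(app: SimpleNamespace) -> Any:
    # Default getter: read the instance assembled by the factory.
    return app.state.provider


def resolve(app: SimpleNamespace, dependency: Callable[[SimpleNamespace], Any]) -> Any:
    # A registered override wins over the default getter.
    override = app.dependency_overrides.get(dependency)
    return override() if override is not None else dependency(app)


app = create_app_sketch(queue="fake-queue")  # inject a queue, build the rest
default_provider = resolve(app, get_provider)
app.dependency_overrides[get_provider] = lambda: "stub-provider"
overridden_provider = resolve(app, get_provider)
```

Injecting through the factory suits whole-app wiring (for example, a FakeQueue in place of RQQueue), while overrides suit swapping a single dependency inside one test.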
Endpoints
All routes live on a module-level router in api/routes.py.
Request/response models (api/schemas.py, Pydantic v2) include AskRequest(question, reference_date), AskResponse(request_id, answer, route, answer_kind, clarification, sources, tool_status_summary, cache), the two ingest-job request models, JobSubmitResponse(job_id),
and JobStatusResponse(id, status, result, error).
The two ingest routes only accept whitelisted suffixes — structured: .xlsx / .xlsm / .pptx / .pdf; narrative: .pptx / .pdf.
Task queue — the TaskQueue Protocol
tasks/task_queue.py defines the queue seam:
```python
class TaskQueue(Protocol):
    def enqueue(self, func_path: str, payload: dict, *, job_id=None,
                timeout=None, max_retries=0, result_ttl=None,
                failure_ttl=None) -> str: ...
    def get(self, job_id: str) -> JobStatus | None: ...
```

FakeQueue
Synchronous, in-memory. enqueue runs the job inline (failures recorded as JOB_FAILED, not re-raised). Used in tests and offline demos — no Redis needed.
RQQueue
Production queue over RQ + Redis (both lazy-imported). enqueue maps to rq.Queue.enqueue; get fetches via rq.job.Job.fetch and maps status. Adds ping() for /readyz.
Status constants: JOB_QUEUED, JOB_STARTED, JOB_FINISHED, JOB_FAILED. A JobStatus
carries id, status, result, error (error shape {type, message, stage, retryable}).
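A synchronous, in-memory queue of the kind described for FakeQueue might look like the sketch below. It is an assumption-laden illustration, not the project's code: the constant string values, the JobStatusSketch dataclass, and the convention that a job function takes the payload as its single argument are all hypothetical.

```python
import importlib
import uuid
from dataclasses import dataclass
from typing import Any, Dict, Optional

JOB_FINISHED = "finished"  # string values are assumptions
JOB_FAILED = "failed"


@dataclass
class JobStatusSketch:
    id: str
    status: str
    result: Any = None
    error: Optional[dict] = None


class FakeQueueSketch:
    def __init__(self) -> None:
        self._jobs: Dict[str, JobStatusSketch] = {}

    def enqueue(self, func_path: str, payload: dict, *, job_id=None, timeout=None,
                max_retries=0, result_ttl=None, failure_ttl=None) -> str:
        job_id = job_id or uuid.uuid4().hex
        module_name, _, func_name = func_path.rpartition(".")
        try:
            # Resolve the dotted path and run the job inline.
            func = getattr(importlib.import_module(module_name), func_name)
            self._jobs[job_id] = JobStatusSketch(job_id, JOB_FINISHED, result=func(payload))
        except Exception as exc:
            # Failures are recorded on the job, not re-raised to the caller.
            self._jobs[job_id] = JobStatusSketch(job_id, JOB_FAILED, error={
                "type": type(exc).__name__,
                "message": str(exc),
                "stage": getattr(exc, "stage", None),
                "retryable": getattr(exc, "retryable", False),
            })
        return job_id

    def get(self, job_id: str) -> Optional[JobStatusSketch]:
        return self._jobs.get(job_id)
```

Because enqueue has the same signature as the Protocol, route code written against TaskQueue runs unchanged whether it receives this kind of fake or a Redis-backed queue.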
Ingestion jobs — worker-owned stores
tasks/jobs.py holds the two job functions the queue resolves by dotted path
(run_structured_ingest_job, run_narrative_ingest_job). Each one is self-contained:
it builds a ServiceConfig from its payload, defensively re-validates the file path(s) (a
violation raises JobError(stage="validation")), opens its own SQLite stores, runs the
ingest, and closes everything in finally.
Jobs never reuse the caller's database connections — they own their stores so they can run
out-of-process in a worker. Their report serializers (ingest_report_to_dict,
narrative_report_to_dict) emit counts and statuses only, never raw fact values or chunk text.
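The shape of a worker-owned job (validate, open its own store, ingest, close in finally, report counts only) can be sketched as follows. Everything here is hypothetical: the payload keys, the table layout, JobErrorSketch, and the suffix check stand in for the real job functions and validators.

```python
import sqlite3
from typing import Any, Dict


class JobErrorSketch(Exception):
    """Stand-in for JobError; carries the stage and retryable fields."""

    def __init__(self, message: str, *, stage: str, retryable: bool = False) -> None:
        super().__init__(message)
        self.stage = stage
        self.retryable = retryable


def run_ingest_job_sketch(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Re-validate inside the job: the worker does not trust the API layer's check.
    path = payload.get("path", "")
    if not path.lower().endswith((".pdf", ".pptx")):
        raise JobErrorSketch(f"path not allowed: {path!r}", stage="validation")

    # The job opens its own connection instead of borrowing the caller's.
    conn = sqlite3.connect(payload.get("db_path", ":memory:"))
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS chunks (source TEXT, body TEXT)")
        rows = [(path, text) for text in payload.get("chunks", [])]
        conn.executemany("INSERT INTO chunks VALUES (?, ?)", rows)
        conn.commit()
        # The report carries counts and a status only, never the chunk text.
        return {"status": "ok", "chunks_written": len(rows)}
    finally:
        conn.close()
```

A payload made of plain, serializable values is what lets the same function run inline under a fake queue or out-of-process in a worker.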
FAQ short-circuit cache
faq/faq_cache.py is the SME-vetted Q→A cache that can bypass the LLM entirely. FAQCache
builds a normalized index over questions and aliases (NFKC + casefold + whitespace fold +
trailing-punctuation strip); construct it with FAQCache.from_file(path) or
FAQCache.empty(). The core method is a pure function:
```python
def lookup(self, question: str, *, reference_date=None) -> FAQHit | None: ...
```

Its exclusion rules are front-loaded: any match returns None (a miss). They are checked in this order:

1. Out-of-scope / competitor: clarify_scope mode is out_of_scope_entity, or intent.external_entity is set.
2. Structured-numeric: intent.route == "structured", or any of metric / entity / period (single or multi-value) is present.
3. Real-time: the normalized question contains a real-time cue (e.g. time words, “股价” (“stock price”), “price now”).
4. Expired / disabled: the item is disabled or outside its [valid_from, valid_until] window.
5. RESTRICTED.

Only when none of these fire does lookup return a FAQHit (item_id, version,
answer, source, cache_type="faq"). In routes.py, /v1/ask runs the FAQ lookup
first; a hit returns immediately with route="faq" and never touches the provider, fact
store, or retriever. That is why it sits in front of the anti-fabrication guard, and its
conservative exclusions are what keep that placement safe.
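The normalization step and the item-level exclusions can be sketched in a few lines. This is a simplified illustration under stated assumptions: the item dictionary shape, the restricted flag, and the two-entry cue list are hypothetical, and the intent-based rules (out-of-scope and structured-numeric) are omitted because they depend on an intent parser not shown here.

```python
import re
import unicodedata
from datetime import date
from typing import Dict, Optional

# Two cues quoted in the text; the real cue list is presumably longer.
REALTIME_CUES = ("price now", "股价")


def normalize_question(text: str) -> str:
    # NFKC + casefold + whitespace fold + trailing-punctuation strip.
    text = unicodedata.normalize("NFKC", text).casefold()
    text = re.sub(r"\s+", " ", text).strip()
    return text.rstrip("?!.,;:。 ")


def lookup_sketch(
    index: Dict[str, dict], question: str, *, reference_date: Optional[date] = None
) -> Optional[dict]:
    key = normalize_question(question)
    # Exclusions run first; any match is a miss.
    if any(cue in key for cue in REALTIME_CUES):
        return None
    item = index.get(key)
    if item is None or item.get("disabled") or item.get("restricted"):
        return None
    today = reference_date or date.today()
    if not (item["valid_from"] <= today <= item["valid_until"]):
        return None
    return {"item_id": item["id"], "answer": item["answer"], "cache_type": "faq"}


index = {
    normalize_question("How do I reset my password?"): {
        "id": "faq-1",
        "answer": "Use the reset link.",
        "valid_from": date(2024, 1, 1),
        "valid_until": date(2024, 12, 31),
    }
}
hit = lookup_sketch(index, "  How do I  RESET my password？ ", reference_date=date(2024, 6, 1))
expired = lookup_sketch(index, "How do I reset my password?", reference_date=date(2025, 1, 1))
```

Passing reference_date explicitly keeps the validity-window check deterministic, which is convenient in tests.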
Run it
```bash
RAGSPINE_DB_PATH=data/fact_metric.db \
RAGSPINE_CHUNK_DB_PATH=data/narrative.db \
.venv/bin/python scripts/run_server.py --host 0.0.0.0 --port 8000
```

```bash
# needs Redis; --queue defaults to "ragspine-ingest" (matches RQQueue)
RAGSPINE_REDIS_URL=redis://localhost:6379/0 .venv/bin/python scripts/run_worker.py
```

Both require the [service] extra (pip install -e ".[service]"). The server-side enqueue
queue name and the worker-side consume queue name share the same default
(ragspine-ingest), so they agree out of the box.
See also
Evaluation
The QA and extraction evaluation harnesses — four-gate QA metrics, per-channel extraction accuracy, and a baseline regression gate that ratchets up and never silently down.
Pipeline
Pipeline-topology export — derive a static PipelineGraph from RAGSpine's real wiring and render it as Mermaid, DOT, or JSON, via the agent / retriever / service builders and the topology.py CLI.