
API Reference

Auto-generated from source docstrings.

Core

layerclass

layerclass(cls)

Converts a plain class into a layered configuration object.

The decorated class gains full support for deterministic multi-source config merging: load values from files, environment variables, and remote stores; merge them in priority order; resolve ${variable} references; validate by category; and freeze the result for safe sharing across threads.

Methods added to the class

layer(other, rules=None): Merge another config on top. Nested @layerclass fields are recursively merged. rules is a {field_name: LayerRule} dict controlling merge strategy (OVERRIDE, PRESERVE, MERGE, APPEND).
validate(categories=None, fields=None): Run validation rules. Pass a list of category names to run only those; None runs bare (uncategorized) rules only; "*" runs everything.
resolve(): Resolve all ${field_name} interpolations in-place.
copy(): Deep copy the config instance.
to_dict(redact=False, by_alias=False): Export as a plain dict.
explain(full_history=False, redact=True): Structured info about current values, sources, and types; useful for debugging.
diff(other): Compare two configs; returns a list of changed fields.
freeze() / frozen: Prevent further mutation of field values.
json_schema(): Generate a JSON Schema dict from field definitions.
get(field, default=None): Dot-notation field access with fallback.
set(field, value, strict=False, source="set()"): Dot-notation setter with optional immediate single-field validation.

Class-level attributes added

_field_defs: {name: FieldDef} schema from field() declarations.
_sources: Per-instance {name: SourceHistory} tracking provenance.
_computed_fields: {name: fn} for @computed_field methods.

Example

@layerclass
class DatabaseConfig:
    host: str = field(str, default="localhost", description="DB host")
    port: int = field(int, default=5432, server=[require, is_port])
    url: str = field(str, default="${host}:${port}")

    @computed_field
    def dsn(self) -> str:
        return f"postgresql://{self.host}:{self.port}/mydb"
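The four LayerRule merge strategies described above can be illustrated with a standalone sketch that mirrors the per-field branch in layer() from the source listing. LayerRule and apply_rule here are illustrative reconstructions, not the library's own objects (the real layer() also accepts a plain callable as a rule):

```python
from enum import Enum

class LayerRule(Enum):
    OVERRIDE = "override"  # default: take the other value
    PRESERVE = "preserve"  # keep the base value
    MERGE = "merge"        # shallow dict merge, other wins on key conflicts
    APPEND = "append"      # list concatenation, base first

def apply_rule(rule, base, other):
    # Mirrors the branch order in layer(): PRESERVE keeps base, MERGE and
    # APPEND only apply to matching container types, everything else
    # (including mismatched types) falls through to OVERRIDE semantics.
    if rule is LayerRule.PRESERVE:
        return base
    if rule is LayerRule.MERGE and isinstance(base, dict) and isinstance(other, dict):
        return {**base, **other}
    if rule is LayerRule.APPEND and isinstance(base, list) and isinstance(other, list):
        return base + other
    return other

assert apply_rule(LayerRule.OVERRIDE, 1, 2) == 2
assert apply_rule(LayerRule.PRESERVE, 1, 2) == 1
assert apply_rule(LayerRule.MERGE, {"a": 1}, {"b": 2}) == {"a": 1, "b": 2}
assert apply_rule(LayerRule.APPEND, [1], [2]) == [1, 2]
```

Note that MERGE and APPEND silently degrade to OVERRIDE when the two values are not both dicts or both lists, matching the fallback in the source.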
Source code in src/layer/core.py
def layerclass(cls):
    """Converts a plain class into a layered configuration object.

    The decorated class gains full support for deterministic multi-source
    config merging: load values from files, environment variables, and remote
    stores; merge them in priority order; resolve ``${variable}`` references;
    validate by category; and freeze the result for safe sharing across threads.

    Methods added to the class:
        layer(other, rules=None): Merge another config on top. Nested
            ``@layerclass`` fields are recursively merged. ``rules`` is a
            ``{field_name: LayerRule}`` dict controlling merge strategy
            (OVERRIDE, PRESERVE, MERGE, APPEND).
        validate(categories=None, fields=None): Run validation rules.
            Pass a list of category names to run only those; ``None`` runs
            bare (uncategorized) rules only; ``"*"`` runs everything.
        resolve(): Resolve all ``${field_name}`` interpolations in-place.
        copy(): Deep copy the config instance.
        to_dict(redact=False, by_alias=False): Export as a plain dict.
        explain(full_history=False, redact=True): Structured info about
            current values, sources, and types—great for debugging.
        diff(other): Compare two configs; returns a list of changed fields.
        freeze() / frozen: Prevent further mutation of field values.
        json_schema(): Generate a JSON Schema dict from field definitions.
        get(field, default=None): Dot-notation field access with fallback.
        set(field, value, strict=False, source="set()"): Dot-notation setter
            with optional immediate single-field validation.

    Class-level attributes added:
        _field_defs: ``{name: FieldDef}`` schema from ``field()`` declarations.
        _sources: Per-instance ``{name: SourceHistory}`` tracking provenance.
        _computed_fields: ``{name: fn}`` for ``@computed_field`` methods.

    Example:
        @layerclass
        class DatabaseConfig:
            host: str = field(str, default="localhost", description="DB host")
            port: int = field(int, default=5432, server=[require, is_port])
            url: str = field(str, default="${host}:${port}")

            @computed_field
            def dsn(self) -> str:
                return f"postgresql://{self.host}:{self.port}/mydb"
    """

    # Single pass: harvest FieldDefs, @parser, @validator, @root_validator, @computed_field
    field_defs = {}
    parsers = {}  # field_name -> [fn, ...]
    method_validators = []  # [(field_names_tuple, categories_list, fn), ...]
    root_validators = []  # [(categories_list, fn), ...]
    computed_fields = {}  # attr_name -> fn

    for attr_name in list(cls.__dict__.keys()):
        attr_value = cls.__dict__[attr_name]
        if isinstance(attr_value, FieldDef):
            field_defs[attr_name] = attr_value
            delattr(cls, attr_name)
        elif callable(attr_value):
            if hasattr(attr_value, "_layer_computed"):
                computed_fields[attr_name] = attr_value
                # Replace with a property so it evaluates dynamically
                setattr(cls, attr_name, property(attr_value))
            elif hasattr(attr_value, "_layer_parser_fields"):
                for fname in attr_value._layer_parser_fields:
                    parsers.setdefault(fname, []).append(attr_value)
                delattr(cls, attr_name)
            elif hasattr(attr_value, "_layer_validator_fields"):
                method_validators.append(
                    (
                        attr_value._layer_validator_fields,
                        attr_value._layer_validator_categories,
                        attr_value,
                    )
                )
                delattr(cls, attr_name)
            elif hasattr(attr_value, "_layer_root_validator"):
                root_validators.append(
                    (
                        attr_value._layer_validator_categories,
                        attr_value,
                    )
                )
                delattr(cls, attr_name)

    class WrappedConfig(cls):
        # Class-level attributes — accessible without instantiation
        _field_defs = field_defs
        _is_layerclass_marker = True
        _is_layer_obj_marker = True  # backward compat
        _parsers = parsers
        _method_validators = method_validators
        _root_validators = root_validators
        _computed_fields = computed_fields

        def __init__(self, **kwargs):
            self._sources = defaultdict(SourceHistory)
            self._frozen = False

            # Initialize with defaults
            for name, fdef in self._field_defs.items():
                if _is_layer_obj(fdef.type_hint):
                    # Nested @layer_obj: create a default instance
                    setattr(
                        self,
                        name,
                        fdef.default.copy() if fdef.default is not None else fdef.type_hint(),
                    )
                else:
                    setattr(self, name, fdef.default)
                self._sources[name].push("default", fdef.default)

            # Apply kwargs (usually from solidify)
            for k, v in kwargs.items():
                if k in self._field_defs:
                    setattr(self, k, v)

        def __setattr__(self, name, value):
            # Allow internal attributes
            if name.startswith("_"):
                super().__setattr__(name, value)
                return
            # Guard computed fields
            if name in type(self)._computed_fields:
                raise AttributeError(f"Cannot set computed field '{name}'")
            # Check frozen
            if hasattr(self, "_frozen") and self._frozen and name in self._field_defs:
                raise AttributeError(f"Cannot modify '{name}': config is frozen")
            super().__setattr__(name, value)

        def layer(self, other: "WrappedConfig", rules=None):
            """Merge another config on top of this one.

            For nested @layer_obj fields, recursively layers the sub-config.
            """
            rules = rules or {}
            for name in self._field_defs:
                other_source = other._sources[name].current if name in other._sources else "default"

                # Skip fields that weren't explicitly set in 'other'
                if other_source == "default":
                    continue

                other_val = getattr(other, name, None)
                base_val = getattr(self, name, None)
                rule = rules.get(name, LayerRule.OVERRIDE)

                # Nested layer_obj: recurse
                fdef = self._field_defs[name]
                if (
                    _is_layer_obj(fdef.type_hint)
                    and _is_layer_obj(base_val)
                    and _is_layer_obj(other_val)
                ):
                    nested_rules = rules.get(name) if isinstance(rules.get(name), dict) else None
                    base_val.layer(other_val, rules=nested_rules)
                    # Merge source info: mark as the other source since it was touched
                    self._sources[name].push(other_source, getattr(self, name).copy())
                    continue

                if callable(rule) and not isinstance(rule, LayerRule):
                    setattr(self, name, rule(base_val, other_val))
                elif rule == LayerRule.PRESERVE:
                    continue
                elif rule == LayerRule.MERGE:
                    if isinstance(base_val, dict) and isinstance(other_val, dict):
                        setattr(self, name, {**base_val, **other_val})
                    else:
                        setattr(self, name, other_val)
                elif rule == LayerRule.APPEND:
                    if isinstance(base_val, list) and isinstance(other_val, list):
                        setattr(self, name, base_val + other_val)
                    else:
                        setattr(self, name, other_val)
                else:  # OVERRIDE (default)
                    setattr(self, name, other_val)

                self._sources[name].push(other_source, getattr(self, name))

                # TODO: changes with the source change

            return self

        def resolve(self):
            """Resolve all ${...} variable interpolations in-place."""
            resolve_all(self)
            return self

        def source_of(self, field_name: str) -> str:
            """Return the current (most recent) source for a field."""
            return self._sources[field_name].current

        def source_history_of(self, field_name: str) -> list:
            """Return full source history for a field."""
            return self._sources[field_name].entries

        def validate(self, categories=None, fields=None):
            """Run validation rules for specific categories and/or fields.

            Args:
                categories: List of category names, "*" or ["*"] for all, None for bare only.
                fields: Optional list of field names to limit validation to.

            Returns:
                ValidationResult with errors (if any).
            """
            errors = []

            # Normalize categories
            if categories == "*":
                categories = ["*"]
            check_all = categories == ["*"] if categories else False
            cats_to_check = set(categories or [])

            for name, fdef in self._field_defs.items():
                # Field filter
                if fields is not None and name not in fields:
                    continue

                val = getattr(self, name)

                # Nested: recurse validation
                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    nested_result = val.validate(categories, fields=None)
                    for err in nested_result.errors:
                        # Prefix the field name for clarity
                        err.field = f"{name}.{err.field}"
                        errors.append(err)
                    continue

                # Collect rules to run
                rules_to_run = [("bare", r) for r in fdef.categories.get("_bare", [])]
                for cat, cat_rules in fdef.categories.items():
                    if cat == "_bare":
                        continue
                    if check_all or cat in cats_to_check:
                        rules_to_run.extend([(cat, r) for r in cat_rules])  # type: ignore[arg-type]

                # Execute rules
                for cat_name, rule in rules_to_run:
                    try:
                        rule(val, name, self)
                    except ValidationError as e:
                        e.category = cat_name
                        errors.append(e)

            # Phase 2: @validator methods
            for field_names, validator_cats, fn in self._method_validators:
                should_run = (
                    not validator_cats or check_all or bool(cats_to_check & set(validator_cats))
                )
                if not should_run:
                    continue
                for fname in field_names:
                    if fields is not None and fname not in fields:
                        continue
                    try:
                        fn(self, fname, getattr(self, fname))
                    except ValidationError as e:
                        errors.append(e)

            # Phase 3: @root_validator methods
            from .exceptions import ConfigError as _ConfigError

            for validator_cats, fn in self._root_validators:
                should_run = (
                    not validator_cats or check_all or bool(cats_to_check & set(validator_cats))
                )
                if not should_run:
                    continue
                try:
                    fn(self)
                except ValidationError as e:
                    errors.append(e)
                except _ConfigError as e:
                    errors.append(
                        ValidationError("__root__", str(e), fn.__name__, "root_validator")
                    )

            return ValidationResult(errors)

        def explain(self, full_history=False, redact: bool = True):
            info = []
            for name, fdef in self._field_defs.items():
                val = getattr(self, name)

                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    nested_info = val.explain(full_history=full_history)
                    for item in nested_info:
                        item["field"] = f"{name}.{item['field']}"
                    info.extend(nested_info)
                    continue

                categories = [c for c in fdef.categories.keys() if c != "_bare"]
                entry = {
                    "field": name,
                    "value": _maybe_redact(val, fdef, redact),
                    "source": self._sources[name].current,
                    "type": fdef.type_hint.__name__
                    if hasattr(fdef.type_hint, "__name__")
                    else str(fdef.type_hint),
                    "default": fdef.default,
                    "description": fdef.description,
                    "categories": categories,
                }
                if full_history:
                    entry["history"] = [
                        {
                            "source": e.source,
                            "value": _maybe_redact(e.value, fdef, redact),
                        }
                        for e in self._sources[name].entries
                    ]
                info.append(entry)
            # Computed fields
            import typing as _typing

            for name, fn in type(self)._computed_fields.items():
                return_type = _typing.get_type_hints(fn).get("return", _typing.Any)
                info.append(
                    {
                        "field": name,
                        "value": getattr(self, name),
                        "source": "computed",
                        "type": getattr(return_type, "__name__", str(return_type)),
                        "default": None,
                        "description": fn.__doc__,
                    }
                )
            return info

        def get(self, field_name: str, default: Any = None) -> Any:
            """Get a field value by name, with optional fallback.

            Supports dot-notation for nested configs: config.get("database.host")
            """
            if "." in field_name:
                parts = field_name.split(".", 1)
                nested = getattr(self, parts[0], None)
                if nested is not None and _is_layer_obj(nested):
                    return nested.get(parts[1], default)
                return default

            if field_name in self._field_defs:
                return getattr(self, field_name, default)
            return default

        def set(
            self,
            field_name: str,
            value: Any,
            strict: bool = False,
            source: str = "set()",
        ) -> None:
            """Set a field value with optional single-field validation.

            Args:
                field_name: Name of the field (supports dot-notation for nested).
                value: The value to set.
                strict: If True, run this field's validation rules immediately after
                        setting. Raises ValidationError on failure.
                source: Source tag for tracking (default "set()").

            Raises:
                AttributeError: If field doesn't exist or config is frozen.
                ValidationError: If strict=True and validation fails.
            """
            # Dot-notation for nested
            if "." in field_name:
                parts = field_name.split(".", 1)
                nested = getattr(self, parts[0], None)
                if nested is not None and _is_layer_obj(nested):
                    nested.set(parts[1], value, strict=strict, source=source)
                    return
                raise AttributeError(
                    f"Cannot set '{field_name}': '{parts[0]}' is not a nested config"
                )

            if field_name not in self._field_defs:
                raise AttributeError(f"Unknown field: '{field_name}'")

            fdef = self._field_defs[field_name]

            # Apply before_coerce parsers
            for parse_fn in type(self)._parsers.get(field_name, []):
                if getattr(parse_fn, "_layer_parser_before_coerce", False):
                    value = parse_fn(self, value)

            # Type coerce if the value is a string and the target isn't
            if isinstance(value, str) and fdef.type_hint is not str:
                from .solidify import _coerce

                value = _coerce(value, fdef.type_hint, parser=fdef.parser)

            # Apply @parser methods (after coercion, before write)
            for parse_fn in type(self)._parsers.get(field_name, []):
                if not getattr(parse_fn, "_layer_parser_before_coerce", False):
                    value = parse_fn(self, value)

            setattr(self, field_name, value)
            self._sources[field_name].push(source, value)

            # Single-field validation if strict
            if strict:
                result = self.validate(categories="*", fields=[field_name])
                result.raise_if_invalid()

        def copy(self):
            """Deep copy the config object."""
            new = self.__class__()
            for name, fdef in self._field_defs.items():
                val = getattr(self, name)
                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    setattr(new, name, val.copy())
                else:
                    setattr(new, name, deepcopy(val))
            new._sources = {k: deepcopy(v) for k, v in self._sources.items()}
            return new

        def to_dict(self, redact=False, by_alias=False):
            """Export current values as a plain dict. Recursively converts nested @layer_obj.

            Args:
                redact: If True, replace secret field values with "***".
                by_alias: If True, use each field's alias as the output key (falls back
                    to the field name when no alias is defined).

            Design note:
            to_dict() defaults to redact=False because it's often used for serialization
            back to disk.
            explain() defaults to redact=True because it's primarily a debugging/logging
            tool. The caller can always override.
            """
            result = {}
            for name, fdef in self._field_defs.items():
                out_key = (fdef.alias or name) if by_alias else name
                val = getattr(self, name)
                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    result[out_key] = val.to_dict(redact=redact, by_alias=by_alias)
                else:
                    val = _maybe_redact(val, fdef, redact)
                    if dataclasses.is_dataclass(val) and not isinstance(val, type):
                        result[out_key] = dataclasses.asdict(val)
                    elif hasattr(val, "model_dump"):  # Pydantic v2
                        result[out_key] = val.model_dump()
                    else:
                        result[out_key] = val
            # Computed fields — always included, never redacted
            for name in type(self)._computed_fields:
                result[name] = getattr(self, name)
            return result

        def diff(self, other, redact=True):
            """Compare this config with another, returning a list of differences.

            Args:
                other: Another config instance of the same type.

            Returns:
                List of dicts with field, old_value, new_value, old_source, new_source
                for each field that differs.
            """
            diffs = []
            for name, fdef in self._field_defs.items():
                self_val = getattr(self, name)
                other_val = getattr(other, name, None)

                if (
                    _is_layer_obj(fdef.type_hint)
                    and _is_layer_obj(self_val)
                    and _is_layer_obj(other_val)
                ):
                    nested_diffs = self_val.diff(other_val)
                    for d in nested_diffs:
                        d["field"] = f"{name}.{d['field']}"
                    diffs.extend(nested_diffs)
                    continue

                if self_val != other_val:
                    diffs.append(
                        {
                            "field": name,
                            "old_value": _maybe_redact(self_val, fdef, redact),
                            "new_value": _maybe_redact(other_val, fdef, redact),
                            "old_source": self._sources[name].current
                            if name in self._sources
                            else "unknown",
                            "new_source": other._sources[name].current
                            if name in other._sources
                            else "unknown",
                        }
                    )
            return diffs

        def freeze(self):
            """Freeze the config, preventing further mutation of field values.

            After calling freeze(), any attempt to set a field value will raise
            AttributeError. Internal attributes (prefixed with _) are unaffected.
            """
            # Recursively freeze nested configs
            for name, fdef in self._field_defs.items():
                val = getattr(self, name)
                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    val.freeze()
            self._frozen = True

        def _unfreeze_deep(self):
            """Recursively unfreeze this config and all nested layer_obj children."""
            self._frozen = False
            for name, fdef in self._field_defs.items():
                val = getattr(self, name)
                if _is_layer_obj(fdef.type_hint) and _is_layer_obj(val):
                    val._unfreeze_deep()

        @property
        def frozen(self):
            """Whether this config is frozen."""
            return self._frozen

        @classmethod
        def json_schema(cls):
            """Generate a JSON Schema dict from the field definitions.

            Handles nested @layer_obj types recursively. Includes description,
            default, and enum constraints from one_of validators.

            Returns:
                A dict conforming to JSON Schema draft-07.
            """
            _TYPE_MAP = {
                str: "string",
                int: "integer",
                float: "number",
                bool: "boolean",
                list: "array",
                dict: "object",
            }

            properties = {}
            required = []

            for name, fdef in cls._field_defs.items():
                # Nested @layer_obj
                if _is_layer_obj(fdef.type_hint):
                    prop = fdef.type_hint.json_schema()
                else:
                    json_type = _TYPE_MAP.get(fdef.type_hint, "string")
                    prop = {"type": json_type}

                # Description
                if fdef.description:
                    prop["description"] = fdef.description

                # Default
                if fdef.default is not None:
                    prop["default"] = fdef.default

                # Extract enum from one_of validators
                for cat_rules in fdef.categories.values():
                    for rule in cat_rules:
                        # one_of returns a closure named _one_of with __closure__
                        # containing the values
                        if hasattr(rule, "__name__") and rule.__name__ == "_one_of":
                            if rule.__closure__ and len(rule.__closure__) > 0:
                                allowed = rule.__closure__[0].cell_contents
                                if isinstance(allowed, tuple):
                                    prop["enum"] = list(allowed)

                # Extract in_range for minimum/maximum
                for cat_rules in fdef.categories.values():
                    for rule in cat_rules:
                        if hasattr(rule, "__name__") and rule.__name__ == "_in_range":
                            if rule.__closure__ and len(rule.__closure__) >= 2:
                                cells = [c.cell_contents for c in rule.__closure__]
                                # Closure ordering is alphabetical (hi, lo) — use min/max
                                numeric = [c for c in cells if isinstance(c, (int, float))]
                                if len(numeric) >= 2:
                                    prop["minimum"] = min(numeric)
                                    prop["maximum"] = max(numeric)

                # Check if required in any category
                for cat_rules in fdef.categories.values():
                    for rule in cat_rules:
                        if hasattr(rule, "__name__") and rule.__name__ == "require":
                            if name not in required:
                                required.append(name)

                properties[name] = prop

            schema = {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "type": "object",
                "properties": properties,
            }
            if required:
                schema["required"] = required

            # Add title from class name
            schema["title"] = cls.__name__

            return schema

    WrappedConfig.__name__ = cls.__name__
    WrappedConfig.__qualname__ = cls.__qualname__
    return WrappedConfig
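The enum extraction in json_schema() works by inspecting the closure cells of validators named _one_of. The mechanism can be demonstrated standalone; this one_of factory is an illustrative reconstruction with the same closure shape, not the library's implementation:

```python
def one_of(*allowed):
    # The allowed values are captured in a closure cell of _one_of,
    # which is exactly what json_schema() reads via __closure__.
    def _one_of(value, name, cfg):
        if value not in allowed:
            raise ValueError(f"{name} must be one of {allowed}")
    return _one_of

rule = one_of("json", "yaml")

# json_schema() recognizes the rule by its function name...
assert rule.__name__ == "_one_of"

# ...and recovers the allowed values from the closure cells.
cells = [c.cell_contents for c in rule.__closure__]
assert ("json", "yaml") in cells
```

This is why the check is guarded by both the __name__ test and the tuple isinstance check: any closure cell that is not the expected tuple of allowed values is ignored.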

field

field(
    type_hint: type,
    *uncategorized_rules,
    default: Any = None,
    meta: dict[str, Any] = None,
    description: str = None,
    secret: bool = False,
    parser: Any = None,
    alias: str = None,
    aliases: list[str] = None,
    env: str = None,
    reloadable: bool = True,
    **category_rules,
) -> Any

Declares a configuration field with optional validation rules.

Parameters:

type_hint (type): The expected type of the field. Required.
*uncategorized_rules: Validators that always run (bare rules). Default: ().
default (Any): Default value for the field. Default: None.
meta (dict[str, Any]): Arbitrary metadata dict (e.g. {"cli_option": click.option(...)}). Default: None.
description (str): Human-readable description of the field. Default: None.
alias (str): Alternate name used when loading from dicts/JSON/YAML (e.g. "apiKey"). Default: None.
aliases (list[str]): Additional fallback names tried in order after alias. Default: None.
env (str): Explicit environment variable name; overrides the PREFIX_FIELD_NAME convention. Default: None.
**category_rules: Named categories mapping to lists of validators, e.g. cluster=[require], common=[one_of("json", "yaml")]. Default: {}.

Returns:

Any: A FieldDef instance (replaced by the default value after @layer_obj processes it).
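The rule-bucketing behavior of field() can be sketched standalone: bare (uncategorized) validators land under the reserved "_bare" key, while named keyword categories are stored as given. This mirrors the categories construction in the source below; bucket_rules and the placeholder validators are illustrative, not library names:

```python
def bucket_rules(*uncategorized_rules, **category_rules):
    # Same bucketing as field(): bare rules under "_bare",
    # named categories stored under their own keys.
    categories = {"_bare": list(uncategorized_rules)}
    categories.update(category_rules)
    return categories

# Placeholder validators with the (value, name, config) signature.
require = lambda value, name, cfg: None
is_port = lambda value, name, cfg: None

cats = bucket_rules(require, server=[require, is_port])
assert cats["_bare"] == [require]
assert cats["server"] == [require, is_port]
```

validate(categories=None) runs only the "_bare" bucket; passing category names (or "*") pulls in the named buckets as well.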

Source code in src/layer/core.py
def field(
    type_hint: type,
    *uncategorized_rules,
    default: Any = None,
    meta: dict[str, Any] = None,
    description: str = None,
    secret: bool = False,
    parser: Any = None,
    alias: str = None,
    aliases: list[str] = None,
    env: str = None,
    reloadable: bool = True,
    **category_rules,
) -> Any:
    """Declares a configuration field with optional validation rules.

    Args:
        type_hint: The expected type of the field.
        *uncategorized_rules: Validators that always run (bare rules).
        default: Default value for the field.
        meta: Arbitrary metadata dict (e.g. {"cli_option": click.option(...)}).
        description: Human-readable description of the field.
        alias: Alternate name used when loading from dicts/JSON/YAML (e.g. "apiKey").
        aliases: Additional fallback names tried in order after alias.
        env: Explicit environment variable name, overrides PREFIX_FIELD_NAME convention.
        **category_rules: Named categories mapping to lists of validators.
            e.g. cluster=[require], common=[one_of("json", "yaml")]

    Returns:
        A FieldDef instance (replaced by the default value after @layer_obj processes it).
    """
    categories = {"_bare": list(uncategorized_rules)}
    categories.update(category_rules)
    return FieldDef(
        type_hint,
        default,
        categories,
        meta,
        description,
        secret=secret,
        parser=parser,
        alias=alias,
        aliases=aliases,
        env=env,
        reloadable=reloadable,
    )

FieldDef

Metadata about a single configuration field.

Source code in src/layer/core.py
class FieldDef:
    """Metadata about a single configuration field."""

    def __init__(
        self,
        type_hint: type,
        default: Any = None,
        categories: dict[str, list] = None,
        meta: dict[str, Any] = None,
        description: str = None,
        secret: bool = False,
        parser: Any = None,
        alias: str = None,
        aliases: list[str] = None,
        env: str = None,
        reloadable: bool = True,
    ):
        self.type_hint = type_hint
        self.default = default
        self.categories = categories or {}
        self.meta = meta or {}
        self.description = description
        self.secret = secret
        self.parser = parser
        self.alias = alias
        self.aliases = aliases or []
        self.env = env
        self.reloadable = reloadable

computed_field

computed_field(fn)

Marks a method as a computed (read-only, dynamic) field.

The decorated method is exposed as a property evaluated on each access and is automatically included in to_dict() and explain(). Attempting to assign a value to a computed field raises AttributeError.

Parameters:

fn (required)
    The method to promote to a computed field. Must accept only self and should include a return-type annotation so explain() can report the type.

Returns:

The same callable with _layer_computed = True set; the @layerclass decorator later replaces it with a property.

Example

@layerclass
class ServiceConfig:
    timeout_base: int = field(int, default=10)
    retry_count: int = field(int, default=3)

@computed_field
def total_timeout(self) -> int:
    """Total max wait across all retries."""
    return self.timeout_base * self.retry_count
Source code in src/layer/core.py
def computed_field(fn):
    """Marks a method as a computed (read-only, dynamic) field.

    The decorated method is exposed as a property evaluated on each access and
    is automatically included in ``to_dict()`` and ``explain()``. Attempting to
    assign a value to a computed field raises ``AttributeError``.

    Args:
        fn: The method to promote to a computed field. Must accept only ``self``
            and should include a return-type annotation so ``explain()`` can
            report the type.

    Returns:
        The same callable with ``_layer_computed = True`` set; the
        ``@layerclass`` decorator later replaces it with a ``property``.

    Example:
        @layerclass
        class ServiceConfig:
            timeout_base: int = field(int, default=10)
            retry_count: int = field(int, default=3)

            @computed_field
            def total_timeout(self) -> int:
                \"\"\"Total max wait across all retries.\"\"\"
                return self.timeout_base * self.retry_count
    """
    fn._layer_computed = True
    return fn

parser

parser(*field_names, before_coerce=False)

Marks a method as a data parser for the specified field(s).

By default, the method is called after type coercion but before the value is written to the field. If before_coerce=True is provided, it runs before the value is coerced by the type resolution engine.

The method receives the current value and must return the transformed value. It runs during solidify(), solidify_env(), and set().

Usage

@parser("endpoint")
def _clean_endpoint(self, value: str) -> str:
    return value.strip().rstrip("/")

@parser("status", before_coerce=True)
def _parse_status(self, value: Any) -> str:
    if isinstance(value, dict) and "status" in value:
        return value["status"]
    return value

Source code in src/layer/core.py
def parser(*field_names, before_coerce=False):
    """Marks a method as a data parser for the specified field(s).

    By default, the method is called after type coercion but before the value is
    written to the field. If `before_coerce=True` is provided, it runs before
    the value is coerced by the type resolution engine.

    The method receives the current value and must return the transformed value.
    It runs during solidify(), solidify_env(), and set().

    Usage:
        @parser("endpoint")
        def _clean_endpoint(self, value: str) -> str:
            return value.strip().rstrip("/")

        @parser("status", before_coerce=True)
        def _parse_status(self, value: Any) -> str:
            if isinstance(value, dict) and "status" in value:
                return value["status"]
            return value
    """

    def decorator(fn):
        fn._layer_parser_fields = field_names
        fn._layer_parser_before_coerce = before_coerce
        return fn

    return decorator

validator

validator(*field_names, categories=None)

Marks a method as a stateful validator for the specified field(s).

Called once per listed field during validate(). The method receives (self, field_name, value) and should raise ValidationError if invalid. If categories is omitted the validator runs on every validate() call (bare).

Usage

@validator("cert_path", "key_path")
def _files_exist(self, field_name, value):
    if value and not os.path.exists(value):
        raise ValidationError(field_name, "File not found", "path_check", "bare")

@validator("cert_path", categories=["production"])
def _certs_match(self, field_name, value):
    ...

Source code in src/layer/core.py
def validator(*field_names, categories=None):
    """Marks a method as a stateful validator for the specified field(s).

    Called once per listed field during validate(). The method receives
    (self, field_name, value) and should raise ValidationError if invalid.
    If categories is omitted the validator runs on every validate() call (bare).

    Usage:
        @validator("cert_path", "key_path")
        def _files_exist(self, field_name, value):
            if value and not os.path.exists(value):
                raise ValidationError(field_name, "File not found", "path_check", "bare")

        @validator("cert_path", categories=["production"])
        def _certs_match(self, field_name, value):
            ...
    """

    def decorator(fn):
        fn._layer_validator_fields = field_names
        fn._layer_validator_categories = list(categories or [])
        return fn

    return decorator

root_validator

root_validator(categories=None)

Marks a method as a cross-field (root) validator.

Called at the end of validate() with no arguments besides self. Should raise ConfigError (or ValidationError) if the overall state is invalid. If categories is omitted the validator runs on every validate() call.

Usage

@root_validator(categories=["database"])
def _check_connection(self):
    if self.dsn and self.host:
        raise ConfigError("Cannot specify both 'dsn' and 'host'.")

Source code in src/layer/core.py
def root_validator(categories=None):
    """Marks a method as a cross-field (root) validator.

    Called at the end of validate() with no arguments besides self. Should
    raise ConfigError (or ValidationError) if the overall state is invalid.
    If categories is omitted the validator runs on every validate() call.

    Usage:
        @root_validator(categories=["database"])
        def _check_connection(self):
            if self.dsn and self.host:
                raise ConfigError("Cannot specify both 'dsn' and 'host'.")
    """

    def decorator(fn):
        fn._layer_root_validator = True
        fn._layer_validator_categories = list(categories or [])
        return fn

    return decorator

Pipeline

ConfigPipeline

Orchestrates multiple providers into a layered config instance.

Follows a strict separation of concerns:

  1. Load — load() ingests all providers, merges overlays (with optional LayerRule per provider), resolves ${variable} references, and freezes the live config. It never runs validation.
  2. Validate — call pipeline.validate(categories) explicitly after loading.
  3. Hot-reload — providers with watch=True trigger _reload() automatically when their source changes.
Example

pipeline = (
    ConfigPipeline(AppConfig, mode=SolidifyMode.STRICT)
    .add_provider(FileProvider("base.yml"))
    .add_provider(EnvProvider("APP"), rules={"ports": LayerRule.APPEND})
)
config = pipeline.load()
pipeline.validate(["server"]).raise_if_invalid()

Source code in src/layer/pipeline.py
class ConfigPipeline:
    """Orchestrates multiple providers into a layered config instance.

    Follows a strict separation of concerns:

    1. **Load** — ``load()`` ingests all providers, merges overlays (with optional
       ``LayerRule`` per provider), resolves ``${variable}`` references, and freezes
       the live config. It never runs validation.
    2. **Validate** — call ``pipeline.validate(categories)`` explicitly after loading.
    3. **Hot-reload** — providers with ``watch=True`` trigger ``_reload()`` automatically
       when their source changes.

    Example:
        pipeline = (
            ConfigPipeline(AppConfig, mode=SolidifyMode.STRICT)
            .add_provider(FileProvider("base.yml"))
            .add_provider(EnvProvider("APP"), rules={"ports": LayerRule.APPEND})
        )
        config = pipeline.load()
        pipeline.validate(["server"]).raise_if_invalid()
    """

    def __init__(
        self,
        target: Any,
        mode=None,
        observer=None,
        logger: logging.Logger = None,
    ):
        """Initialize the pipeline.

        Args:
            target: A ``@layerclass`` class or instance. If a class is given,
                a fresh instance is created as the live config.
            mode: Optional ``SolidifyMode`` applied to all ``solidify()`` calls
                inside ``load()`` and ``_build_shadow()``. Defaults to ``None``
                (legacy LAX-like behavior).
            observer: Optional ``BasePipelineObserver`` instance for lifecycle
                hooks. If ``None`` and ``logger`` is also ``None``, no events
                are emitted.
            logger: Optional ``logging.Logger``. When provided, a
                ``LoggerObserver`` is automatically created and used.
        """
        if isinstance(target, type):
            self._target_cls = target
            self._live = target()
        else:
            self._target_cls = type(target)
            self._live = target

        self._mode = mode
        self._providers: list = []
        self._reactors: dict[str, list[Callable]] = {}
        self._mutator: Callable | None = None
        self._lock = threading.Lock()
        self._watcher = None
        self._loaded = False

        if logger is not None:
            from .observers import LoggerObserver

            self._observer = LoggerObserver(logger)
        else:
            self._observer = observer  # May be None or a BasePipelineObserver

    @property
    def config(self) -> Any:
        """The live config instance."""
        return self._live

    def add_provider(self, provider, rules: dict = None) -> "ConfigPipeline":
        """Add a provider to the pipeline, optionally with per-field layering rules.

        Providers are applied in order during ``load()``. Later providers override
        values from earlier ones, subject to any ``LayerRule`` overrides.

        Args:
            provider: A ``BaseProvider`` instance (``FileProvider``, ``EnvProvider``,
                ``SSMProvider``, etc.).
            rules: Optional ``{field_name: LayerRule}`` dict controlling how each
                field from this provider is merged. Supports dot-notation for nested
                fields (e.g. ``{"database.ports": LayerRule.APPEND}``). Fields not
                listed use ``LayerRule.OVERRIDE`` (default).

        Returns:
            ``self`` for fluent chaining.

        Example:
            pipeline.add_provider(EnvProvider("APP"), rules={"ports": LayerRule.APPEND})
        """
        if hasattr(provider, "bind_schema"):
            provider.bind_schema(self._target_cls)
        self._providers.append((provider, rules or {}))
        return self

    def on_change(self, field_path: str, callback: Callable) -> "ConfigPipeline":
        """Register a callback for field changes during hot-reload.

        Args:
            field_path: Dot-separated field path (e.g. ``"database.host"``),
                or ``"*"`` to override the default mutator for all changes.
            callback: Called with ``(field, old_value, new_value, shadow_config)``.

        Returns:
            ``self`` for fluent chaining.
        """
        if field_path == "*":
            self._mutator = callback
        else:
            self._reactors.setdefault(field_path, []).append(callback)
        return self

    def load(self) -> Any:
        """Execute all providers in order, merging results onto the live config.

        The pipeline performs four operations in sequence:

        1. Read each provider and coerce its data via ``solidify()``.
        2. Layer each overlay onto the live config using the provider's rules.
        3. Resolve all ``${variable}`` interpolations.
        4. Freeze the live config to prevent accidental mutation.

        No validation is performed. Call ``pipeline.validate()`` separately.

        Returns:
            The frozen live config instance.
        """
        from .solidify import solidify

        for provider, rules in self._providers:
            data = provider.read()
            if self._observer:
                self._observer.on_provider_read(provider.source_name, data)
            if not data:
                continue
            overlay = solidify(data, self._target_cls, source=provider.source_name, mode=self._mode)
            self._live.layer(overlay, rules=rules)
            if self._observer:
                self._observer.on_layer_merged(provider.source_name, rules)

        self._live.resolve()
        self._live.freeze()
        self._loaded = True
        return self._live

    def validate(self, categories=None):
        """Run validation on the live config.

        This is the correct place to trigger validation — never inside ``load()``.

        Args:
            categories: Passed directly to ``config.validate(categories)``.
                ``None`` runs bare (uncategorized) rules only; ``"*"`` or
                ``["*"]`` runs all categories.

        Returns:
            A ``ValidationResult`` with ``.errors`` and ``.raise_if_invalid()``.
        """
        return self._live.validate(categories)

    def _build_shadow(self) -> Any:
        """Build a fresh config by re-running all providers.

        Does not touch the live config. Used internally by ``_reload()``.
        """
        from .solidify import solidify

        shadow = self._target_cls()
        for provider, rules in self._providers:
            data = provider.read()
            if not data:
                continue
            overlay = solidify(data, self._target_cls, source=provider.source_name, mode=self._mode)
            shadow.layer(overlay, rules=rules)
        shadow.resolve()
        shadow.freeze()
        return shadow

    def _reload(self):
        """Hot-reload: build shadow, diff, filter locked fields, fire reactors, apply mutator."""
        shadow = self._build_shadow()
        diffs = self._live.diff(shadow, redact=False)
        if not diffs:
            return

        if self._observer:
            self._observer.on_hot_reload_triggered(diffs)

        # Filter out non-reloadable fields
        filtered = []
        for d in diffs:
            fdef = _get_fdef_by_path(self._target_cls, d["field"])
            if fdef is not None and not fdef.reloadable:
                logging.warning("layer: Skipped hot-reload for locked field '%s'", d["field"])
                if self._observer:
                    self._observer.on_hot_reload_locked(d["field"])
                continue
            filtered.append(d)
        diffs = filtered

        if not diffs:
            return

        # Phase 1: Fire specific reactors
        for d in diffs:
            field_path = d["field"]
            for callback in self._reactors.get(field_path, []):
                callback(field_path, d["old_value"], d["new_value"], shadow)

        # Phase 2: Apply mutation
        if self._mutator:
            for d in diffs:
                self._mutator(d["field"], d["old_value"], d["new_value"], shadow)
        else:
            self._default_mutator(diffs, shadow)

    def _default_mutator(self, diffs, shadow):
        """Thread-safe default mutator: unfreeze, apply all diffs, freeze."""
        with self._lock:
            self._live._unfreeze_deep()
            for d in diffs:
                self._live.set(d["field"], d["new_value"], source="hot-reload")
            self._live.freeze()

    def start(self):
        """Start watching for changes from watchable providers.

        Requires watchdog: ``pip install layer[watch]``
        """
        watchable = [p for p, _ in self._providers if p.watchable]
        if not watchable:
            return self

        try:
            from watchdog.events import FileSystemEventHandler
            from watchdog.observers import Observer
        except ImportError:
            from .exceptions import MissingDependencyError

            raise MissingDependencyError(
                "watchdog is required for hot-reloading: pip install layer[watch]"
            )

        pipeline = self

        class _ReloadHandler(FileSystemEventHandler):
            def on_modified(self, event):
                if not event.is_directory:
                    pipeline._reload()

        self._watcher = Observer()
        for provider in watchable:
            import os

            watch_dir = os.path.dirname(os.path.abspath(provider._path))
            self._watcher.schedule(_ReloadHandler(), watch_dir, recursive=False)
        self._watcher.daemon = True
        self._watcher.start()
        return self

    def stop(self):
        """Stop the file watcher."""
        if self._watcher:
            self._watcher.stop()
            self._watcher.join()
            self._watcher = None

config property

config: Any

The live config instance.

__init__

__init__(
    target: Any,
    mode=None,
    observer=None,
    logger: Logger = None,
)

Initialize the pipeline.

Parameters:

target (Any, required)
    A @layerclass class or instance. If a class is given, a fresh instance is created as the live config.
mode (default=None)
    Optional SolidifyMode applied to all solidify() calls inside load() and _build_shadow(). Defaults to None (legacy LAX-like behavior).
observer (default=None)
    Optional BasePipelineObserver instance for lifecycle hooks. If None and logger is also None, no events are emitted.
logger (Logger, default=None)
    Optional logging.Logger. When provided, a LoggerObserver is automatically created and used.
Source code in src/layer/pipeline.py
def __init__(
    self,
    target: Any,
    mode=None,
    observer=None,
    logger: logging.Logger = None,
):
    """Initialize the pipeline.

    Args:
        target: A ``@layerclass`` class or instance. If a class is given,
            a fresh instance is created as the live config.
        mode: Optional ``SolidifyMode`` applied to all ``solidify()`` calls
            inside ``load()`` and ``_build_shadow()``. Defaults to ``None``
            (legacy LAX-like behavior).
        observer: Optional ``BasePipelineObserver`` instance for lifecycle
            hooks. If ``None`` and ``logger`` is also ``None``, no events
            are emitted.
        logger: Optional ``logging.Logger``. When provided, a
            ``LoggerObserver`` is automatically created and used.
    """
    if isinstance(target, type):
        self._target_cls = target
        self._live = target()
    else:
        self._target_cls = type(target)
        self._live = target

    self._mode = mode
    self._providers: list = []
    self._reactors: dict[str, list[Callable]] = {}
    self._mutator: Callable | None = None
    self._lock = threading.Lock()
    self._watcher = None
    self._loaded = False

    if logger is not None:
        from .observers import LoggerObserver

        self._observer = LoggerObserver(logger)
    else:
        self._observer = observer  # May be None or a BasePipelineObserver

add_provider

add_provider(
    provider, rules: dict = None
) -> ConfigPipeline

Add a provider to the pipeline, optionally with per-field layering rules.

Providers are applied in order during load(). Later providers override values from earlier ones, subject to any LayerRule overrides.

Parameters:

provider (required)
    A BaseProvider instance (FileProvider, EnvProvider, SSMProvider, etc.).
rules (dict, default=None)
    Optional {field_name: LayerRule} dict controlling how each field from this provider is merged. Supports dot-notation for nested fields (e.g. {"database.ports": LayerRule.APPEND}). Fields not listed use LayerRule.OVERRIDE (default).

Returns:

ConfigPipeline — self for fluent chaining.

Example

pipeline.add_provider(EnvProvider("APP"), rules={"ports": LayerRule.APPEND})

Source code in src/layer/pipeline.py
def add_provider(self, provider, rules: dict = None) -> "ConfigPipeline":
    """Add a provider to the pipeline, optionally with per-field layering rules.

    Providers are applied in order during ``load()``. Later providers override
    values from earlier ones, subject to any ``LayerRule`` overrides.

    Args:
        provider: A ``BaseProvider`` instance (``FileProvider``, ``EnvProvider``,
            ``SSMProvider``, etc.).
        rules: Optional ``{field_name: LayerRule}`` dict controlling how each
            field from this provider is merged. Supports dot-notation for nested
            fields (e.g. ``{"database.ports": LayerRule.APPEND}``). Fields not
            listed use ``LayerRule.OVERRIDE`` (default).

    Returns:
        ``self`` for fluent chaining.

    Example:
        pipeline.add_provider(EnvProvider("APP"), rules={"ports": LayerRule.APPEND})
    """
    if hasattr(provider, "bind_schema"):
        provider.bind_schema(self._target_cls)
    self._providers.append((provider, rules or {}))
    return self

on_change

on_change(
    field_path: str, callback: Callable
) -> ConfigPipeline

Register a callback for field changes during hot-reload.

Parameters:

field_path (str, required)
    Dot-separated field path (e.g. "database.host"), or "*" to override the default mutator for all changes.
callback (Callable, required)
    Called with (field, old_value, new_value, shadow_config).

Returns:

ConfigPipeline — self for fluent chaining.
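A sketch of registering a reactor; the callback body is illustrative (e.g. recycling a connection pool when the database host changes):

```python
def on_db_host_change(field, old_value, new_value, shadow_config):
    # shadow_config is the fully rebuilt, frozen config the change came from;
    # inspect it before or instead of acting on the live config.
    print(f"{field}: {old_value!r} -> {new_value!r}")

pipeline.on_change("database.host", on_db_host_change)
```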

Source code in src/layer/pipeline.py
def on_change(self, field_path: str, callback: Callable) -> "ConfigPipeline":
    """Register a callback for field changes during hot-reload.

    Args:
        field_path: Dot-separated field path (e.g. ``"database.host"``),
            or ``"*"`` to override the default mutator for all changes.
        callback: Called with ``(field, old_value, new_value, shadow_config)``.

    Returns:
        ``self`` for fluent chaining.
    """
    if field_path == "*":
        self._mutator = callback
    else:
        self._reactors.setdefault(field_path, []).append(callback)
    return self

load

load() -> Any

Execute all providers in order, merging results onto the live config.

The pipeline performs four operations in sequence:

  1. Read each provider and coerce its data via solidify().
  2. Layer each overlay onto the live config using the provider's rules.
  3. Resolve all ${variable} interpolations.
  4. Freeze the live config to prevent accidental mutation.

No validation is performed. Call pipeline.validate() separately.

Returns:

Any — The frozen live config instance.

Source code in src/layer/pipeline.py
def load(self) -> Any:
    """Execute all providers in order, merging results onto the live config.

    The pipeline performs four operations in sequence:

    1. Read each provider and coerce its data via ``solidify()``.
    2. Layer each overlay onto the live config using the provider's rules.
    3. Resolve all ``${variable}`` interpolations.
    4. Freeze the live config to prevent accidental mutation.

    No validation is performed. Call ``pipeline.validate()`` separately.

    Returns:
        The frozen live config instance.
    """
    from .solidify import solidify

    for provider, rules in self._providers:
        data = provider.read()
        if self._observer:
            self._observer.on_provider_read(provider.source_name, data)
        if not data:
            continue
        overlay = solidify(data, self._target_cls, source=provider.source_name, mode=self._mode)
        self._live.layer(overlay, rules=rules)
        if self._observer:
            self._observer.on_layer_merged(provider.source_name, rules)

    self._live.resolve()
    self._live.freeze()
    self._loaded = True
    return self._live

validate

validate(categories=None)

Run validation on the live config.

This is the correct place to trigger validation — never inside load().

Parameters:

categories (default=None)
    Passed directly to config.validate(categories). None runs bare (uncategorized) rules only; "*" or ["*"] runs all categories.

Returns:

A ValidationResult with .errors and .raise_if_invalid().
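A short sketch of the categories argument in practice (category names are illustrative):

```python
# Bare (uncategorized) rules only
pipeline.validate()

# Only the listed categories, inspecting errors manually
result = pipeline.validate(["server", "database"])
for err in result.errors:
    log.warning("config problem: %s", err)

# Everything, failing fast
pipeline.validate("*").raise_if_invalid()
```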

Source code in src/layer/pipeline.py
def validate(self, categories=None):
    """Run validation on the live config.

    This is the correct place to trigger validation — never inside ``load()``.

    Args:
        categories: Passed directly to ``config.validate(categories)``.
            ``None`` runs bare (uncategorized) rules only; ``"*"`` or
            ``["*"]`` runs all categories.

    Returns:
        A ``ValidationResult`` with ``.errors`` and ``.raise_if_invalid()``.
    """
    return self._live.validate(categories)

start

start()

Start watching for changes from watchable providers.

Requires watchdog: pip install layer[watch]
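A typical lifecycle sketch; serve_forever() stands in for your application's main loop:

```python
pipeline = ConfigPipeline(AppConfig).add_provider(FileProvider("app.yml"))
pipeline.load()
pipeline.start()   # begins watching app.yml's directory for modifications
try:
    serve_forever()
finally:
    pipeline.stop()  # stops and joins the watcher thread
```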

Source code in src/layer/pipeline.py
def start(self):
    """Start watching for changes from watchable providers.

    Requires watchdog: ``pip install layer[watch]``
    """
    watchable = [p for p, _ in self._providers if p.watchable]
    if not watchable:
        return self

    try:
        from watchdog.events import FileSystemEventHandler
        from watchdog.observers import Observer
    except ImportError:
        from .exceptions import MissingDependencyError

        raise MissingDependencyError(
            "watchdog is required for hot-reloading: pip install layer[watch]"
        )

    pipeline = self

    class _ReloadHandler(FileSystemEventHandler):
        def on_modified(self, event):
            if not event.is_directory:
                pipeline._reload()

    self._watcher = Observer()
    for provider in watchable:
        import os

        watch_dir = os.path.dirname(os.path.abspath(provider._path))
        self._watcher.schedule(_ReloadHandler(), watch_dir, recursive=False)
    self._watcher.daemon = True
    self._watcher.start()
    return self

stop

stop()

Stop the file watcher.

Source code in src/layer/pipeline.py
def stop(self):
    """Stop the file watcher."""
    if self._watcher:
        self._watcher.stop()
        self._watcher.join()
        self._watcher = None

Solidification

solidify

solidify(
    data: dict[str, Any],
    target: type,
    source: str = "unknown",
    check: list[str] | None = None,
    strict: bool = False,
    coerce: bool = True,
    mode: Optional[SolidifyMode] = None,
)

Converts loose data (dict) into a typed config instance.

Parameters:

data (dict[str, Any], required)
    Input data dict.
target (type, required)
    Target @layerclass class.
source (str, default='unknown')
    Source tag for tracking (e.g. "config.yml", "cli").
check (list[str] | None, default=None)
    If provided, validate these categories immediately after loading.
strict (bool, default=False)
    If True, raise StructureError on unknown keys. (Legacy; prefer mode=.)
coerce (bool, default=True)
    If True, attempt type coercion based on field type hints. (Legacy; prefer mode=.)
mode (Optional[SolidifyMode], default=None)
    SolidifyMode controlling strictness; overrides strict/coerce when set.
    LAX — unknown keys ignored, coercion errors swallowed.
    STANDARD — unknown keys ignored, CoercionError bubbles.
    STRICT — unknown keys raise StructureError, no coercion.

Returns:

An instance of target with values set from data.
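A sketch using the DatabaseConfig class from the layerclass example above:

```python
raw = {"host": "db.internal", "port": "5432", "extra-key": True}

# STANDARD: "5432" is coerced to int 5432; "extra-key" matches no field
# and is silently ignored (any coercion failure would raise CoercionError).
cfg = solidify(raw, DatabaseConfig, source="config.yml",
               mode=SolidifyMode.STANDARD)

# STRICT: the unknown "extra-key" raises StructureError instead.
cfg = solidify(raw, DatabaseConfig, source="config.yml",
               mode=SolidifyMode.STRICT)
```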

Source code in src/layer/solidify.py
def solidify(
    data: dict[str, Any],
    target: type,
    source: str = "unknown",
    check: list[str] | None = None,
    strict: bool = False,
    coerce: bool = True,
    mode: Optional["SolidifyMode"] = None,
):
    """Converts loose data (dict) into a typed config instance.

    Args:
        data: Input data dict.
        target: Target @layer_obj class.
        source: Source tag for tracking (e.g. "config.yml", "cli").
        check: If provided, validate these categories immediately after loading.
        strict: If True, raise StructureError on unknown keys. (Legacy; prefer mode=)
        coerce: If True, attempt type coercion based on field type hints. (Legacy; prefer mode=)
        mode: SolidifyMode controlling strictness. Overrides strict/coerce when set.
            LAX — unknown keys ignored, coercion errors swallowed.
            STANDARD — unknown keys ignored, CoercionError bubbles.
            STRICT — unknown keys raise StructureError, no coercion.

    Returns:
        An instance of target with values set from data.
    """
    # mode takes precedence over legacy strict/coerce kwargs
    if mode is not None:
        strict = mode == SolidifyMode.STRICT
        coerce = mode != SolidifyMode.STRICT

    instance = target()

    # Pre-compute reverse alias map: alias_string -> canonical field name
    alias_map: dict[str, str] = {}
    for field_name, fdef in instance._field_defs.items():
        if fdef.alias:
            alias_map[fdef.alias] = field_name
        for a in fdef.aliases:
            alias_map[a] = field_name

    for key, value in data.items():
        # Resolve canonical field name: try exact match, then alias map, then
        # kebab/case normalization
        normalized_key = key.replace("-", "_").lower()
        if normalized_key in instance._field_defs:
            field_name = normalized_key
        elif key in alias_map:
            field_name = alias_map[key]
        elif normalized_key in alias_map:
            field_name = alias_map[normalized_key]
        elif strict:
            raise StructureError(f"Unknown key '{key}' found in source '{source}'")
        else:
            continue

        fdef = instance._field_defs[field_name]

        # Nested @layer_obj: recursively solidify if value is a dict
        if _is_layer_obj_type(fdef.type_hint) and isinstance(value, dict):
            nested = solidify(value, fdef.type_hint, source=source, strict=strict, coerce=coerce, mode=mode)
            setattr(instance, field_name, nested)
            instance._sources[field_name].push(source, nested)
        else:
            # Apply before_coerce parsers
            for parse_fn in type(instance)._parsers.get(field_name, []):
                if getattr(parse_fn, "_layer_parser_before_coerce", False):
                    value = parse_fn(instance, value)

            # Coerce if requested
            if coerce and fdef.type_hint is not None:
                try:
                    value = _coerce(value, fdef.type_hint, parser=fdef.parser)
                except (ValueError, TypeError, CoercionError):
                    if mode == SolidifyMode.STANDARD:
                        raise  # STANDARD: let coercion errors bubble
                    # LAX or legacy (mode=None): swallow and leave as-is

            # Apply @parser methods (after coercion, before write)
            for parse_fn in type(instance)._parsers.get(field_name, []):
                if not getattr(parse_fn, "_layer_parser_before_coerce", False):
                    value = parse_fn(instance, value)

            setattr(instance, field_name, value)
            instance._sources[field_name].push(source, value)

    if check:
        instance.validate(check).raise_if_invalid()

    return instance

solidify_file

solidify_file(
    path: str,
    target: type,
    source: str | None = None,
    check: list[str] | None = None,
    strict: bool = False,
    coerce: bool = True,
    mode: Optional[SolidifyMode] = None,
)

Load a config file (YAML, JSON, or TOML) and solidify it into a typed config.

Detects format from file extension. Requires the corresponding library to be installed (PyYAML for .yml/.yaml, tomllib/tomli for .toml).

Parameters:

path (str, required): Path to the config file.
target (type, required): Target @layer_obj class.
source (str, default None): Source tag. Defaults to the file path.
check (list[str] | None, default None): Categories to validate after loading.
strict (bool, default False): Raise on unknown keys. (Legacy; prefer mode=)
coerce (bool, default True): Attempt type coercion. (Legacy; prefer mode=)
mode (Optional[SolidifyMode], default None): SolidifyMode controlling strictness. Overrides strict/coerce when set.

Returns:

An instance of target.

Raises:

FileNotFoundError: If path doesn't exist.
StructureError: If file format is unsupported or parsing fails.

Source code in src/layer/solidify.py
def solidify_file(
    path: str,
    target: type,
    source: str | None = None,
    check: list[str] | None = None,
    strict: bool = False,
    coerce: bool = True,
    mode: Optional["SolidifyMode"] = None,
):
    """Load a config file (YAML, JSON, or TOML) and solidify it into a typed config.

    Detects format from file extension. Requires the corresponding library
    to be installed (PyYAML for .yml/.yaml, tomllib/tomli for .toml).

    Args:
        path: Path to the config file.
        target: Target @layer_obj class.
        source: Source tag. Defaults to the file path.
        check: Categories to validate after loading.
        strict: Raise on unknown keys. (Legacy; prefer mode=)
        coerce: Attempt type coercion. (Legacy; prefer mode=)
        mode: SolidifyMode controlling strictness. Overrides strict/coerce when set.

    Returns:
        An instance of target.

    Raises:
        FileNotFoundError: If path doesn't exist.
        StructureError: If file format is unsupported or parsing fails.
    """
    if source is None:
        source = str(path)

    data = _read_file(path)

    return solidify(
        data,
        target,
        source=source,
        check=check,
        strict=strict,
        coerce=coerce,
        mode=mode,
    )

solidify_env

solidify_env(
    prefix: str,
    target: type,
    key_map: dict[str, Any] | None = None,
    separator: str = "_",
)

Loads configuration from environment variables.

Parameters:

prefix (str, required): Env var prefix (e.g. "AK" -> reads AK_ENDPOINT, AK_DEBUG, etc.)
target (type, required): Target @layer_obj class.
key_map (dict[str, Any] | None, default None): Optional dict mapping field names to custom env var names. Values can be a string (single env var) or list of strings (fallback chain).
separator (str, default "_"): Separator between prefix and field name.

Returns:

An instance of target with values set from environment variables.

Source code in src/layer/solidify.py
def solidify_env(
    prefix: str,
    target: type,
    key_map: dict[str, Any] | None = None,
    separator: str = "_",
):
    """Loads configuration from environment variables.

    Args:
        prefix: Env var prefix (e.g. "AK" -> reads AK_ENDPOINT, AK_DEBUG, etc.)
        target: Target @layer_obj class.
        key_map: Optional dict mapping field names to custom env var names.
            Values can be a string (single env var) or list of strings (fallback chain).
        separator: Separator between prefix and field name (default "_").

    Returns:
        An instance of target with values set from environment variables.
    """
    instance = target()
    key_map = key_map or {}

    for name, fdef in instance._field_defs.items():
        # Nested @layer_obj: use a sub-prefix
        if _is_layer_obj_type(fdef.type_hint):
            sub_prefix = f"{prefix.upper()}{separator}{name.upper()}"
            nested = solidify_env(sub_prefix, fdef.type_hint, separator=separator)
            # Only layer if any field was actually set from env
            has_env_values = any(s.current != "default" for s in nested._sources.values())
            if has_env_values:
                current = getattr(instance, name)
                current.layer(nested)
                instance._sources[name].push(f"env:{sub_prefix}_*", current)
            continue

        # Determine env var name(s) to check, in priority order:
        # 1. fdef.env (explicit override on the field)
        # 2. key_map (caller-supplied override)
        # 3. PREFIX_FIELD_NAME convention
        if fdef.env:
            env_keys = [fdef.env]
        else:
            env_keys = key_map.get(name)
            if isinstance(env_keys, str):
                env_keys = [env_keys]
            if not env_keys:
                env_keys = [f"{prefix.upper()}{separator}{name.upper()}"]

        for env_key in env_keys:
            val = os.environ.get(env_key)
            if val is not None:
                # Apply before_coerce parsers
                for parse_fn in type(instance)._parsers.get(name, []):
                    if getattr(parse_fn, "_layer_parser_before_coerce", False):
                        val = parse_fn(instance, val)

                coerced = _coerce(val, fdef.type_hint, parser=fdef.parser)
                # Apply @parser methods (after coercion, before write)
                for parse_fn in type(instance)._parsers.get(name, []):
                    if not getattr(parse_fn, "_layer_parser_before_coerce", False):
                        coerced = parse_fn(instance, coerced)
                setattr(instance, name, coerced)
                instance._sources[name].push(f"env:{env_key}", coerced)
                break  # Stop at first found in fallback chain

    return instance
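
The env-var name resolution can be sketched standalone. `candidate_env_keys` is a hypothetical helper mirroring the priority order in the source (explicit per-field env name, then key_map, then the PREFIX_FIELD convention); `APP_PORT` is a made-up variable for the demo:

```python
import os

# Sketch of solidify_env()'s env-var name resolution, in priority order:
# 1. explicit per-field env name, 2. caller-supplied key_map entry
# (string or fallback chain), 3. PREFIX_FIELD convention.
def candidate_env_keys(name, field_env, key_map, prefix, separator="_"):
    if field_env:
        return [field_env]
    keys = key_map.get(name)
    if isinstance(keys, str):
        keys = [keys]
    if not keys:
        keys = [f"{prefix.upper()}{separator}{name.upper()}"]
    return keys

os.environ["APP_PORT"] = "8080"  # hypothetical variable for the demo
keys = candidate_env_keys("port", None, {}, "app")
print(keys)                     # convention-derived name
print(os.environ.get(keys[0]))  # the raw string value, before coercion
print(candidate_env_keys("host", None, {"host": ["HOSTNAME", "HOST"]}, "app"))
```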

write_file

write_file(
    config,
    path: str,
    format: str = None,
    by_alias: bool = False,
)

Write a config object to a file.

Parameters:

config (required): A @layer_obj instance.
path (str, required): Output file path.
format (str, default None): "yaml", "json", or "toml". Auto-detected from extension if None.
by_alias (bool, default False): If True, use field aliases as keys in the output file.
Source code in src/layer/solidify.py
def write_file(config, path: str, format: str = None, by_alias: bool = False):
    """Write a config object to a file.

    Args:
        config: A @layer_obj instance.
        path: Output file path.
        format: "yaml", "json", or "toml". Auto-detected from extension if None.
        by_alias: If True, use field aliases as keys in the output file.
    """
    if format is None:
        ext = str(path).rsplit(".", 1)[-1].lower() if "." in str(path) else ""
        format = {"yml": "yaml", "yaml": "yaml", "json": "json", "toml": "toml"}.get(ext)

    data = config.to_dict(by_alias=by_alias)

    if format == "yaml":
        try:
            import yaml
        except ImportError:
            raise StructureError("PyYAML is required: pip install PyYAML")
        with open(path, "w") as f:
            yaml.dump(data, f, sort_keys=False, default_flow_style=False)

    elif format == "json":
        import json

        with open(path, "w") as f:
            json.dump(data, f, indent=2, default=str)

    elif format == "toml":
        try:
            import tomli_w
        except ImportError:
            raise StructureError("tomli_w is required to write .toml files: pip install tomli-w")
        with open(path, "wb") as f:
            tomli_w.dump(data, f)

    else:
        raise StructureError(f"Unsupported format: '{format}'. Use yaml, json, or toml")

SolidifyMode

Bases: Enum

Strictness mode for solidify() and ConfigPipeline.

LAX

Unknown keys are silently ignored. Type coercion errors are swallowed; the raw value is used as-is.

STANDARD (default)

Unknown keys are silently ignored. Type coercion errors bubble up as CoercionError.

STRICT

Unknown keys immediately raise StructureError. No coercion is attempted; incoming values must already match the type hint.

Source code in src/layer/solidify.py
class SolidifyMode(Enum):
    """Strictness mode for solidify() and ConfigPipeline.

    LAX:
        Unknown keys are silently ignored.
        Type coercion errors are swallowed; the raw value is used as-is.
    STANDARD (default):
        Unknown keys are silently ignored.
        Type coercion errors bubble up as CoercionError.
    STRICT:
        Unknown keys immediately raise StructureError.
        No coercion is attempted; incoming values must already match the type hint.
    """

    LAX = "lax"
    STANDARD = "standard"
    STRICT = "strict"
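
How mode= overrides the legacy strict/coerce kwargs can be shown with a minimal sketch; `effective_flags` is a hypothetical helper mirroring the precedence logic at the top of solidify():

```python
from enum import Enum

class SolidifyMode(Enum):  # mirrors the enum above
    LAX = "lax"
    STANDARD = "standard"
    STRICT = "strict"

# mode takes precedence over the legacy strict/coerce kwargs when set.
def effective_flags(mode, strict=False, coerce=True):
    if mode is not None:
        strict = mode == SolidifyMode.STRICT
        coerce = mode != SolidifyMode.STRICT
    return strict, coerce

print(effective_flags(SolidifyMode.STRICT))  # strict on, coercion off
print(effective_flags(SolidifyMode.LAX))     # strict off, coercion on
print(effective_flags(None, strict=True))    # legacy kwargs honored as-is
```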

Validation

require

require(value: Any, field_name: str, config: Any) -> bool

Field must be set (not None).

Source code in src/layer/validation.py
def require(value: Any, field_name: str, config: Any) -> bool:
    """Field must be set (not None)."""
    if value is None:
        raise ValidationError(field_name, "Field is required", "require", "unknown")
    return True

optional

optional(value, field_name, config)

Explicitly marks field as optional. Always passes. Documentation-only.

Source code in src/layer/validation.py
def optional(value, field_name, config):
    """Explicitly marks field as optional. Always passes. Documentation-only."""
    return True

not_empty

not_empty(value, field_name, config)

Value must not be empty (empty string, empty list, empty dict).

Unlike require (which checks for None), this catches "" and [] too.

Source code in src/layer/validation.py
def not_empty(value, field_name, config):
    """Value must not be empty (empty string, empty list, empty dict).

    Unlike `require` (which checks for None), this catches "" and [] too.
    """
    if value is not None:
        if isinstance(value, (str, list, dict)) and len(value) == 0:
            raise ValidationError(
                field_name,
                "Must not be empty",
                "not_empty",
                "unknown",
            )
    return True

one_of

one_of(*allowed_values)

Value must be in the given set.

Source code in src/layer/validation.py
def one_of(*allowed_values):
    """Value must be in the given set."""

    def _one_of(value: Any, field_name: str, config: Any):
        if value is not None and value not in allowed_values:
            raise ValidationError(
                field_name, f"Must be one of {allowed_values}", "one_of", "unknown"
            )
        return True

    return _one_of

in_range

in_range(lo, hi)

Numeric value must be in [lo, hi].

Source code in src/layer/validation.py
def in_range(lo, hi):
    """Numeric value must be in [lo, hi]."""

    def _in_range(value, field_name, config):
        if value is not None and not (lo <= value <= hi):
            raise ValidationError(
                field_name,
                f"Must be between {lo} and {hi}, got {value}",
                "in_range",
                "unknown",
            )
        return True

    return _in_range

is_port

is_port(value, field_name, config)

Shorthand for in_range(1, 65535) with a clearer error message.

Source code in src/layer/validation.py
def is_port(value, field_name, config):
    """Shorthand for in_range(1, 65535) with a clearer error message."""
    if value is not None:
        if not isinstance(value, int) or not (1 <= value <= 65535):
            raise ValidationError(
                field_name,
                f"Must be a valid port (1-65535), got {value}",
                "is_port",
                "unknown",
            )
    return True
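
The range validators can be exercised standalone; this sketch mirrors is_port from the source above, with a minimal stand-in for the library's ValidationError:

```python
class ValidationError(Exception):  # minimal stand-in for the library's class
    def __init__(self, field, message, rule, category):
        super().__init__(f"Field '{field}' failed rule '{rule}': {message}")

def is_port(value, field_name, config):  # mirrors the shown source
    if value is not None:
        if not isinstance(value, int) or not (1 <= value <= 65535):
            raise ValidationError(
                field_name, f"Must be a valid port (1-65535), got {value}",
                "is_port", "unknown",
            )
    return True

print(is_port(5432, "port", None))  # True
print(is_port(None, "port", None))  # True -- None passes; combine with require
try:
    is_port(99999, "port", None)    # out of range
except ValidationError as e:
    print(e)
```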

is_url

is_url(value, field_name, config)

Value must look like a URL (http:// or https://).

Source code in src/layer/validation.py
def is_url(value, field_name, config):
    """Value must look like a URL (http:// or https://)."""
    if value is not None:
        if not isinstance(value, str) or not value.startswith(("http://", "https://")):
            raise ValidationError(
                field_name,
                f"Must be a valid URL (http/https), got: {value!r}",
                "is_url",
                "unknown",
            )
    return True

is_positive

is_positive(value, field_name, config)

Numeric value must be > 0.

Source code in src/layer/validation.py
def is_positive(value, field_name, config):
    """Numeric value must be > 0."""
    if value is not None:
        if not isinstance(value, (int, float)) or value <= 0:
            raise ValidationError(
                field_name,
                f"Must be positive, got {value}",
                "is_positive",
                "unknown",
            )
    return True

regex

regex(pattern: str, message: str = None)

String must match the given regex pattern.

Usage

endpoint: str = field(str, cluster=[regex(r"https?://.+")])

Source code in src/layer/validation.py
def regex(pattern: str, message: str = None):
    """String must match the given regex pattern.

    Usage:
        endpoint: str = field(str, cluster=[regex(r"https?://.+")])
    """
    import re as _re

    compiled = _re.compile(pattern)

    def _regex(value, field_name, config):
        if value is not None and not compiled.match(str(value)):
            msg = message or f"Must match pattern: {pattern}"
            raise ValidationError(field_name, msg, "regex", "unknown")
        return True

    _regex.__name__ = "regex"
    return _regex

min_length

min_length(n)

String length >= n.

Source code in src/layer/validation.py
def min_length(n):
    """String length >= n."""

    def _min_length(value: Any, field_name: str, config: Any):
        if value is not None and len(str(value)) < n:
            raise ValidationError(
                field_name,
                f"Length must be >= {n}, got {len(str(value))}",
                "min_length",
                "unknown",
            )
        return True

    return _min_length

max_length

max_length(n: int)

String length <= n.

Source code in src/layer/validation.py
def max_length(n: int):
    """String length <= n."""

    def _max_length(value, field_name, config):
        if value is not None and len(str(value)) > n:
            raise ValidationError(
                field_name,
                f"Length must be <= {n}, got {len(str(value))}",
                "max_length",
                "unknown",
            )
        return True

    _max_length.__name__ = "max_length"
    return _max_length

path_exists

path_exists(value: Any, field_name: str, config: Any)

Path must exist on filesystem.

Source code in src/layer/validation.py
def path_exists(value: Any, field_name: str, config: Any):
    """Path must exist on filesystem."""
    if value is not None and not os.path.exists(str(value)):
        raise ValidationError(
            field_name, f"Path '{value}' does not exist", "path_exists", "unknown"
        )
    return True

instance_of

instance_of(expected_type)

Value must be isinstance(val, expected_type).

Source code in src/layer/validation.py
def instance_of(expected_type):
    """Value must be isinstance(val, expected_type)."""

    def _instance_of(value: Any, field_name: str, config: Any):
        if value is not None and not isinstance(value, expected_type):
            raise ValidationError(
                field_name,
                f"Expected {expected_type.__name__}, got {type(value).__name__}",
                "instance_of",
                "unknown",
            )
        return True

    return _instance_of

each_item

each_item(validator)

Apply a validator to each item in a list field.

Usage

partition_ids: list = field(list, each_item(min_length(1)), default=[])

Source code in src/layer/validation.py
def each_item(validator):
    """Apply a validator to each item in a list field.

    Usage:
        partition_ids: list = field(list, each_item(min_length(1)), default=[])
    """

    def _each_item(value, field_name, config):
        if value is not None and isinstance(value, list):
            for i, item in enumerate(value):
                try:
                    validator(item, f"{field_name}[{i}]", config)
                except ValidationError as e:
                    raise ValidationError(
                        f"{field_name}[{i}]",
                        e.message,
                        f"each_item({getattr(validator, '__name__', 'custom')})",
                        "unknown",
                    )
        return True

    _each_item.__name__ = "each_item"
    return _each_item
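
Composition with each_item can be demonstrated outside the library; min_length and each_item below mirror the shown source, with a minimal ValidationError stand-in:

```python
class ValidationError(Exception):  # minimal stand-in for the library's class
    def __init__(self, field, message, rule, category):
        super().__init__(f"Field '{field}' failed '{rule}': {message}")
        self.message = message

def min_length(n):  # mirrors the shown source
    def _min_length(value, field_name, config):
        if value is not None and len(str(value)) < n:
            raise ValidationError(field_name, f"Length must be >= {n}",
                                  "min_length", "unknown")
        return True
    return _min_length

def each_item(validator):  # applies the validator to each list element
    def _each_item(value, field_name, config):
        if value is not None and isinstance(value, list):
            for i, item in enumerate(value):
                validator(item, f"{field_name}[{i}]", config)
        return True
    return _each_item

check = each_item(min_length(1))
print(check(["a", "bc"], "ids", None))  # True
try:
    check(["a", ""], "ids", None)       # item ids[1] fails min_length(1)
except ValidationError as e:
    print(e)
```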

requires_if

requires_if(trigger_field: str, trigger_value: Any)

Field is required when another field equals a specific value.

Usage

client_cert: str = field(str, cluster=[requires_if("tls_enabled", True)], default=None)

Source code in src/layer/validation.py
def requires_if(trigger_field: str, trigger_value: Any):
    """Field is required when another field equals a specific value.

    Usage:
        client_cert: str = field(str,
            cluster=[requires_if("tls_enabled", True)],
            default=None
        )
    """

    def _requires_if(value, field_name, config):
        trigger_val = getattr(config, trigger_field, None)
        if trigger_val == trigger_value and value is None:
            raise ValidationError(
                field_name,
                f"Required when '{trigger_field}' is {trigger_value!r}",
                "requires_if",
                "unknown",
            )
        return True

    _requires_if.__name__ = "requires_if"
    return _requires_if

requires_any

requires_any(*field_names)

At least one of the listed fields must be set (not None).

Apply this validator to any ONE of the fields in the group.

Usage

token: str = field(str, auth=[requires_any("token", "username")], default=None)
username: str = field(str, default=None)

Source code in src/layer/validation.py
def requires_any(*field_names):
    """At least one of the listed fields must be set (not None).

    Apply this validator to any ONE of the fields in the group.

    Usage:
        token: str = field(str, auth=[requires_any("token", "username")], default=None)
        username: str = field(str, default=None)
    """

    def _requires_any(value, field_name, config):
        if all(getattr(config, f, None) is None for f in field_names):
            raise ValidationError(
                field_name,
                f"At least one of {field_names} must be set",
                "requires_any",
                "unknown",
            )
        return True

    _requires_any.__name__ = "requires_any"
    return _requires_any

requires_all

requires_all(*field_names)

All of the listed fields must be set together, or none of them.

Usage

client_cert: str = field(str, cluster=[requires_all("client_certificate", "client_key")], default=None)

Source code in src/layer/validation.py
def requires_all(*field_names):
    """All of the listed fields must be set together, or none of them.

    Usage:
        client_cert: str = field(str,
            cluster=[requires_all("client_certificate", "client_key")],
            default=None
        )
    """

    def _requires_all(value, field_name, config):
        values = [getattr(config, f, None) for f in field_names]
        set_count = sum(1 for v in values if v is not None)
        if 0 < set_count < len(field_names):
            missing = [f for f, v in zip(field_names, values) if v is None]
            raise ValidationError(
                field_name,
                f"Fields {field_names} must all be set together. Missing: {missing}",
                "requires_all",
                "unknown",
            )
        return True

    _requires_all.__name__ = "requires_all"
    return _requires_all

mutually_exclusive

mutually_exclusive(*field_names)

At most one of the listed fields may be set.

Usage

token: str = field(str, auth=[mutually_exclusive("token", "username_password", "certificate")], default=None)

Source code in src/layer/validation.py
def mutually_exclusive(*field_names):
    """At most one of the listed fields may be set.

    Usage:
        token: str = field(str,
            auth=[mutually_exclusive("token", "username_password", "certificate")],
            default=None
        )
    """

    def _mutually_exclusive(value, field_name, config):
        set_fields = [f for f in field_names if getattr(config, f, None) is not None]
        if len(set_fields) > 1:
            raise ValidationError(
                field_name,
                f"Only one of {field_names} may be set, but got: {set_fields}",
                "mutually_exclusive",
                "unknown",
            )
        return True

    _mutually_exclusive.__name__ = "mutually_exclusive"
    return _mutually_exclusive

depends_on

depends_on(*required_fields)

If this field is set, the listed fields must also be set.

Usage

client_key: str = field(str, cluster=[depends_on("client_certificate")], default=None)

Source code in src/layer/validation.py
def depends_on(*required_fields):
    """If this field is set, the listed fields must also be set.

    Usage:
        client_key: str = field(str,
            cluster=[depends_on("client_certificate")],
            default=None
        )
    """

    def _depends_on(value, field_name, config):
        if value is not None:
            missing = [f for f in required_fields if getattr(config, f, None) is None]
            if missing:
                raise ValidationError(
                    field_name,
                    f"When '{field_name}' is set, {required_fields} "
                    f"must also be set. Missing: {missing}",
                    "depends_on",
                    "unknown",
                )
        return True

    _depends_on.__name__ = "depends_on"
    return _depends_on
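
The cross-field validators all receive the whole config as their third argument, so they can be exercised with any object exposing the right attributes. This sketch mirrors requires_all from above, using types.SimpleNamespace as a stand-in config:

```python
from types import SimpleNamespace

class ValidationError(Exception):  # minimal stand-in for the library's class
    def __init__(self, field, message, rule, category):
        super().__init__(message)

def requires_all(*field_names):  # mirrors the shown source
    def _requires_all(value, field_name, config):
        values = [getattr(config, f, None) for f in field_names]
        set_count = sum(1 for v in values if v is not None)
        if 0 < set_count < len(field_names):  # partially set -> error
            raise ValidationError(field_name,
                                  f"{field_names} must all be set together",
                                  "requires_all", "unknown")
        return True
    return _requires_all

rule = requires_all("client_certificate", "client_key")
ok = SimpleNamespace(client_certificate="cert.pem", client_key="key.pem")
partial = SimpleNamespace(client_certificate="cert.pem", client_key=None)

print(rule(ok.client_certificate, "client_certificate", ok))  # True
try:
    rule(partial.client_certificate, "client_certificate", partial)
except ValidationError as e:
    print(e)  # client_key is missing
```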

Layering

LayerRule

Bases: Enum

Source code in src/layer/layering.py
class LayerRule(Enum):
    OVERRIDE = auto()
    PRESERVE = auto()
    MERGE = auto()
    APPEND = auto()
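
The enum itself carries no behavior; the merge logic lives in layering.py (not shown here). As a rough illustration only, a single-field merge under each rule might look like the following; merge_field is hypothetical and may differ from the library's actual implementation:

```python
from enum import Enum, auto

class LayerRule(Enum):  # mirrors the enum above
    OVERRIDE = auto()
    PRESERVE = auto()
    MERGE = auto()
    APPEND = auto()

# Hypothetical per-field merge of `incoming` layered on top of `base`.
def merge_field(base, incoming, rule):
    if incoming is None:
        return base
    if rule is LayerRule.PRESERVE:
        return base if base is not None else incoming
    if rule is LayerRule.MERGE and isinstance(base, dict) and isinstance(incoming, dict):
        return {**base, **incoming}
    if rule is LayerRule.APPEND and isinstance(base, list) and isinstance(incoming, list):
        return base + incoming
    return incoming  # OVERRIDE (the default)

print(merge_field("a", "b", LayerRule.OVERRIDE))         # incoming wins
print(merge_field("a", "b", LayerRule.PRESERVE))         # base wins
print(merge_field({"x": 1}, {"y": 2}, LayerRule.MERGE))  # dicts combined
print(merge_field([1], [2], LayerRule.APPEND))           # lists concatenated
```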

Providers

BaseProvider

Bases: ABC

Abstract base class for configuration providers.

Every provider must implement read() -> dict. The pipeline calls providers in order, layering each result onto the config instance.

Source code in src/layer/providers/base.py
class BaseProvider(ABC):
    """Abstract base class for configuration providers.

    Every provider must implement read() -> dict. The pipeline calls
    providers in order, layering each result onto the config instance.
    """

    @abstractmethod
    def read(self) -> dict:
        """Return config data as a flat or nested dict."""

    def bind_schema(self, schema: type) -> None:
        """Optional hook for providers that need to inspect the @layerclass schema
        (e.g., for deep mapping or explicit alias resolution) before reading.

        Args:
            schema: The target @layerclass type.
        """
        pass

    @property
    def source_name(self) -> str:
        """Human-readable label for source tracking."""
        return self.__class__.__name__

    @property
    def watchable(self) -> bool:
        """Whether this provider supports hot-reload watching."""
        return False

source_name property

source_name: str

Human-readable label for source tracking.

watchable property

watchable: bool

Whether this provider supports hot-reload watching.

read abstractmethod

read() -> dict

Return config data as a flat or nested dict.

Source code in src/layer/providers/base.py
@abstractmethod
def read(self) -> dict:
    """Return config data as a flat or nested dict."""

bind_schema

bind_schema(schema: type) -> None

Optional hook for providers that need to inspect the @layerclass schema (e.g., for deep mapping or explicit alias resolution) before reading.

Parameters:

schema (type, required): The target @layerclass type.
Source code in src/layer/providers/base.py
def bind_schema(self, schema: type) -> None:
    """Optional hook for providers that need to inspect the @layerclass schema
    (e.g., for deep mapping or explicit alias resolution) before reading.

    Args:
        schema: The target @layerclass type.
    """
    pass
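
A minimal custom provider only needs read(); this sketch subclasses a mirror of the base class above. StaticProvider is a hypothetical provider that serves a fixed dict, which can be handy in tests:

```python
from abc import ABC, abstractmethod

class BaseProvider(ABC):  # mirrors the base class above
    @abstractmethod
    def read(self) -> dict: ...
    def bind_schema(self, schema: type) -> None: ...
    @property
    def source_name(self) -> str:
        return self.__class__.__name__
    @property
    def watchable(self) -> bool:
        return False

class StaticProvider(BaseProvider):
    """Hypothetical provider serving a fixed dict."""
    def __init__(self, data: dict):
        self._data = data
    def read(self) -> dict:
        return dict(self._data)  # copy, so callers can't mutate our state

p = StaticProvider({"host": "db.internal", "port": 5432})
print(p.read())       # the configured dict
print(p.source_name)  # defaults to the class name
print(p.watchable)    # False: no hot-reload support
```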

Sources

SourceHistory dataclass

Full history stack for a single field.

Source code in src/layer/sources.py
@dataclass
class SourceHistory:
    """Full history stack for a single field."""

    entries: list[SourceEntry] = dc_field(default_factory=list)

    def push(self, source: str, value: Any):
        self.entries.append(SourceEntry(source=source, value=value))

    @property
    def current(self) -> str:
        """The most recent source tag (backward-compatible with old _sources[name])."""
        return self.entries[-1].source if self.entries else "default"

    @property
    def current_value(self) -> Any:
        return self.entries[-1].value if self.entries else None

    def all_sources(self) -> list[str]:
        """Return list of source tags in chronological order."""
        return [e.source for e in self.entries]

    def __repr__(self):
        return f"SourceHistory({self.all_sources()})"

current property

current: str

The most recent source tag (backward-compatible with old _sources[name]).

all_sources

all_sources() -> list[str]

Return list of source tags in chronological order.

Source code in src/layer/sources.py
def all_sources(self) -> list[str]:
    """Return list of source tags in chronological order."""
    return [e.source for e in self.entries]
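
The history stack can be demonstrated standalone; this mirrors the shown dataclass, with a minimal SourceEntry stand-in (the real one lives in sources.py):

```python
from dataclasses import dataclass, field as dc_field
from typing import Any

@dataclass
class SourceEntry:  # minimal stand-in for the class in sources.py
    source: str
    value: Any

@dataclass
class SourceHistory:  # mirrors the shown source
    entries: list = dc_field(default_factory=list)
    def push(self, source, value):
        self.entries.append(SourceEntry(source=source, value=value))
    @property
    def current(self):
        return self.entries[-1].source if self.entries else "default"
    def all_sources(self):
        return [e.source for e in self.entries]

h = SourceHistory()
print(h.current)        # "default" before any value is pushed
h.push("config.yml", 5432)
h.push("env:APP_PORT", 8080)
print(h.current)        # most recent source tag wins
print(h.all_sources())  # full provenance, in chronological order
```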

Exceptions

ConfigError

Bases: Exception

Base exception for all layer errors.

Source code in src/layer/exceptions.py
class ConfigError(Exception):
    """Base exception for all layer errors."""

    pass

ValidationError

Bases: ConfigError

One or more validation rules failed.

Source code in src/layer/exceptions.py
class ValidationError(ConfigError):
    """One or more validation rules failed."""

    def __init__(self, field: str, message: str, rule: str, category: str):
        super().__init__(f"[Category: {category}] Field '{field}' failed rule '{rule}': {message}")
        self.field = field
        self.message = message
        self.rule = rule
        self.category = category
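
The formatted message can be checked directly; this mirrors the class above:

```python
class ValidationError(Exception):  # mirrors the shown class
    def __init__(self, field, message, rule, category):
        super().__init__(
            f"[Category: {category}] Field '{field}' failed rule '{rule}': {message}"
        )
        self.field = field
        self.message = message
        self.rule = rule
        self.category = category

err = ValidationError("port", "Must be a valid port (1-65535), got 0",
                      "is_port", "server")
print(str(err))  # category, field, and rule are all in the message
print(err.field, err.rule, err.category)  # also available as attributes
```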

StructureError

Bases: ConfigError

Source data doesn't match schema (unknown keys in strict mode).

Source code in src/layer/exceptions.py
class StructureError(ConfigError):
    """Source data doesn't match schema (unknown keys in strict mode)."""

    pass

LayeringError

Bases: ConfigError

Merge conflict or invalid rule application.

Source code in src/layer/exceptions.py
class LayeringError(ConfigError):
    """Merge conflict or invalid rule application."""

    pass

CoercionError

Bases: ConfigError

Raised when a value cannot be coerced to the target type.

Used internally so Union handling can try the next candidate type on failure.

Source code in src/layer/exceptions.py
class CoercionError(ConfigError):
    """Raised when a value cannot be coerced to the target type.

    Used internally so Union handling can try the next candidate type on failure.
    """

    pass

InterpolationError

Bases: ConfigError

Raised on unresolvable or circular references.

Source code in src/layer/exceptions.py
class InterpolationError(ConfigError):
    """Raised on unresolvable or circular references."""

    pass

InterpolationCycleError

Bases: InterpolationError

Raised when a circular ${variable} reference is detected.

Source code in src/layer/exceptions.py
class InterpolationCycleError(InterpolationError):
    """Raised when a circular ${variable} reference is detected."""

    pass

MissingDependencyError

Bases: ConfigError

Raised when an optional dependency (boto3, watchdog, etc.) is not installed.

Source code in src/layer/exceptions.py
class MissingDependencyError(ConfigError):
    """Raised when an optional dependency (boto3, watchdog, etc.) is not installed."""

    pass

HotReloadError

Bases: ConfigError

Raised when an error occurs during hot-reload of configuration.

Source code in src/layer/exceptions.py
class HotReloadError(ConfigError):
    """Raised when an error occurs during hot-reload of configuration."""

    pass

Exporters

to_json_schema

to_json_schema(config_cls: type) -> dict

Return the JSON Schema dict for a @layerclass.

Wraps config_cls.json_schema() and returns the result directly.

Parameters:

config_cls (type, required): A @layerclass decorated class.

Returns:

dict: A JSON Schema dict (draft-07).

Source code in src/layer/exporters.py
def to_json_schema(config_cls: type) -> dict:
    """Return the JSON Schema dict for a ``@layerclass``.

    Wraps ``config_cls.json_schema()`` and returns the result directly.

    Args:
        config_cls: A ``@layerclass`` decorated class.

    Returns:
        A JSON Schema dict (draft-07).
    """
    return config_cls.json_schema()

to_yaml

to_yaml(config_cls: type) -> str

Generate a YAML configuration template from a @layerclass.

Non-secret fields are emitted with their default values. Secret fields are omitted and replaced with a commented-out placeholder. Field descriptions are emitted as YAML comments above the fields.

Parameters:

config_cls (type, required): A @layerclass decorated class.

Returns:

str: A YAML string suitable for saving as config.yml.

Example:

    print(to_yaml(AppConfig))
    # # Database host
    # host: localhost
    # port: 5432

Source code in src/layer/exporters.py
def to_yaml(config_cls: type) -> str:
    """Generate a YAML configuration template from a ``@layerclass``.

    Non-secret fields are emitted with their default values. Secret fields
    are omitted and replaced with a commented-out placeholder. Field
    descriptions are emitted as YAML comments above the fields.

    Args:
        config_cls: A ``@layerclass`` decorated class.

    Returns:
        A YAML string suitable for saving as ``config.yml``.

    Example:
        print(to_yaml(AppConfig))
        # # Database host
        # host: localhost
        # port: 5432
    """
    lines = []
    _render_yaml_fields(config_cls, 0, lines)
    return "\n".join(lines)

to_dotenv_template

to_dotenv_template(
    config_cls: type, prefix: str = ""
) -> str

Generate a .env file template from a @layerclass definition.

Each field becomes one KEY=<default> line, with its description emitted as a # comment above the line. Nested @layerclass fields are rendered as a labelled section. Secret fields have their default replaced with <secret>.

Parameters:

    config_cls (type, required):
        A @layerclass decorated class.
    prefix (str, default ''):
        Optional env var prefix (e.g. "APP" → APP_HOST=localhost).

Returns:

    str: A multi-line string suitable for saving as .env.template.

Example

    @layerclass
    class Config:
        host: str = field(str, default="localhost", description="Database host")
        port: int = field(int, default=5432)

    print(to_dotenv_template(Config, prefix="APP"))
    # Database host
    APP_HOST=localhost
    APP_PORT=5432

Source code in src/layer/exporters.py
def to_dotenv_template(config_cls: type, prefix: str = "") -> str:
    """Generate a ``.env`` file template from a ``@layerclass`` definition.

    Each field becomes one ``KEY=<default>`` line, with its description emitted
    as a ``# comment`` above the line. Nested ``@layerclass`` fields are rendered
    as a labelled section. Secret fields have their default replaced with
    ``<secret>``.

    Args:
        config_cls: A ``@layerclass`` decorated class.
        prefix: Optional env var prefix (e.g. ``"APP"`` → ``APP_HOST=localhost``).

    Returns:
        A multi-line string suitable for saving as ``.env.template``.

    Example:
        @layerclass
        class Config:
            host: str = field(str, default="localhost", description="Database host")
            port: int = field(int, default=5432)

        print(to_dotenv_template(Config, prefix="APP"))
        # # Database host
        # APP_HOST=localhost
        # APP_PORT=5432
    """
    lines = []
    _render_dotenv_fields(config_cls, prefix.upper(), lines)
    return "\n".join(lines)
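A minimal stand-alone sketch of the documented rendering — one KEY=<default> line per field, description as a # comment, upper-cased prefix, secrets replaced with <secret>. This is not the library's _render_dotenv_fields (which also handles nested sections); the flat field-tuple input is an assumption.

```python
def render_dotenv(fields, prefix=""):
    """Sketch of the documented .env rendering rules."""
    prefix = prefix.upper()  # mirrors prefix.upper() in to_dotenv_template
    lines = []
    for name, default, description, secret in fields:
        if description:
            lines.append(f"# {description}")
        key = f"{prefix}_{name.upper()}" if prefix else name.upper()
        value = "<secret>" if secret else default
        lines.append(f"{key}={value}")
    return "\n".join(lines)

out = render_dotenv(
    [("host", "localhost", "Database host", False),
     ("port", 5432, "", False)],
    prefix="APP",
)
print(out)
# # Database host
# APP_HOST=localhost
# APP_PORT=5432
```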

to_configmap

to_configmap(
    config_cls: type, name: str = "app-config"
) -> str

Generate a Kubernetes ConfigMap YAML string from a @layerclass.

Non-secret fields are emitted as ConfigMap data entries. Secret fields are omitted with a comment indicating they belong in a Secret resource.

Parameters:

    config_cls (type, required):
        A @layerclass decorated class.
    name (str, default 'app-config'):
        The metadata.name for the ConfigMap resource.

Returns:

    str: A YAML string suitable for kubectl apply -f.

Example

    print(to_configmap(AppConfig, name="my-app"))
    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: my-app
    data:
      HOST: localhost
      PORT: "5432"

Source code in src/layer/exporters.py
def to_configmap(config_cls: type, name: str = "app-config") -> str:
    """Generate a Kubernetes ConfigMap YAML string from a ``@layerclass``.

    Non-secret fields are emitted as ConfigMap data entries. Secret fields
    are omitted with a comment indicating they belong in a Secret resource.

    Args:
        config_cls: A ``@layerclass`` decorated class.
        name: The ``metadata.name`` for the ConfigMap resource
            (default ``"app-config"``).

    Returns:
        A YAML string suitable for ``kubectl apply -f``.

    Example:
        print(to_configmap(AppConfig, name="my-app"))
        # apiVersion: v1
        # kind: ConfigMap
        # metadata:
        #   name: my-app
        # data:
        #   HOST: localhost
        #   PORT: "5432"
    """
    lines = [
        "apiVersion: v1",
        "kind: ConfigMap",
        "metadata:",
        f"  name: {name}",
        "data:",
    ]
    _render_configmap_fields(config_cls, "", lines)
    return "\n".join(lines)
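The fixed header lines shown in the source can be paired with a simple data renderer to see the overall shape of the output. The sketch below is not the library's _render_configmap_fields (which also handles nesting and secret omission); key upper-casing and the quoting of non-string values are inferred from the example output above, since ConfigMap data values must be strings.

```python
def render_configmap(data, name="app-config"):
    """Sketch of the documented ConfigMap layout. Non-string values are
    quoted so they parse as YAML strings (as in PORT: "5432")."""
    lines = [
        "apiVersion: v1",
        "kind: ConfigMap",
        "metadata:",
        f"  name: {name}",
        "data:",
    ]
    for key, value in data.items():
        rendered = value if isinstance(value, str) else f'"{value}"'
        lines.append(f"  {key.upper()}: {rendered}")
    return "\n".join(lines)

print(render_configmap({"host": "localhost", "port": 5432}, name="my-app"))
```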

Observers

BasePipelineObserver

Abstract base class for pipeline lifecycle observers.

All methods are no-ops by default. Subclass and override only the hooks you need.

Example

    class MetricsObserver(BasePipelineObserver):
        def on_hot_reload_triggered(self, diffs):
            statsd.increment("config.reload", tags=[f"changes:{len(diffs)}"])

        def on_hot_reload_locked(self, field):
            statsd.increment("config.reload.locked", tags=[f"field:{field}"])
Source code in src/layer/observers.py
class BasePipelineObserver:
    """Abstract base class for pipeline lifecycle observers.

    All methods are no-ops by default. Subclass and override only the hooks
    you need.

    Example:
        class MetricsObserver(BasePipelineObserver):
            def on_hot_reload_triggered(self, diffs):
                statsd.increment("config.reload", tags=[f"changes:{len(diffs)}"])

            def on_hot_reload_locked(self, field):
                statsd.increment("config.reload.locked", tags=[f"field:{field}"])
    """

    def on_provider_read(self, provider_name: str, data: dict) -> None:
        """Called after a provider successfully reads its data.

        Args:
            provider_name: The ``source_name`` of the provider.
            data: The raw dict returned by the provider.
        """

    def on_coercion_error(
        self, field: str, value: Any, target_type: type, error: Exception
    ) -> None:
        """Called when a type coercion fails (LAX mode swallows the error).

        Args:
            field: The field name that failed coercion.
            value: The raw value that could not be coerced.
            target_type: The target type that coercion was attempted for.
            error: The original exception.
        """

    def on_layer_merged(self, provider_name: str, rules_applied: dict) -> None:
        """Called after each provider's overlay is layered onto the live config.

        Args:
            provider_name: The ``source_name`` of the provider.
            rules_applied: The ``LayerRule`` dict used during the merge.
        """

    def on_hot_reload_triggered(self, diffs: list) -> None:
        """Called when a hot-reload detects one or more field changes.

        Args:
            diffs: List of diff dicts (field, old_value, new_value, …) from
                ``config.diff(shadow)``.
        """

    def on_hot_reload_locked(self, field: str) -> None:
        """Called when a hot-reload is skipped for a ``reloadable=False`` field.

        Args:
            field: Dot-notation path of the locked field.
        """

on_provider_read

on_provider_read(provider_name: str, data: dict) -> None

Called after a provider successfully reads its data.

Parameters:

    provider_name (str, required):
        The source_name of the provider.
    data (dict, required):
        The raw dict returned by the provider.
Source code in src/layer/observers.py
def on_provider_read(self, provider_name: str, data: dict) -> None:
    """Called after a provider successfully reads its data.

    Args:
        provider_name: The ``source_name`` of the provider.
        data: The raw dict returned by the provider.
    """

on_coercion_error

on_coercion_error(
    field: str,
    value: Any,
    target_type: type,
    error: Exception,
) -> None

Called when a type coercion fails (LAX mode swallows the error).

Parameters:

    field (str, required):
        The field name that failed coercion.
    value (Any, required):
        The raw value that could not be coerced.
    target_type (type, required):
        The target type that coercion was attempted for.
    error (Exception, required):
        The original exception.
Source code in src/layer/observers.py
def on_coercion_error(
    self, field: str, value: Any, target_type: type, error: Exception
) -> None:
    """Called when a type coercion fails (LAX mode swallows the error).

    Args:
        field: The field name that failed coercion.
        value: The raw value that could not be coerced.
        target_type: The target type that coercion was attempted for.
        error: The original exception.
    """

on_layer_merged

on_layer_merged(
    provider_name: str, rules_applied: dict
) -> None

Called after each provider's overlay is layered onto the live config.

Parameters:

    provider_name (str, required):
        The source_name of the provider.
    rules_applied (dict, required):
        The LayerRule dict used during the merge.
Source code in src/layer/observers.py
def on_layer_merged(self, provider_name: str, rules_applied: dict) -> None:
    """Called after each provider's overlay is layered onto the live config.

    Args:
        provider_name: The ``source_name`` of the provider.
        rules_applied: The ``LayerRule`` dict used during the merge.
    """

on_hot_reload_triggered

on_hot_reload_triggered(diffs: list) -> None

Called when a hot-reload detects one or more field changes.

Parameters:

    diffs (list, required):
        List of diff dicts (field, old_value, new_value, …) from
        config.diff(shadow).
Source code in src/layer/observers.py
def on_hot_reload_triggered(self, diffs: list) -> None:
    """Called when a hot-reload detects one or more field changes.

    Args:
        diffs: List of diff dicts (field, old_value, new_value, …) from
            ``config.diff(shadow)``.
    """

on_hot_reload_locked

on_hot_reload_locked(field: str) -> None

Called when a hot-reload is skipped for a reloadable=False field.

Parameters:

    field (str, required):
        Dot-notation path of the locked field.
Source code in src/layer/observers.py
def on_hot_reload_locked(self, field: str) -> None:
    """Called when a hot-reload is skipped for a ``reloadable=False`` field.

    Args:
        field: Dot-notation path of the locked field.
    """
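Because every hook is a no-op, a subclass only overrides what it needs. The sketch below uses an inline stand-in mirroring the no-op base shown above (in real use, import BasePipelineObserver from the library) and collects events for testing or auditing; AuditObserver is a hypothetical name.

```python
class BasePipelineObserver:
    # Stand-in mirroring the no-op base class shown above.
    def on_provider_read(self, provider_name, data): ...
    def on_coercion_error(self, field, value, target_type, error): ...
    def on_layer_merged(self, provider_name, rules_applied): ...
    def on_hot_reload_triggered(self, diffs): ...
    def on_hot_reload_locked(self, field): ...

class AuditObserver(BasePipelineObserver):
    """Records every observed pipeline event as a (hook, detail) tuple --
    handy in tests or as a lightweight audit trail."""

    def __init__(self):
        self.events = []

    def on_provider_read(self, provider_name, data):
        self.events.append(("provider_read", provider_name))

    def on_hot_reload_triggered(self, diffs):
        self.events.append(("reload", [d["field"] for d in diffs]))

    def on_hot_reload_locked(self, field):
        self.events.append(("locked", field))
```

Hooks not overridden (here, on_coercion_error and on_layer_merged) fall through to the base class and are silently ignored.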

LoggerObserver

Bases: BasePipelineObserver

Observer that emits structured log messages via a standard logging.Logger.

Parameters:

    logger (Logger, required):
        A logging.Logger instance. Typically obtained via
        logging.getLogger(__name__).
Example

    import logging
    pipeline = ConfigPipeline(AppConfig, logger=logging.getLogger("myapp"))
Source code in src/layer/observers.py
class LoggerObserver(BasePipelineObserver):
    """Observer that emits structured log messages via a standard ``logging.Logger``.

    Args:
        logger: A ``logging.Logger`` instance. Typically obtained via
            ``logging.getLogger(__name__)``.

    Example:
        import logging
        pipeline = ConfigPipeline(AppConfig, logger=logging.getLogger("myapp"))
    """

    def __init__(self, logger: logging.Logger):
        self._logger = logger

    def on_provider_read(self, provider_name: str, data: dict) -> None:
        self._logger.debug("layer: provider '%s' read %d key(s)", provider_name, len(data))

    def on_coercion_error(
        self, field: str, value: Any, target_type: type, error: Exception
    ) -> None:
        self._logger.warning(
            "layer: coercion failed for field '%s' (value=%r, target=%s): %s",
            field,
            value,
            getattr(target_type, "__name__", str(target_type)),
            error,
        )

    def on_layer_merged(self, provider_name: str, rules_applied: dict) -> None:
        self._logger.debug(
            "layer: merged overlay from '%s' (rules=%s)", provider_name, rules_applied
        )

    def on_hot_reload_triggered(self, diffs: list) -> None:
        fields = [d["field"] for d in diffs]
        self._logger.info("layer: hot-reload detected %d change(s): %s", len(diffs), fields)

    def on_hot_reload_locked(self, field: str) -> None:
        self._logger.warning(
            "layer: skipped hot-reload for locked field '%s' (reloadable=False)", field
        )
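To see the kind of message on_hot_reload_triggered emits, the hook body from the source above can be exercised stand-alone. The class here is reduced to that one hook; the diff dict shape (a "field" key) follows the diffs documented for config.diff(shadow).

```python
import logging

class LoggerObserver:
    # Reduced to the one hook exercised here; the full class lives in
    # src/layer/observers.py.
    def __init__(self, logger):
        self._logger = logger

    def on_hot_reload_triggered(self, diffs):
        fields = [d["field"] for d in diffs]
        self._logger.info(
            "layer: hot-reload detected %d change(s): %s", len(diffs), fields
        )

logging.basicConfig(level=logging.DEBUG)
obs = LoggerObserver(logging.getLogger("myapp"))
obs.on_hot_reload_triggered([{"field": "port", "old_value": 5432, "new_value": 5433}])
# emits: layer: hot-reload detected 1 change(s): ['port']
```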