Skip to content

API Reference

Full reference for the plugin-rosetta Python API. All public classes and functions are documented here. Source code is shown inline.


Base translator

The abstract base class all resource-specific translators extend.

Bases: ABC

Abstract base for per-resource FHIR-to-OMOP translators.

Source code in src/plugin_rosetta/translators/base.py
class FhirToOmopTranslator(ABC):
    """Abstract base for per-resource FHIR-to-OMOP translators."""

    # Subclasses set this to the FHIR resourceType string they handle.
    resource_type: str = ""

    @abstractmethod
    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Translate one FHIR resource dict to one (flat) OMOP row dict.

        Returns an empty dict if the record cannot be translated.
        """
        ...

    # ------------------------------------------------------------------
    # Convenience entry-points
    # ------------------------------------------------------------------

    def translate_dict(self, record: dict[str, Any]) -> dict[str, Any]:
        """Translate a single FHIR resource dict."""
        return self.translate_record(record)

    def translate_ndjson(self, path: Path) -> list[dict[str, Any]]:
        """Read a FHIR ndjson file line-by-line and translate each record.

        Uses orjson for performance.  Lines that fail to parse or that have
        an unexpected resourceType are skipped with a warning.
        """
        results: list[dict[str, Any]] = []
        with open(path, "rb") as f:
            for lineno, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    record = orjson.loads(line)
                except orjson.JSONDecodeError as exc:
                    print(f"  Warning: line {lineno} JSON parse error: {exc}")
                    continue
                if (
                    self.resource_type
                    and record.get("resourceType") != self.resource_type
                ):
                    continue
                row = self.translate_record(record)
                if row:
                    results.append(row)
        return results

    def translate_parquet(self, path: Path) -> list[dict[str, Any]]:
        """Read a FHIR Parquet file with Polars and translate each record.

        Each Parquet row is converted to a plain Python dict and passed through
        ``translate_record``.  For complex nested schemas you may need to
        override this method.
        """
        df = pl.read_parquet(path)
        results: list[dict[str, Any]] = []
        for record in df.iter_rows(named=True):
            row = self.translate_record(record)
            if row:
                results.append(row)
        return results

    # ------------------------------------------------------------------
    # Helpers shared by subclasses
    # ------------------------------------------------------------------

    @staticmethod
    def _get(record: dict[str, Any], *keys: str) -> Any:
        """Safely traverse a nested dict by key path.  Returns None if missing."""
        value: Any = record
        for key in keys:
            if not isinstance(value, dict):
                return None
            value = value.get(key)
        return value

    @staticmethod
    def _first_coding_code(codeable_concept: dict | None) -> str | None:
        """Return the first coding.code from a CodeableConcept dict."""
        if not isinstance(codeable_concept, dict):
            return None
        for coding in codeable_concept.get("coding", []):
            code = coding.get("code")
            if code is not None:
                return str(code)
        return None

translate_dict(record)

Translate a single FHIR resource dict.

Source code in src/plugin_rosetta/translators/base.py
def translate_dict(self, record: dict[str, Any]) -> dict[str, Any]:
    """Translate one FHIR resource dict by delegating to ``translate_record``."""
    omop_row = self.translate_record(record)
    return omop_row

translate_ndjson(path)

Read a FHIR ndjson file line-by-line and translate each record.

Uses orjson for performance. Lines that fail to parse are skipped with a warning; lines whose resourceType does not match the translator's resource_type are skipped silently.

Source code in src/plugin_rosetta/translators/base.py
def translate_ndjson(self, path: Path) -> list[dict[str, Any]]:
    """Read a FHIR ndjson file line-by-line and translate each record.

    Uses orjson for parsing.  Blank lines are ignored.  Lines that fail to
    parse, or that parse to something other than a JSON object, are skipped
    with a printed warning.  Objects whose ``resourceType`` does not match
    ``self.resource_type`` are skipped silently, as are records for which
    ``translate_record`` returns an empty dict.

    Args:
        path: Location of the ``.ndjson`` file.

    Returns:
        The non-empty translated rows, in file order.
    """
    results: list[dict[str, Any]] = []
    with open(path, "rb") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                record = orjson.loads(line)
            except orjson.JSONDecodeError as exc:
                print(f"  Warning: line {lineno} JSON parse error: {exc}")
                continue
            # A line such as ``[]`` or ``null`` is valid JSON but not a
            # resource; without this guard ``record.get`` would raise and
            # abort the whole file.
            if not isinstance(record, dict):
                print(f"  Warning: line {lineno} is not a JSON object; skipped")
                continue
            if (
                self.resource_type
                and record.get("resourceType") != self.resource_type
            ):
                continue
            row = self.translate_record(record)
            if row:
                results.append(row)
    return results

translate_parquet(path)

Read a FHIR Parquet file with Polars and translate each record.

Each Parquet row is converted to a plain Python dict and passed through translate_record. For complex nested schemas you may need to override this method.

Source code in src/plugin_rosetta/translators/base.py
def translate_parquet(self, path: Path) -> list[dict[str, Any]]:
    """Load a FHIR Parquet file via Polars and translate it row by row.

    Every row is handed to ``translate_record`` as a column-name -> value
    dict; rows whose translation is empty are left out of the result.
    Override this method when the Parquet schema is too deeply nested for a
    plain row-dict conversion.
    """
    frame = pl.read_parquet(path)
    # Lazily translate each named row, then keep only the truthy results.
    translated = (self.translate_record(src) for src in frame.iter_rows(named=True))
    return [omop_row for omop_row in translated if omop_row]

translate_record(record) abstractmethod

Translate one FHIR resource dict to one (flat) OMOP row dict.

Returns an empty dict if the record cannot be translated.

Source code in src/plugin_rosetta/translators/base.py
@abstractmethod
def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
    """Map a single FHIR resource dict onto a single flat OMOP row dict.

    Abstract hook: concrete translators supply the mapping.  An empty dict
    signals that the record could not be translated.
    """

Translators

PatientTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Patient resource to an OMOP Person row.

Mapping source: PersonMap.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Notes: - person_id is always emitted as None — the ETL must assign a surrogate key; the original Patient.id is carried in fhir_id. - Gender concept mapping (FHIR code -> OMOP concept_id) is not resolved here; gender_concept_id is set to 0 (unknown) and gender_source_value carries the raw FHIR code. - Race / ethnicity fields are not present in FHIR Patient R4 in a standard way and are therefore set to 0 (no matching concept).

Source code in src/plugin_rosetta/translators/fhir_to_omop/patient.py
class PatientTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 Patient resource to an OMOP Person row.

    Mapping source: PersonMap.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - ``person_id`` is always emitted as None; the caller must assign a
      surrogate key.  The original ``Patient.id`` is carried in ``fhir_id``.
    - Gender concept mapping (FHIR code -> OMOP concept_id) is not resolved
      here; ``gender_concept_id`` is set to 0 (unknown) and
      ``gender_source_value`` carries the raw FHIR code.
    - Race / ethnicity fields are not present in FHIR Patient R4 in a standard
      way and are therefore set to 0 (no matching concept).
    """

    resource_type = "Patient"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP Person row dict from a FHIR Patient dict.

        ``birthDate`` is split on ``-`` into year / month / day.  Partial
        dates (``YYYY`` or ``YYYY-MM``) fill only the components present.
        ``birth_datetime`` is ``birthDate`` plus ``T00:00:00`` when exactly
        three components are present, otherwise the raw ``birthDate`` string.
        If a component is not an integer, parsing stops there: components
        parsed before it are kept and ``birth_datetime`` stays None.

        Args:
            record: FHIR Patient resource as a dict.

        Returns:
            A Person row dict; never empty.
        """
        fhir_id = record.get("id", "")

        # Birth date handling
        birth_date: str | None = record.get("birthDate")
        year_of_birth: int | None = None
        month_of_birth: int | None = None
        day_of_birth: int | None = None
        birth_datetime: str | None = None

        if birth_date:
            parts = birth_date.split("-")
            try:
                # ``split`` always yields at least one element, so only the
                # month and day need a length check.
                year_of_birth = int(parts[0])
                if len(parts) >= 2:
                    month_of_birth = int(parts[1])
                if len(parts) >= 3:
                    day_of_birth = int(parts[2])
            except ValueError:
                # Deliberately best-effort: keep whatever parsed before the
                # bad component and leave birth_datetime unset.
                pass
            else:
                # NOTE(review): for partial dates this stores a non-datetime
                # string such as "1990" in birth_datetime — confirm intended.
                birth_datetime = (
                    birth_date + "T00:00:00" if len(parts) == 3 else birth_date
                )

        # Gender
        gender_source_value: str | None = record.get("gender")

        return {
            "person_id": None,  # surrogate key must be assigned by caller
            "fhir_id": fhir_id,
            "gender_concept_id": 0,  # requires concept lookup
            "year_of_birth": year_of_birth,
            "month_of_birth": month_of_birth,
            "day_of_birth": day_of_birth,
            "birth_datetime": birth_datetime,
            "race_concept_id": 0,
            "ethnicity_concept_id": 0,
            "gender_source_value": gender_source_value,
            "gender_source_concept_id": 0,
        }

EncounterTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Encounter resource to an OMOP VisitOccurrence row.

Mapping source: EncounterVisit.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Notes: - visit_concept_id is populated from class.coding[0].code but requires a concept lookup to resolve to a standard OMOP concept. Until that lookup is implemented the source code is stored in visit_source_value and visit_concept_id is set to 0. - person_id and visit_occurrence_id are set to None; the caller must assign surrogate keys.

Source code in src/plugin_rosetta/translators/fhir_to_omop/encounter.py
class EncounterTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 Encounter resource to an OMOP VisitOccurrence row.

    Mapping source: EncounterVisit.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - ``visit_concept_id`` requires a concept lookup to resolve to a standard
      OMOP concept.  Until that lookup is implemented the first code found in
      ``class[*].coding`` is stored in ``visit_source_value`` and
      ``visit_concept_id`` is set to 0.
    - ``person_id`` and ``visit_occurrence_id`` are set to None; the caller
      must assign surrogate keys.
    - NOTE(review): the fields read here (``class`` as a list,
      ``actualPeriod``, ``admission``) look like the FHIR R5 Encounter shape
      rather than R4 — confirm which version the input uses.
    """

    resource_type = "Encounter"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP VisitOccurrence row dict from a FHIR Encounter dict.

        Args:
            record: FHIR Encounter resource as a dict.

        Returns:
            A VisitOccurrence row dict; never empty.  Date fields are the
            first 10 characters of the corresponding datetime strings.
        """
        fhir_id = record.get("id", "")

        # Visit type: first ``class`` entry that yields a coding code.
        # ``or []`` covers an explicit null, which ``.get("class", [])`` would
        # hand to the loop and raise TypeError.
        visit_source_value: str | None = None
        for cls in record.get("class") or []:
            code = self._first_coding_code(cls)
            if code:
                visit_source_value = code
                break

        # Period
        actual_period = record.get("actualPeriod", {}) or {}
        visit_start_datetime: str | None = actual_period.get("start")
        visit_end_datetime: str | None = actual_period.get("end")
        visit_start_date: str | None = (
            visit_start_datetime[:10] if visit_start_datetime else None
        )
        visit_end_date: str | None = (
            visit_end_datetime[:10] if visit_end_datetime else None
        )

        # Admission
        admission = record.get("admission", {}) or {}
        admitted_from_source_value: str | None = self._first_coding_code(
            admission.get("admitSource")
        )
        discharged_to_source_value: str | None = self._first_coding_code(
            admission.get("dischargeDisposition")
        )

        return {
            "visit_occurrence_id": None,
            "fhir_id": fhir_id,
            "person_id": None,
            "visit_concept_id": 0,  # requires concept lookup
            "visit_start_date": visit_start_date,
            "visit_start_datetime": visit_start_datetime,
            "visit_end_date": visit_end_date,
            "visit_end_datetime": visit_end_datetime,
            "visit_type_concept_id": 0,
            "visit_source_value": visit_source_value,
            "visit_source_concept_id": 0,
            "admitted_from_concept_id": 0,
            "admitted_from_source_value": admitted_from_source_value,
            "discharged_to_concept_id": 0,
            "discharged_to_source_value": discharged_to_source_value,
        }

ConditionTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Condition resource to an OMOP ConditionOccurrence row.

Mapping source: condition.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Notes: - condition_concept_id requires a terminology lookup from code.coding[0].code; set to 0 until resolved. - Onset and abatement use the dateTime polymorphic variant only. Other onset types (Period, Age, Range, string) are not handled here.

Source code in src/plugin_rosetta/translators/fhir_to_omop/condition.py
class ConditionTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 Condition resource to an OMOP ConditionOccurrence row.

    Mapping source: condition.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - ``condition_concept_id`` requires a terminology lookup from
      ``code.coding[0].code``; set to 0 until resolved.
    - Onset and abatement use the ``dateTime`` polymorphic variant only.
      Other onset types (Period, Age, Range, string) are not handled here.
    - ``Condition.evidence`` is not mapped: no output column consumes it.
    """

    resource_type = "Condition"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP ConditionOccurrence row dict from a FHIR Condition dict.

        Args:
            record: FHIR Condition resource as a dict.

        Returns:
            A ConditionOccurrence row dict; never empty.
        """
        fhir_id = record.get("id", "")

        # Condition code
        condition_source_value: str | None = self._first_coding_code(record.get("code"))

        # Start date: prefer recordedDate, fallback to onsetDateTime
        recorded_date: str | None = record.get("recordedDate")
        onset_datetime: str | None = record.get("onsetDateTime")
        start_raw = recorded_date or onset_datetime
        condition_start_date: str | None = start_raw[:10] if start_raw else None

        # Start datetime: prefer onsetDateTime, fallback to recordedDate.
        # NOTE(review): the date above prefers recordedDate while the datetime
        # prefers onsetDateTime, so the two can disagree when both are present
        # — confirm which source is intended.
        condition_start_datetime: str | None
        if onset_datetime:
            condition_start_datetime = onset_datetime
        elif recorded_date:
            # recordedDate may already carry a time part; only pad a bare
            # date, otherwise the result would be "...T10:00:00ZT00:00:00".
            condition_start_datetime = (
                recorded_date if "T" in recorded_date else recorded_date + "T00:00:00"
            )
        else:
            condition_start_datetime = None

        # End date
        abatement_datetime: str | None = record.get("abatementDateTime")
        condition_end_date: str | None = (
            abatement_datetime[:10] if abatement_datetime else None
        )

        # Type concept from category; ``or []`` tolerates an explicit null.
        category_source_value: str | None = None
        for cat in record.get("category") or []:
            code = self._first_coding_code(cat)
            if code:
                category_source_value = code
                break

        # Status
        status_source_value: str | None = self._first_coding_code(
            record.get("clinicalStatus")
        )

        return {
            "condition_occurrence_id": None,
            "fhir_id": fhir_id,
            "person_id": None,
            "condition_concept_id": 0,  # requires concept lookup
            "condition_start_date": condition_start_date,
            "condition_start_datetime": condition_start_datetime,
            "condition_end_date": condition_end_date,
            "condition_end_datetime": abatement_datetime,
            "condition_type_concept_id": 0,
            "condition_status_concept_id": 0,
            "condition_source_value": condition_source_value,
            "condition_source_concept_id": 0,
            "condition_status_source_value": status_source_value,
            "condition_type_source_value": category_source_value,
        }

ObservationTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Observation to an OMOP Observation row.

Mapping source: Observation.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Notes: - Records whose category codes are outside OBS_CATEGORIES are skipped (return empty dict). Those should be routed to a MeasurementTranslator. - Polymorphic value[x] is handled for Quantity, CodeableConcept, string.

Source code in src/plugin_rosetta/translators/fhir_to_omop/observation.py
class ObservationTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 Observation to an OMOP Observation row.

    Mapping source: Observation.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - Records whose category codes are outside ``OBS_CATEGORIES`` are skipped
      (return empty dict).  Those should be routed to a MeasurementTranslator.
    - Polymorphic ``value[x]`` is handled for Quantity, CodeableConcept, string.
      A variant whose value is null is treated as absent.
    """

    resource_type = "Observation"

    def _is_observation_category(self, record: dict[str, Any]) -> bool:
        """Return True if any ``category[*].coding[*].code`` is in ``OBS_CATEGORIES``.

        Missing or null ``category`` / ``coding`` lists and non-dict entries
        count as "no match" instead of raising.
        """
        for cat in record.get("category") or []:
            if not isinstance(cat, dict):
                continue
            for coding in cat.get("coding") or []:
                if isinstance(coding, dict) and coding.get("code") in OBS_CATEGORIES:
                    return True
        return False

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP Observation row dict from a FHIR Observation dict.

        Args:
            record: FHIR Observation resource as a dict.

        Returns:
            An Observation row dict, or an empty dict when no category code
            of the record is in ``OBS_CATEGORIES``.
        """
        # Route: only handle non-measurement observations
        if not self._is_observation_category(record):
            return {}

        fhir_id = record.get("id", "")

        # Observation code
        observation_source_value: str | None = self._first_coding_code(
            record.get("code")
        )

        # Effective date/time: dateTime, then instant, then period.start
        effective_datetime: str | None = record.get("effectiveDateTime") or record.get(
            "effectiveInstant"
        )
        if effective_datetime is None:
            period = record.get("effectivePeriod", {}) or {}
            effective_datetime = period.get("start")
        observation_date: str | None = (
            effective_datetime[:10] if effective_datetime else None
        )

        # Value[x]
        value_as_number: float | None = None
        unit_source_value: str | None = None
        value_as_concept_source: str | None = None
        value_as_string: str | None = None

        # Test for a non-null value rather than key presence: rows built from
        # tabular input can carry every value[x] key with most of them None.
        # A bare ``in`` check would pick the first null variant, hide the real
        # one, and turn a null valueString into the text "None".
        if record.get("valueQuantity") is not None:
            qty = record["valueQuantity"] or {}
            val = qty.get("value")
            value_as_number = float(val) if val is not None else None
            unit_source_value = qty.get("unit") or qty.get("code")
        elif record.get("valueCodeableConcept") is not None:
            value_as_concept_source = self._first_coding_code(
                record["valueCodeableConcept"]
            )
        elif record.get("valueString") is not None:
            value_as_string = str(record["valueString"])

        # Note -> observation_source_value override: the first non-empty note
        # text replaces the code-derived source value.
        for note in record.get("note") or []:
            text = note.get("text") if isinstance(note, dict) else str(note)
            if text:
                observation_source_value = text
                break

        return {
            "observation_id": None,
            "fhir_id": fhir_id,
            "person_id": None,
            "observation_concept_id": 0,  # requires concept lookup
            "observation_date": observation_date,
            "observation_datetime": effective_datetime,
            "observation_type_concept_id": 0,
            "value_as_number": value_as_number,
            "value_as_string": value_as_string,
            "value_as_concept_id": 0,
            "unit_concept_id": 0,
            "observation_source_value": observation_source_value,
            "observation_source_concept_id": 0,
            "value_source_value": value_as_concept_source,
            "unit_source_value": unit_source_value,
        }

ProcedureTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Procedure resource to an OMOP ProcedureOccurrence row.

Mapping source: Procedure.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Source code in src/plugin_rosetta/translators/fhir_to_omop/procedure.py
class ProcedureTranslator(FhirToOmopTranslator):
    """Map a FHIR R4 Procedure resource onto an OMOP ProcedureOccurrence row.

    Mapping source: Procedure.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    NOTE(review): timing is read from ``occurrenceDateTime`` /
    ``occurrencePeriod``; confirm these are the element names used by the
    incoming data for the FHIR version in play.
    """

    resource_type = "Procedure"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return the ProcedureOccurrence row dict for one Procedure dict.

        ``occurrenceDateTime`` supplies the start when present; only when it
        is None is ``occurrencePeriod`` consulted for start and end.  The
        date columns are the first 10 characters of their datetime strings.
        """
        started_at: str | None = record.get("occurrenceDateTime")
        ended_at: str | None = None
        if started_at is None:
            window = record.get("occurrencePeriod", {}) or {}
            started_at, ended_at = window.get("start"), window.get("end")

        row: dict[str, Any] = {
            "procedure_occurrence_id": None,
            "fhir_id": record.get("id", ""),
            "person_id": None,
            "procedure_concept_id": 0,  # requires concept lookup
            "procedure_date": started_at[:10] if started_at else None,
            "procedure_datetime": started_at,
            "procedure_end_date": ended_at[:10] if ended_at else None,
            "procedure_end_datetime": ended_at,
            "procedure_type_concept_id": 0,
            "procedure_source_value": self._first_coding_code(record.get("code")),
            "procedure_source_concept_id": 0,
        }
        return row

MedicationTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 MedicationStatement to an OMOP DrugExposure row.

Mapping source: medication.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Source code in src/plugin_rosetta/translators/fhir_to_omop/medication.py
class MedicationTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 MedicationStatement to an OMOP DrugExposure row.

    Mapping source: medication.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - ``drug_concept_id`` requires a concept lookup and is set to 0; the raw
      code from ``medication.concept`` is stored in ``drug_source_value``.
    - ``person_id`` and ``drug_exposure_id`` are set to None.
    """

    resource_type = "MedicationStatement"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP DrugExposure row dict from a MedicationStatement dict.

        Args:
            record: FHIR MedicationStatement resource as a dict.

        Returns:
            A DrugExposure row dict; never empty.
        """
        fhir_id = record.get("id", "")

        # Drug concept from medication.concept.coding[0].code
        medication = record.get("medication") or {}
        drug_source_value: str | None = None
        if isinstance(medication, dict):
            concept = medication.get("concept") or {}
            drug_source_value = self._first_coding_code(concept)

        # Effective[x]: dateTime wins; the period is consulted only when the
        # dateTime is absent, and it is the only source of an end datetime.
        start_datetime: str | None = record.get("effectiveDateTime")
        end_datetime: str | None = None

        if start_datetime is None:
            period = record.get("effectivePeriod", {}) or {}
            start_datetime = period.get("start")
            end_datetime = period.get("end")

        start_date: str | None = start_datetime[:10] if start_datetime else None
        end_date: str | None = end_datetime[:10] if end_datetime else None

        # Drug type from category.  ``or []`` covers an explicit null, which
        # ``.get("category", [])`` would hand to the loop and raise TypeError.
        drug_type_source_value: str | None = None
        for cat in record.get("category") or []:
            code = self._first_coding_code(cat)
            if code:
                drug_type_source_value = code
                break

        # Stop reason from reason (same null tolerance as above)
        stop_reason: str | None = None
        for reason in record.get("reason") or []:
            if isinstance(reason, dict):
                concept = reason.get("concept") or {}
                code = self._first_coding_code(concept)
                if code:
                    stop_reason = code
                    break

        return {
            "drug_exposure_id": None,
            "fhir_id": fhir_id,
            "person_id": None,
            "drug_concept_id": 0,  # requires concept lookup
            "drug_exposure_start_date": start_date,
            "drug_exposure_start_datetime": start_datetime,
            "drug_exposure_end_date": end_date,
            "drug_exposure_end_datetime": end_datetime,
            "verbatim_end_date": end_date,
            "drug_type_concept_id": 0,
            "stop_reason": stop_reason,
            "drug_source_value": drug_source_value,
            "drug_source_concept_id": 0,
            "drug_type_source_value": drug_type_source_value,
        }

ImmunizationTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 Immunization resource to an OMOP DrugExposure row.

Mapping source: ImmunizationMap.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Source code in src/plugin_rosetta/translators/fhir_to_omop/immunization.py
class ImmunizationTranslator(FhirToOmopTranslator):
    """Map a FHIR R4 Immunization resource onto an OMOP DrugExposure row.

    Mapping source: ImmunizationMap.fml (HL7 fhir-omop-ig 1.0.0-ballot).
    """

    resource_type = "Immunization"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Return the DrugExposure row dict for one Immunization dict.

        Start and end of the exposure are both taken from
        ``occurrenceDateTime``.  Dose quantity and route are read only when
        the corresponding field is a dict.
        """
        # Administration moment; the date column is its first 10 characters.
        administered_at: str | None = record.get("occurrenceDateTime")
        administered_on: str | None = (
            administered_at[:10] if administered_at else None
        )

        # Dose quantity: numeric value plus a unit label (code preferred).
        quantity: float | None = None
        dose_unit_source_value: str | None = None
        dose = record.get("doseQuantity") or {}
        if isinstance(dose, dict):
            raw_amount = dose.get("value")
            if raw_amount is not None:
                quantity = float(raw_amount)
            dose_unit_source_value = dose.get("code") or dose.get("unit")

        # Route: free text preferred, otherwise the first coding code.
        route_source_value: str | None = None
        route = record.get("route") or {}
        if isinstance(route, dict):
            route_source_value = route.get("text") or self._first_coding_code(route)

        row: dict[str, Any] = {
            "drug_exposure_id": None,
            "fhir_id": record.get("id", ""),
            "person_id": None,
            "drug_concept_id": 0,  # requires concept lookup
            "drug_exposure_start_date": administered_on,
            "drug_exposure_start_datetime": administered_at,
            "drug_exposure_end_date": administered_on,
            "drug_exposure_end_datetime": administered_at,
            "drug_type_concept_id": 0,
            "quantity": quantity,
            "route_concept_id": 0,
            "route_source_value": route_source_value,
            "dose_unit_source_value": dose_unit_source_value,
            "lot_number": record.get("lotNumber"),
            "drug_source_value": self._first_coding_code(record.get("vaccineCode")),
            "drug_source_concept_id": 0,
        }
        return row

AllergyTranslator

Bases: FhirToOmopTranslator

Translate a FHIR R4 AllergyIntolerance resource to an OMOP Observation row.

Mapping source: Allergy.fml (HL7 fhir-omop-ig 1.0.0-ballot).

Source code in src/plugin_rosetta/translators/fhir_to_omop/allergy.py
class AllergyTranslator(FhirToOmopTranslator):
    """Translate a FHIR R4 AllergyIntolerance resource to an OMOP Observation row.

    Mapping source: Allergy.fml (HL7 fhir-omop-ig 1.0.0-ballot).

    Notes:
    - ``observation_concept_id`` and ``value_as_concept_id`` require concept
      lookups and are set to 0; the raw codes are kept in the matching
      ``*_source_value`` fields.
    - Only ``onsetDateTime`` is read for the observation date.
    """

    resource_type = "AllergyIntolerance"

    def translate_record(self, record: dict[str, Any]) -> dict[str, Any]:
        """Build one OMOP Observation row dict from an AllergyIntolerance dict.

        Args:
            record: FHIR AllergyIntolerance resource as a dict.

        Returns:
            An Observation row dict; never empty.
        """
        fhir_id = record.get("id", "")

        # Allergy code -> observation_concept_id / observation_source_value
        observation_source_value: str | None = self._first_coding_code(
            record.get("code")
        )

        # Onset datetime
        onset_datetime: str | None = record.get("onsetDateTime")
        observation_date: str | None = onset_datetime[:10] if onset_datetime else None

        # Reaction manifestation -> value_as_concept_id / value_source_value.
        # The first manifestation code across all reactions wins.  ``or []``
        # covers explicit nulls, which ``.get(key, [])`` would hand to the
        # loop and raise TypeError.
        value_source_value: str | None = None
        for reaction in record.get("reaction") or []:
            if not isinstance(reaction, dict):
                continue
            for manifestation in reaction.get("manifestation") or []:
                if not isinstance(manifestation, dict):
                    continue
                concept = manifestation.get("concept") or {}
                code = self._first_coding_code(concept)
                if code:
                    value_source_value = code
                    break
            if value_source_value:
                break

        return {
            "observation_id": None,
            "fhir_id": fhir_id,
            "person_id": None,
            "observation_concept_id": 0,  # requires concept lookup
            "observation_date": observation_date,
            "observation_datetime": onset_datetime,
            "observation_type_concept_id": 0,
            "value_as_concept_id": 0,
            "observation_source_value": observation_source_value,
            "observation_source_concept_id": 0,
            "value_source_value": value_source_value,
        }

I/O — NDJSON

Yield dicts from a FHIR ndjson file, optionally filtered by resourceType.

Parameters:

Name Type Description Default
path Path

Path to the .ndjson file.

required
resource_type str | None

If given, only yield records whose resourceType matches this string.

None

Yields:

Type Description
dict

Parsed FHIR resource dicts.

Source code in src/plugin_rosetta/translators/io/ndjson_reader.py
def iter_ndjson(path: Path, resource_type: str | None = None) -> Iterator[dict]:
    """Yield dicts from a FHIR ndjson file, optionally filtered by resourceType.

    Blank lines are ignored.  Lines that fail to parse, or that parse to
    something other than a JSON object, are skipped with a printed warning
    that names the file and line number.

    Args:
        path: Path to the ``.ndjson`` file.
        resource_type: If given, only yield records whose ``resourceType``
            matches this string.

    Yields:
        Parsed FHIR resource dicts.
    """
    with open(path, "rb") as f:
        for lineno, line in enumerate(f, start=1):
            line = line.strip()
            if not line:
                continue
            try:
                record = orjson.loads(line)
            except orjson.JSONDecodeError as exc:
                print(f"  Warning [{path.name}:{lineno}] JSON parse error: {exc}")
                continue
            # ``[]``, ``null`` or a bare scalar is valid JSON but not a
            # resource: yielding it would break the ``Iterator[dict]``
            # contract, and ``record.get`` below would raise on it.
            if not isinstance(record, dict):
                print(f"  Warning [{path.name}:{lineno}] not a JSON object; skipped")
                continue
            if resource_type is None or record.get("resourceType") == resource_type:
                yield record

Read all records from a FHIR ndjson file into a list.

Convenience wrapper around iter_ndjson.

Source code in src/plugin_rosetta/translators/io/ndjson_reader.py
def read_ndjson(path: Path, resource_type: str | None = None) -> list[dict]:
    """Load every matching record of a FHIR ndjson file into memory.

    Eager counterpart of :func:`iter_ndjson`: drains that generator and
    returns the records as a list, in file order.
    """
    records: list[dict] = []
    records.extend(iter_ndjson(path, resource_type=resource_type))
    return records

I/O — Parquet

Reading

Container for the result of reading a validated Parquet file.

Source code in src/plugin_rosetta/translators/io/parquet_reader.py
@dataclass
class ParquetReadResult:
    """Pair a Parquet-derived DataFrame with its validation errors."""

    # The data as read from the Parquet file.
    df: pl.DataFrame
    # Validation errors; defaults to a fresh empty DataFrame per instance.
    errors: pl.DataFrame = field(default_factory=pl.DataFrame)

    @property
    def is_valid(self) -> bool:
        """True when the ``errors`` frame is empty."""
        no_errors = self.errors.is_empty()
        return no_errors

Read a Parquet file and return a Polars DataFrame.

No validation is applied; use read_parquet_validated if you need schema validation via nyctea.

Source code in src/plugin_rosetta/translators/io/parquet_reader.py
def read_parquet(path: Path) -> pl.DataFrame:
    """Load the Parquet file at *path* into a Polars DataFrame.

    The data is returned as Polars reads it, with no schema checks.  Call
    :func:`read_parquet_validated` instead when the file must be validated
    against a nyctea schema.
    """
    frame = pl.read_parquet(path)
    return frame

Read a Parquet file and validate it against a nyctea SchemaModel.

Parameters:

Name Type Description Default
path Path

Path to the Parquet file.

required
schema_model

A nyctea SchemaModel instance describing the expected column schema.

required

Returns:

Type Description
ParquetReadResult

A ParquetReadResult containing the DataFrame and any validation errors as a Polars DataFrame.

Source code in src/plugin_rosetta/translators/io/parquet_reader.py
def read_parquet_validated(path: Path, schema_model) -> ParquetReadResult:
    """Load a Parquet file and check it against a nyctea SchemaModel.

    Args:
        path: Path to the Parquet file.
        schema_model: A nyctea ``SchemaModel`` instance describing the expected
            column schema.

    Returns:
        A :class:`ParquetReadResult` holding the DataFrame that was read and
        the validation errors as a Polars DataFrame.
    """
    # Function-scope import: nyctea is loaded only when this function runs.
    from nyctea.engine import validate  # type: ignore[import]

    frame = pl.read_parquet(path)
    report = validate(frame, schema_model)
    return ParquetReadResult(df=frame, errors=report.errors)

Writing

Write OMOP row dicts to a Parquet file.

Parameters:

Name Type Description Default
rows list[dict[str, Any]]

List of dicts, each representing one OMOP table row.

required
path Path

Destination Parquet file path.

required
schema dict[str, DataType] | None

Optional explicit Polars schema to enforce column dtypes. If None, Polars infers dtypes from the data.

None
compression str

Parquet compression codec. Defaults to "zstd".

'zstd'

Returns:

Type Description
DataFrame

The written Polars DataFrame.

Source code in src/plugin_rosetta/translators/io/parquet_writer.py
def write_parquet(
    rows: list[dict[str, Any]],
    path: Path,
    *,
    schema: dict[str, pl.DataType] | None = None,
    compression: str = "zstd",
) -> pl.DataFrame:
    """Build a Polars DataFrame from OMOP row dicts and save it as Parquet.

    Missing parent directories of *path* are created before writing.

    Args:
        rows: List of dicts, each representing one OMOP table row.
        path: Destination Parquet file path.
        schema: Optional explicit Polars schema to enforce column dtypes.
            If None, Polars infers dtypes from the data.
        compression: Parquet compression codec.  Defaults to ``"zstd"``.

    Returns:
        The Polars DataFrame that was written.
    """
    frame = (
        pl.DataFrame(rows)
        if schema is None
        else pl.DataFrame(rows, schema=schema)
    )
    # The frame is built first, so a bad ``rows``/``schema`` leaves the
    # filesystem untouched.
    output_dir = path.parent
    output_dir.mkdir(parents=True, exist_ok=True)
    frame.write_parquet(path, compression=compression)
    return frame

Convert OMOP row dicts to a Polars DataFrame without writing to disk.

Source code in src/plugin_rosetta/translators/io/parquet_writer.py
def rows_to_dataframe(
    rows: list[dict[str, Any]],
    schema: dict[str, pl.DataType] | None = None,
) -> pl.DataFrame:
    """Build a Polars DataFrame from OMOP row dicts; nothing is written to disk.

    Args:
        rows: List of dicts, each representing one OMOP table row.
        schema: Optional explicit Polars schema.  When None, the frame is
            built from *rows* alone.

    Returns:
        The resulting Polars DataFrame.
    """
    if schema is None:
        return pl.DataFrame(rows)
    return pl.DataFrame(rows, schema=schema)

Validation

Container for OMOP validation results.

Source code in src/plugin_rosetta/translators/io/omop_validator.py
@dataclass
class ValidationResult:
    """Pair an OMOP DataFrame with the errors found while validating it.

    ``errors`` defaults to an empty Polars DataFrame; ``is_valid`` reports
    whether that errors frame is empty.
    """

    df: pl.DataFrame
    # A new empty frame is built for each instance that omits ``errors``.
    errors: pl.DataFrame = field(default_factory=pl.DataFrame)

    @property
    def is_valid(self) -> bool:
        """Return True when ``errors`` is empty."""
        has_no_errors = self.errors.is_empty()
        return has_no_errors

Validate an OMOP Parquet file against a nyctea SchemaModel.

Parameters:

Name Type Description Default
path Path

Path to the OMOP Parquet file.

required
schema_model Any

A nyctea SchemaModel instance (from plugin_rosetta.translators.schemas.omop_nyctea).

required

Returns:

Type Description
ValidationResult

A ValidationResult with the DataFrame and validation errors.

Source code in src/plugin_rosetta/translators/io/omop_validator.py
def validate_omop_parquet(path: Path, schema_model: Any) -> ValidationResult:
    """Read an OMOP Parquet file and check it against a nyctea SchemaModel.

    Args:
        path: Path to the OMOP Parquet file.
        schema_model: A nyctea ``SchemaModel`` instance (from
            ``plugin_rosetta.translators.schemas.omop_nyctea``).

    Returns:
        A :class:`ValidationResult` holding the DataFrame that was read and
        the validation errors.
    """
    # Function-scope import: nyctea is loaded only when this function runs,
    # and before the file is read.
    from nyctea.engine import validate  # type: ignore[import]

    frame = pl.read_parquet(path)
    outcome = validate(frame, schema_model)
    return ValidationResult(df=frame, errors=outcome.errors)

Validate an in-memory OMOP Polars DataFrame against a nyctea SchemaModel.

Parameters:

Name Type Description Default
df DataFrame

OMOP table as a Polars DataFrame.

required
schema_model Any

A nyctea SchemaModel instance.

required

Returns:

Type Description
ValidationResult

A ValidationResult with the DataFrame and validation errors.

Source code in src/plugin_rosetta/translators/io/omop_validator.py
def validate_omop_dataframe(df: pl.DataFrame, schema_model: Any) -> ValidationResult:
    """Check an in-memory OMOP Polars DataFrame against a nyctea SchemaModel.

    Args:
        df: OMOP table as a Polars DataFrame.
        schema_model: A nyctea ``SchemaModel`` instance.

    Returns:
        A :class:`ValidationResult` holding the same DataFrame that was passed
        in, together with the validation errors.
    """
    # Function-scope import: nyctea is loaded only when this function runs.
    from nyctea.engine import validate  # type: ignore[import]

    outcome = validate(df, schema_model)
    found_errors = outcome.errors
    return ValidationResult(df=df, errors=found_errors)

Schema registry

OMOP and FHIR nyctea schema models used for columnar validation.

OMOP schemas

from plugin_rosetta.translators.schemas.omop_nyctea import get_schema

# Each call looks up one schema by its snake_case OMOP table name.
# NOTE(review): presumably the result is the nyctea SchemaModel that the
# validate_omop_* functions accept — confirm against omop_nyctea.
schema = get_schema("person")            # OMOP Person
schema = get_schema("visit_occurrence")  # OMOP VisitOccurrence
schema = get_schema("condition_occurrence")  # OMOP ConditionOccurrence
schema = get_schema("observation")  # OMOP Observation
schema = get_schema("procedure_occurrence")  # OMOP ProcedureOccurrence
schema = get_schema("drug_exposure")  # OMOP DrugExposure

FHIR schemas

from plugin_rosetta.translators.schemas.fhir_nyctea import get_schema

# Each call looks up one schema by name; the names here follow FHIR
# resourceType spelling (PascalCase), unlike the snake_case OMOP table names.
# NOTE(review): this get_schema comes from fhir_nyctea, not omop_nyctea —
# importing both into one module requires an alias.
schema = get_schema("Patient")
schema = get_schema("Encounter")
schema = get_schema("Condition")
schema = get_schema("Observation")
schema = get_schema("Procedure")
schema = get_schema("MedicationStatement")
schema = get_schema("Immunization")
schema = get_schema("AllergyIntolerance")