Plugin API Reference

This page is the reference for the ADL plugin contract. The prose sections around each block explain when and why to override, and call out the non-obvious behaviours that are easiest to miss.


Plugin Base Class

class adl.core.registries.Plugin

Bases: Instance

Base class for all ADL data-source plugins.

Subclasses must:

  • Set type — a unique string registry key (e.g. "adl_tahmo_plugin").

  • Set label — a human-readable name shown in the admin UI and logs. A blank label raises ImproperlyConfigured on startup.

  • Implement get_station_data().

All other methods have working defaults and can be overridden selectively.

Minimal subclass example:

from adl.core.registries import Plugin

class MyPlugin(Plugin):
    type = "my_plugin"
    label = "My Data Source Plugin"

    def get_station_data(self, station_link, start_date=None, end_date=None):
        client = station_link.network_connection.get_api_client()
        yield from client.get_measurements(
            station_link.station_code,
            start=start_date,
            end=end_date,
        )

get_urls()

Return a list of Django URL patterns to include in the ADL URL configuration.

The default implementation returns an empty list. Override this method if your plugin needs to serve views outside the Wagtail admin namespace. For views that belong inside the Wagtail admin, use the register_admin_urls hook in wagtail_hooks.py instead.

Returns:

A list of Django URL patterns.

Return type:

list

Example:

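def get_urls(self):
    from django.urls import include, path

    from . import plugin_urls

    # Passing (module, app_name) lets include() accept namespace= even if
    # plugin_urls does not define app_name itself.
    return [
        path("my-plugin/", include((plugin_urls, self.type), namespace=self.type)),
    ]

after_save_records(station_link, station_records, saved_records, qc_fail_results=None)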

Hook called after each batch of ObservationRecord instances has been upserted for a station.

The default implementation does nothing. Override this method if your plugin needs to trigger side effects after data lands in the database — for example, sending a notification, updating an external cache, or writing a plugin-specific summary log.

Parameters:
  • station_link – The StationLink instance that was just processed.

  • station_records – The raw record dicts as returned by get_station_data() for this station.

  • saved_records – The ObservationRecord instances that were upserted.

  • qc_fail_results – Dict of QC failure messages keyed by UTC ISO timestamp string, or None if no QC checks are configured for this parameter.
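A minimal sketch of an override that writes a per-batch summary to the log, using only the parameters listed above. summarize_batch() is a hypothetical helper, kept as a plain function so the logic can be exercised without a running ADL instance; the commented method shows how it might be wired in.

```python
import logging
from typing import Any, Dict, Iterable, Optional

logger = logging.getLogger(__name__)


def summarize_batch(
    saved_records: Iterable[Any],
    qc_fail_results: Optional[Dict[str, Any]] = None,
) -> Dict[str, int]:
    """Summarize one saved batch for logging."""
    return {
        "saved": sum(1 for _ in saved_records),
        # qc_fail_results is None when no QC checks are configured.
        "qc_failed_timestamps": len(qc_fail_results or {}),
    }


# Wiring it into your Plugin subclass (sketch):
#
#     def after_save_records(self, station_link, station_records, saved_records,
#                            qc_fail_results=None):
#         summary = summarize_batch(saved_records, qc_fail_results)
#         logger.info("Saved batch for %s: %s", station_link, summary)
```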

get_station_data(station_link, start_date=None, end_date=None)

Fetch raw observations for one station over a time window.

This is the only method subclasses must implement. ADL calls it with a pre-resolved [start_date, end_date) window and expects back an iterable of record dicts that it will normalize, convert, and upsert.

Parameters:
  • station_link – The configured StationLink subclass instance for this station. Provides upstream credentials via station_link.network_connection, the upstream station identifier, variable mappings, and the station timezone.

  • start_date (datetime, optional) – Start of the fetch window, expressed in the station’s local timezone (aware datetime). Fetch records where observation_time >= start_date.

  • end_date (datetime, optional) – End of the fetch window, expressed in the station’s local timezone (aware datetime). Fetch records where observation_time < end_date.

Returns:

An iterable (preferably a generator) of record dicts. Each dict must contain:

  • "observation_time" (datetime) — aware datetimes are used as-is; naive datetimes are interpreted as station-local time.

And may contain any number of source-parameter fields whose keys match mapping.source_parameter_name on the station link’s variable mappings, for example:

from datetime import datetime, timezone

{
    "observation_time": datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc),
    "te": 22.4,
    "rh": 78.0,
}

Return type:

Iterable[Dict[str, Any]]

Raises:

NotImplementedError – If not overridden by the subclass.

Note

Prefer yield-ing records one at a time rather than returning a complete list. save_records() processes in chunks of SAVE_CHUNK_SIZE, so a generator avoids loading the entire upstream response into memory — important for large historical backfills.
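A minimal sketch of the chunked-consumption pattern this note relies on, using itertools.islice. chunked() and fake_upstream() are hypothetical helpers for illustration, not ADL internals; the point is that when the source is a generator, only one chunk of records exists in memory at a time, while a list works just as well.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` items, pulling from `items` lazily."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def fake_upstream(n: int) -> Iterator[dict]:
    # Stand-in for get_station_data(): records are produced one at a time.
    for i in range(n):
        yield {"index": i}
```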

Warning

Records with a missing or non-datetime observation_time, timestamps outside [start_date, end_date], or future timestamps are silently dropped by save_records(). No exception is raised.
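A sketch of the core of a get_station_data() implementation that adapts upstream rows into record dicts. The upstream row shape ("timestamp" as an ISO 8601 string, "temp" and "humidity" fields) and the output keys "te" and "rh" are assumptions for illustration; use whatever your API returns and whatever source_parameter_name values your mappings use. Filtering to [start_date, end_date) here keeps boundary behaviour explicit instead of relying on the silent drops.

```python
from datetime import datetime, tzinfo
from typing import Any, Dict, Iterable, Iterator, Optional


def rows_to_records(
    rows: Iterable[Dict[str, Any]],
    station_tz: tzinfo,
    start_date: Optional[datetime] = None,
    end_date: Optional[datetime] = None,
) -> Iterator[Dict[str, Any]]:
    """Turn hypothetical upstream rows into ADL record dicts, lazily."""
    for row in rows:
        obs_time = datetime.fromisoformat(row["timestamp"])
        if obs_time.tzinfo is None:
            # Naive upstream times are treated as station-local, matching ADL's rule.
            obs_time = obs_time.replace(tzinfo=station_tz)
        # Half-open window [start_date, end_date), as get_station_data() expects.
        if start_date is not None and obs_time < start_date:
            continue
        if end_date is not None and obs_time >= end_date:
            continue
        yield {
            "observation_time": obs_time,
            # Keys must match mapping.source_parameter_name exactly (case-sensitive).
            "te": row.get("temp"),
            "rh": row.get("humidity"),
        }
```

Inside get_station_data() this would be used as yield from rows_to_records(upstream_rows, station_tz, start_date, end_date), so records still stream one at a time.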

get_default_end_date(station_link)

Return the default end date for a station’s ingestion window.

Computes the station-local “top of the next hour” as a timezone-aware datetime. This is used as end_date for every ingestion run unless overridden.

Parameters:

station_link – The StationLink instance whose timezone is used to localise the result.

Returns:

The top of the next hour in the station’s local timezone, as an aware datetime.

Return type:

datetime
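The “top of the next hour” rule can be sketched as a small pure function. This illustrates the described behaviour rather than reproducing ADL's source: the explicit now argument is an assumption added for testability, and the sketch rounds an exact hour (10:00) up to 11:00.

```python
from datetime import datetime, timedelta, tzinfo
from typing import Optional


def top_of_next_hour(station_tz: tzinfo, now: Optional[datetime] = None) -> datetime:
    """Return the next full hour in station-local time, as an aware datetime."""
    if now is None:
        now = datetime.now(station_tz)
    local = now.astimezone(station_tz)
    # Drop minutes and below, then step forward one hour
    # (wall-clock arithmetic; DST edge cases are not handled here).
    return local.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
```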

Override this method if your data source reports at a different resolution — for example, to return the end of the current day for a daily-data source.
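A sketch of such a daily override, assuming “end of the current day” means the next local midnight so that today falls inside the window. next_local_midnight() is a hypothetical helper. The commented wiring takes the station timezone from the base method's result, which is documented above as station-local and timezone-aware.

```python
from datetime import datetime, timedelta, tzinfo
from typing import Optional


def next_local_midnight(station_tz: tzinfo, now: Optional[datetime] = None) -> datetime:
    """Return 00:00 of the following day in station-local time."""
    if now is None:
        now = datetime.now(station_tz)
    local = now.astimezone(station_tz)
    midnight = local.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight + timedelta(days=1)


# Inside your Plugin subclass (sketch):
#
#     def get_default_end_date(self, station_link):
#         # The base result is already station-local, so reuse its tzinfo.
#         station_tz = super().get_default_end_date(station_link).tzinfo
#         return next_local_midnight(station_tz)
```

If today's partial data should not be fetched at all, return today's midnight (drop the + timedelta(days=1)) instead.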

get_default_start_date(station_link)

Return the default start date for a station’s ingestion window.

Used as the start_date fallback when no prior observations exist in the database and no custom start date is set on the station link. The base implementation returns one hour before get_default_end_date().

Parameters:

station_link – The StationLink instance whose timezone is used to localise the result.

Returns:

One hour before the default end date, in the station’s local timezone, as an aware datetime.

Return type:

datetime

Override this method when your source’s natural polling window differs from one hour. For example, to fall back to the previous 24 hours:

from datetime import timedelta

def get_default_start_date(self, station_link):
    return self.get_default_end_date(station_link) - timedelta(days=1)

get_start_date_from_db(station_link)

Return the latest saved observation timestamp for this station and connection.

Queries ObservationRecord for the most recent time value matching (station, network_connection). This is the primary mechanism by which ingestion resumes from where it left off after a restart or gap.

Parameters:

station_link – The StationLink instance identifying the station and connection to query.

Returns:

The latest saved observation time as a UTC-aware datetime, or None if no records exist yet for this station and connection.

Return type:

datetime or None

Override this method when your API uses an inclusive end bound — i.e. it returns the record at exactly the timestamp you request — to avoid re-fetching the boundary record. For example, to add a one-minute offset:

from datetime import timedelta

def get_start_date_from_db(self, station_link):
    start_date = super().get_start_date_from_db(station_link)
    if start_date:
        # Step past the last saved record so it is not fetched again.
        start_date += timedelta(minutes=1)
    return start_date

get_dates_for_station(station_link, latest=False)

Resolve the (start_date, end_date) window to pass to get_station_data().

You should not normally need to override this method. Override the individual helpers (get_default_start_date(), get_default_end_date(), get_start_date_from_db()) instead.

start_date is resolved in the following priority order:

  1. get_start_date_from_db() — resume from the latest saved observation (normal incremental ingestion).

  2. station_link.get_first_collection_date() — the custom backfill start date set on the station link, localised to the station timezone.

  3. get_default_start_date() — final fallback when no prior data exists and no custom date is configured.

When latest=True, steps 1 and 2 are skipped and the default start date is always used. This mode is used when fetching the most recent data on demand rather than resuming normal ingestion.

Both dates are always localised to the station’s timezone before being returned. If start_date equals end_date after resolution, one hour is added to end_date to ensure a non-zero window.

Parameters:
  • station_link – The StationLink instance to resolve dates for.

  • latest (bool) – If True, skip DB and first-collection-date lookups and always use the default start date. Defaults to False.

Returns:

A (start_date, end_date) tuple, both timezone-aware and expressed in the station’s local timezone.

Return type:

Tuple[datetime, datetime]
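The resolution order can be written as a pure function, which makes it easy to reason about which branch a given run will take. This sketch mirrors the three steps, the latest flag, and the equal-dates adjustment described above. It is a simplification, not the real method: it takes already-computed candidate values instead of a station link, and it assumes every input is already a timezone-aware datetime, so localisation reduces to astimezone().

```python
from datetime import datetime, timedelta, tzinfo
from typing import Optional, Tuple


def resolve_window(
    station_tz: tzinfo,
    default_start: datetime,
    default_end: datetime,
    db_start: Optional[datetime] = None,
    first_collection: Optional[datetime] = None,
    latest: bool = False,
) -> Tuple[datetime, datetime]:
    if latest:
        # latest=True ignores saved data and the custom backfill date.
        start = default_start
    elif db_start is not None:
        start = db_start  # 1. resume from the latest saved observation
    elif first_collection is not None:
        start = first_collection  # 2. custom backfill start on the station link
    else:
        start = default_start  # 3. final fallback
    start = start.astimezone(station_tz)
    end = default_end.astimezone(station_tz)
    if start == end:
        # Guarantee a non-zero window.
        end += timedelta(hours=1)
    return start, end
```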

save_records(station_link, station_records, start_date, end_date, chunk_size=None)

Normalize and upsert raw observation records into ObservationRecord.

Processes station_records in chunks of SAVE_CHUNK_SIZE (or chunk_size if provided) to keep memory usage bounded — works correctly with both lists and generators.

For each record the method:

  1. Validates and normalizes observation_time to a timezone-aware station-local datetime. Records with a missing, non-datetime, out-of-window, or future timestamp are silently dropped.

  2. Iterates the station link’s variable mappings and looks up record[mapping.source_parameter_name] for each one.

  3. Converts the value from mapping.source_parameter_unit to the ADL parameter’s canonical unit if they differ.

  4. Runs any configured QC checks against the value.

  5. Upserts an ObservationRecord row keyed on (time, station, connection, parameter), updating value, is_daily, qc_status, qc_bits, and qc_version on conflict.

You do not normally need to override or call this method directly. It is called automatically by process_station().

Parameters:
  • station_link – The StationLink instance whose variable mappings and timezone are used for normalization.

  • station_records (Iterable[Dict[str, Any]]) – An iterable (list or generator) of raw record dicts as returned by get_station_data().

  • start_date (datetime) – The inclusive start of the accepted time window. Records before this are silently dropped.

  • end_date (datetime) – The inclusive end of the accepted time window. Records after this are silently dropped.

  • chunk_size (int, optional) – Number of records to process per database batch. Defaults to SAVE_CHUNK_SIZE.

Returns:

A three-tuple of (total_saved, earliest_time, latest_time) where total_saved is the number of rows upserted, and earliest_time / latest_time are the observation timestamps of the first and last saved records, or None if no records were saved.

Return type:

Tuple[int, Optional[datetime], Optional[datetime]]
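Step 5 is what makes re-ingesting an overlapping window safe: a record with the same (time, station, connection, parameter) key overwrites the earlier row instead of adding a second one. An in-memory sketch of those semantics, with a dict standing in for the database table and integer IDs standing in for the related objects; it illustrates the key and the updated fields only, not how ADL issues the SQL.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, Tuple

Key = Tuple[datetime, int, int, int]  # (time, station, connection, parameter)
UPDATED_FIELDS = ("value", "is_daily", "qc_status", "qc_bits", "qc_version")


def upsert(table: Dict[Key, Dict[str, Any]], rows: Iterable[Dict[str, Any]]) -> int:
    """Insert or update rows in `table`; return how many rows were written."""
    written = 0
    for row in rows:
        key = (row["time"], row["station"], row["connection"], row["parameter"])
        # On conflict only these fields change; the key columns stay as they are.
        table[key] = {field: row[field] for field in UPDATED_FIELDS}
        written += 1
    return written
```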

process_station(station_link, initial_start_date=None, initial_end_date=None)

Run the full ingestion pipeline for a single station link.

Resolves the date window, calls get_station_data(), passes the results to save_records(), and writes a StationLinkActivityLog entry regardless of outcome. Any exception raised during fetching or saving is caught, logged, and recorded on the activity log without re-raising, so that a failure on one station does not abort the rest of the connection’s run.

Called by run_process() for each enabled station link. You do not normally need to call or override this method directly.

Parameters:
  • station_link – The StationLink instance to process.

  • initial_start_date (datetime, optional) – Override the resolved start_date with this value if provided. Useful for manual or backfill invocations.

  • initial_end_date (datetime, optional) – Override the resolved end_date with this value if provided.

Returns:

The number of ObservationRecord rows upserted, or 0 if no data was available or an error occurred.

Return type:

int
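The error-isolation contract can be sketched with plain callables standing in for the pipeline stages. fetch, save and write_log are hypothetical parameters, not ADL names. The sketch keeps the documented shape: the exception is caught and recorded, the log entry is written in every case, and the caller always gets an int back.

```python
import logging
from typing import Any, Callable, Dict, Iterable, Tuple

logger = logging.getLogger(__name__)


def process_one(
    fetch: Callable[[], Iterable[Dict[str, Any]]],
    save: Callable[[Iterable[Dict[str, Any]]], Tuple[int, Any, Any]],
    write_log: Callable[[bool, int, str], None],
) -> int:
    saved, success, message = 0, False, ""
    try:
        records = fetch()
        saved, _earliest, _latest = save(records)
        success = True
    except Exception as exc:  # deliberately broad: one station must not stop the run
        logger.exception("Station ingestion failed")
        message = str(exc)
    finally:
        # Written whether the run succeeded or failed.
        write_log(success, saved, message)
    return saved
```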

run_process(network_connection, initial_start_date=None)

Run the ingestion pipeline for all enabled station links on a connection.

Iterates network_connection.station_links.all(), skips any link where enabled is False, and calls process_station() for each remaining link. This is the entry point called by the Celery beat task on the configured schedule.

Parameters:
  • network_connection – The NetworkConnection instance whose station links should be processed.

  • initial_start_date (datetime, optional) – If provided, passed through to every process_station() call as initial_start_date, overriding the normal DB-resume logic for all stations. Useful for bulk backfills.

Returns:

A dict mapping each processed station.id to the number of ObservationRecord rows upserted for that station.

Return type:

Dict[int, int]
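The loop itself is short. A sketch using SimpleNamespace stand-ins for station links: the enabled flag and station.id come from the description above, while make_link() and run_all() are hypothetical names for illustration.

```python
from types import SimpleNamespace
from typing import Callable, Dict, Iterable


def make_link(station_id: int, enabled: bool = True) -> SimpleNamespace:
    """Stand-in for a StationLink with only the attributes the loop reads."""
    return SimpleNamespace(enabled=enabled, station=SimpleNamespace(id=station_id))


def run_all(
    station_links: Iterable[SimpleNamespace],
    process: Callable[[SimpleNamespace], int],
) -> Dict[int, int]:
    results: Dict[int, int] = {}
    for link in station_links:
        if not link.enabled:
            continue  # disabled links are skipped and do not appear in the result
        results[link.station.id] = process(link)
    return results
```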

perform_qc_checks_with_pipeline(value, variable_mapping, adl_param, station_link, obs_time)

Run the configured QC pipeline against a single observation value.

Looks up the QC checks from variable_mapping.qc_checks if present, falling back to adl_param.qc_checks. If no checks are configured, returns immediately with a NOT_EVALUATED status.

QC pipelines are cached per (parameter_id, modified_at) to avoid rebuilding them on every record. The cache is invalidated automatically when a parameter’s modified_at timestamp changes.

Called internally by save_records() for each mapping row. You do not normally need to call this method directly.

Parameters:
  • value (float) – The converted observation value to check (in the ADL canonical unit for adl_param).

  • variable_mapping – The variable mapping instance for this parameter, used to look up per-mapping QC overrides.

  • adl_param – The DataParameter instance, used as the fallback QC check source and for logging.

  • station_link – The StationLink instance, passed to the QC context builder for station-level metadata.

  • obs_time (datetime) – The timezone-aware observation timestamp, used to fetch recent history for checks that require it (e.g. step, persistence).

Returns:

A three-tuple of (qc_bits, qc_status, qc_messages) where qc_bits is a QCBits flag value, qc_status is a QCStatus choice, and qc_messages is a list of failure message dicts (empty on pass).

Return type:

Tuple[QCBits, QCStatus, list]
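The cache invalidates itself because modified_at is part of the key: editing a parameter changes the key, so the old pipeline is simply never looked up again. A minimal sketch of that idea; PipelineCache and build_pipeline are hypothetical names, and unlike a production cache this one never evicts superseded entries.

```python
from datetime import datetime
from typing import Callable, Dict, Tuple

CacheKey = Tuple[int, datetime]


class PipelineCache:
    def __init__(self, build_pipeline: Callable[[int], object]) -> None:
        self._build = build_pipeline
        self._cache: Dict[CacheKey, object] = {}
        self.builds = 0  # counts cache misses, for illustration

    def get(self, parameter_id: int, modified_at: datetime) -> object:
        key = (parameter_id, modified_at)
        if key not in self._cache:
            # Miss: first use, or the parameter was edited (new modified_at).
            self._cache[key] = self._build(parameter_id)
            self.builds += 1
        return self._cache[key]
```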

What you must implement

The only method your plugin must override is get_station_data(). Everything else has a working default.

Date-window override chain

ADL resolves the (start_date, end_date) window through a chain of calls. Override the individual helpers rather than get_dates_for_station() itself:

  • get_default_end_date(): override when your source reports at daily (or other) resolution rather than hourly.

  • get_default_start_date(): override when your source’s natural fallback window is larger or smaller than one hour.

  • get_start_date_from_db(): override when your API uses an inclusive end bound and you need to offset the boundary to avoid re-fetching the last saved record.

Silent failure modes

save_records() drops records silently in the following cases — no exception is raised and nothing appears in the saved count:

  • observation_time is missing, not a datetime, before start_date, after end_date, or in the future.

  • A variable mapping row is missing source_parameter_name, source_parameter_unit, or adl_parameter.

  • The key returned in your record dict does not exactly match mapping.source_parameter_name (case-sensitive).

  • The value for a mapping key is None or not numeric (int or float).

  • Unit conversion raises an exception for a specific mapping row.

If data is disappearing without errors, work through this list first.
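When working through that list, it can help to run a few records from get_station_data() through a quick checker before they reach save_records(). A sketch, assuming mappings are represented as plain dicts keyed like the mapping attributes; explain_drops() is a hypothetical helper that reports reasons instead of dropping silently, and it does not attempt the unit-conversion check.

```python
from datetime import datetime, tzinfo
from typing import Any, Dict, List, Optional

REQUIRED_MAPPING_FIELDS = ("source_parameter_name", "source_parameter_unit", "adl_parameter")


def explain_drops(
    record: Dict[str, Any],
    mappings: List[Dict[str, Any]],
    station_tz: tzinfo,
    start_date: datetime,
    end_date: datetime,
    now: Optional[datetime] = None,
) -> List[str]:
    """Return reasons why `record` (or parts of it) would be dropped; [] means none found."""
    obs_time = record.get("observation_time")
    if not isinstance(obs_time, datetime):
        return ["observation_time is missing or not a datetime"]
    if obs_time.tzinfo is None:
        obs_time = obs_time.replace(tzinfo=station_tz)  # naive => station-local
    if now is None:
        now = datetime.now(station_tz)
    reasons: List[str] = []
    if obs_time < start_date or obs_time > end_date:
        reasons.append(f"observation_time {obs_time.isoformat()} is outside the window")
    if obs_time > now:
        reasons.append("observation_time is in the future")
    for mapping in mappings:
        missing = [f for f in REQUIRED_MAPPING_FIELDS if not mapping.get(f)]
        if missing:
            reasons.append("mapping is missing " + ", ".join(missing))
            continue
        name = mapping["source_parameter_name"]
        if name not in record:
            # Keys must match exactly; flag near misses that differ only in case.
            near = [k for k in record if isinstance(k, str) and k.lower() == name.lower()]
            hint = f" (did you mean {near[0]!r}?)" if near else ""
            reasons.append(f"no key {name!r} in record{hint}")
            continue
        value = record[name]
        if not isinstance(value, (int, float)):
            reasons.append(f"value for {name!r} is not numeric: {value!r}")
    return reasons
```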


NetworkConnection Base Class

class adl.core.models.NetworkConnection(*args, **kwargs)

Bases: PolymorphicModel, ClusterableModel

Configuration for one upstream data integration — credentials, schedule, timezone defaults, and the plugin type that performs ingestion.

NetworkConnection is a Django-polymorphic base class. Each plugin defines a concrete subclass that adds its own credential fields (API keys, FTP details, database URLs, etc.). ADL uses the plugin field to look up the registered Plugin instance at runtime.

Celery Beat triggers collect_data() on the interval set by plugin_processing_interval. Setting plugin_processing_enabled to False pauses ingestion for the entire connection without deleting any configuration.

Important

Plugin subclasses must set station_link_model_string_label to the "app_label.ModelName" string of their StationLink subclass. Without this, station links will not appear in the admin and ingestion will silently do nothing.

Parameters:
  • id (BigAutoField) – Primary key: ID

  • name (CharField) – Name

  • plugin (CharField) – Plugin. Plugin to use for this network

  • plugin_processing_enabled (BooleanField) – Active. If unchecked, the plugin will NOT run automatically

  • plugin_processing_interval (PositiveIntegerField) – Interval. How often the plugin should run, in minutes

  • stations_timezone (TimeZoneField) – Stations Timezone. Default Timezone of the stations in this network connection

  • batch_size (PositiveIntegerField) – Processing Batch Size. Number of stations to process in a single batch

  • is_daily_data (BooleanField) – Is Daily Data. Check to mark data from this connection as daily data

  • sort_order (PositiveIntegerField) – Sort Order. Order in which the connections are displayed

Reverse relationships:

Parameters:
  • station_links (Reverse ForeignKey from StationLink) – All station links of this Network Connection (related name of network_connection)

  • observation_records (Reverse ForeignKey from ObservationRecord) – All observation records of this Network Connection (related name of connection)

  • hourlyobsagg (Reverse ForeignKey from HourlyObsAgg) – All hourly obs aggs of this Network Connection (related name of connection)

  • dispatch_channels (Reverse ManyToManyField from DispatchChannel) – All dispatch channels of this Network Connection (related name of network_connections)

Required class attribute

Important

Every NetworkConnection subclass must set:

station_link_model_string_label = "your_app.YourStationLink"

ADL uses this string to find the correct StationLink subclass when rendering the station link form and running ingestion. If it is missing, station links will not appear in the admin and data collection will silently do nothing.

Extending the admin form

Always extend NetworkConnection.panels rather than replacing it, to keep the standard fields (name, network, plugin selector, interval, timezone) in the form:

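from django.utils.translation import gettext_lazy as _
from wagtail.admin.panels import FieldPanel, MultiFieldPanel

from adl.core.models import NetworkConnection

# In the body of your NetworkConnection subclass:
panels = NetworkConnection.panels + [
    MultiFieldPanel([
        FieldPanel("api_key"),
        FieldPanel("api_secret"),
    ], heading=_("API Credentials")),
]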


plugin_registry

class adl.core.registries.PluginRegistry(allow_instance_override=False)

Bases: Registry

Plugin registry for ADL data-source plugins.

property urls

Return a list of all the URL patterns contributed by the registered instances. These are added to the root URL configuration.

Returns:

The urls of the registered instances.

Return type:

list

The module-level singleton plugin_registry is the instance you interact with directly. Import it and call register inside AppConfig.ready():

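from django.apps import AppConfig

from adl.core.registries import plugin_registry


class MyPluginConfig(AppConfig):
    name = "my_plugin"

    def ready(self):
        # Import inside ready() so the app registry is fully loaded first.
        from .plugins import MyPlugin
        plugin_registry.register(MyPlugin())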

Important

The type string on your Plugin subclass must be globally unique across all installed plugins. Registering two plugins with the same type will raise an error at startup. Using the Python package name as the type (e.g. "adl_tahmo_plugin") is a safe convention.
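To see why a duplicate type fails, here is a toy registry that enforces the same uniqueness rule and aggregates URLs the way the urls property describes. ToyRegistry and DuplicateTypeError are hypothetical names; ADL's actual Registry base class and the exception it raises are not shown on this page.

```python
from typing import Any, Dict, List


class DuplicateTypeError(Exception):
    pass


class ToyRegistry:
    def __init__(self) -> None:
        self._instances: Dict[str, Any] = {}

    def register(self, instance: Any) -> None:
        key = instance.type
        if key in self._instances:
            raise DuplicateTypeError(f"A plugin with type {key!r} is already registered")
        self._instances[key] = instance

    def get(self, key: str) -> Any:
        return self._instances[key]

    @property
    def urls(self) -> List[Any]:
        # Concatenate every registered instance's get_urls() result.
        urls: List[Any] = []
        for instance in self._instances.values():
            urls += instance.get_urls()
        return urls
```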


DataParameter.convert_value_from_units

DataParameter.convert_value_from_units(value, from_unit)

Called automatically by save_records() for every mapping row where the source unit differs from the parameter’s canonical unit. You do not call this directly, but understanding it helps when debugging unexpected values.

If the parameter has a custom_unit_context configured (for conversions that pint cannot resolve dimensionally — for example, precipitation from mm to kg/m²), that context is applied automatically.
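The “only convert when units differ” rule and the role of a custom context can be illustrated without pint. In this toy sketch a lookup table stands in for pint's dimensional conversions, and the mm to kg/m² rule assumes liquid water at 1000 kg/m³, so 1 mm of precipitation over 1 m² weighs 1 kg. None of these names are ADL's or pint's.

```python
from typing import Callable, Dict, Tuple

Converter = Callable[[float], float]

# Conversions a dimensional library could resolve on its own.
DIMENSIONAL: Dict[Tuple[str, str], Converter] = {
    ("degF", "degC"): lambda v: (v - 32.0) * 5.0 / 9.0,
    ("hPa", "Pa"): lambda v: v * 100.0,
}

# Conversions that need an extra physical assumption, like a custom context.
PRECIP_CONTEXT: Dict[Tuple[str, str], Converter] = {
    # 1 mm depth x 1000 kg/m^3 water density = 1 kg/m^2
    ("mm", "kg/m^2"): lambda v: v * 1.0,
}


def convert(value: float, from_unit: str, to_unit: str, use_context: bool = False) -> float:
    if from_unit == to_unit:
        return value  # no conversion needed
    table = {**DIMENSIONAL, **(PRECIP_CONTEXT if use_context else {})}
    try:
        return table[(from_unit, to_unit)](value)
    except KeyError:
        raise ValueError(f"cannot convert {from_unit} to {to_unit}") from None
```

Without the context, the mm to kg/m² lookup fails; in save_records() a failure of this kind is one of the silent-drop cases.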


Settings Hook

If your plugin needs to modify Django settings at startup, implement the setup function in <your_plugin>/config/settings/settings.py:

def setup(settings):
    """
    Called after ADL has configured its own settings, before Django starts.
    Modify the ``settings`` object as you would a normal Django settings file.
    """
    settings.INSTALLED_APPS += ["some_required_dependency"]

ADL discovers and calls this function automatically on startup. The file path must follow the convention exactly: src/<plugin_name>/config/settings/settings.py.