Plugin API Reference¶
This page is the reference for the ADL plugin contract. The prose sections around each block explain when and why to override, and call out the non-obvious behaviours that are easiest to miss.
Plugin Base Class¶
- class adl.core.registries.Plugin¶
Bases: Instance
Base class for all ADL data-source plugins.
Subclasses must:
- Set type: a unique string registry key (e.g. "adl_tahmo_plugin").
- Set label: a human-readable name shown in the admin UI and logs. A blank label raises ImproperlyConfigured on startup.
- Implement get_station_data().
All other methods have working defaults and can be overridden selectively.
Minimal subclass example:
    from adl.core.registries import Plugin


    class MyPlugin(Plugin):
        type = "my_plugin"
        label = "My Data Source Plugin"

        def get_station_data(self, station_link, start_date=None, end_date=None):
            client = station_link.network_connection.get_api_client()
            # Stream records from the upstream client instead of building a list.
            yield from client.get_measurements(
                station_link.station_code,
                start=start_date,
                end=end_date,
            )
- get_urls()¶
Return a list of Django URL patterns to include in the ADL URL configuration.
The default implementation returns an empty list. Override this method if your plugin needs to serve views outside the Wagtail admin namespace. For views that belong inside the Wagtail admin, use the register_admin_urls hook in wagtail_hooks.py instead.
- Returns:
A list of Django URL patterns.
- Return type:
list
Example:
    def get_urls(self):
        from django.urls import include, path

        from . import plugin_urls

        # plugin_urls must define app_name for namespace= to be accepted.
        return [
            path("my-plugin/", include(plugin_urls, namespace=self.type)),
        ]
- after_save_records(station_link, station_records, saved_records, qc_fail_results=None)¶
Hook called after each batch of ObservationRecord instances has been upserted for a station.
The default implementation does nothing. Override this method if your plugin needs to trigger side effects after data lands in the database, for example sending a notification, updating an external cache, or writing a plugin-specific summary log.
- Parameters:
station_link – The StationLink instance that was just processed.
station_records – The raw record dicts as returned by get_station_data() for this station.
saved_records – The ObservationRecord instances that were upserted.
qc_fail_results – Dict of QC failure messages keyed by UTC ISO timestamp string, or None if no QC checks are configured for this parameter.
- get_station_data(station_link, start_date=None, end_date=None)¶
Fetch raw observations for one station over a time window.
This is the only method subclasses must implement. ADL calls it with a pre-resolved [start_date, end_date) window and expects back an iterable of record dicts that it will normalize, convert, and upsert.
- Parameters:
station_link – The configured StationLink subclass instance for this station. Provides upstream credentials via station_link.network_connection, the upstream station identifier, variable mappings, and the station timezone.
start_date (datetime, optional) – Start of the fetch window, expressed in the station’s local timezone (aware datetime). Fetch records where observation_time >= start_date.
end_date (datetime, optional) – End of the fetch window, expressed in the station’s local timezone (aware datetime). Fetch records where observation_time < end_date.
- Returns:
An iterable (preferably a generator) of record dicts. Each dict must contain:
"observation_time" (datetime): aware datetimes are used as-is; naive datetimes are interpreted as station-local time.
And may contain any number of source-parameter fields whose keys match mapping.source_parameter_name on the station link’s variable mappings, for example:
    {
        "observation_time": datetime(2025, 1, 1, 10, 0, tzinfo=utc),
        "te": 22.4,
        "rh": 78.0,
    }
- Return type:
Iterable[Dict[str, Any]]
- Raises:
NotImplementedError – If not overridden by the subclass.
Note
Prefer yielding records one at a time rather than returning a complete list. save_records() processes in chunks of SAVE_CHUNK_SIZE, so a generator avoids loading the entire upstream response into memory, which matters for large historical backfills.
Warning
Records with a missing or non-datetime observation_time, timestamps outside [start_date, end_date], or future timestamps are silently dropped by save_records(). No exception is raised.
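The record contract can be sketched without ADL or a real upstream API. In this minimal illustration, `UPSTREAM_ROWS` and `iter_station_records` are hypothetical stand-ins that are not part of ADL; a real plugin would do the equivalent work inside get_station_data() with its own client.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical upstream payload: ISO timestamps plus source-parameter fields.
UPSTREAM_ROWS = [
    {"ts": "2025-01-01T09:00:00+00:00", "te": 21.9, "rh": 80.0},
    {"ts": "2025-01-01T10:00:00+00:00", "te": 22.4, "rh": 78.0},
    {"ts": "2025-01-01T11:00:00+00:00", "te": 23.1, "rh": 75.5},
]


def iter_station_records(rows, start_date=None, end_date=None):
    """Yield record dicts for rows inside the half-open [start_date, end_date) window."""
    for row in rows:
        observed_at = datetime.fromisoformat(row["ts"])  # aware datetime
        if start_date is not None and observed_at < start_date:
            continue
        if end_date is not None and observed_at >= end_date:
            continue
        # Keys other than "observation_time" must match the mapping's
        # source_parameter_name values exactly (case-sensitive).
        yield {"observation_time": observed_at, "te": row["te"], "rh": row["rh"]}


start = datetime(2025, 1, 1, 10, 0, tzinfo=timezone.utc)
records = list(iter_station_records(UPSTREAM_ROWS, start, start + timedelta(hours=1)))
```

Only the 10:00 row falls inside the window here; the 11:00 row is excluded because the end bound is exclusive.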
- get_default_end_date(station_link)¶
Return the default end date for a station’s ingestion window.
Computes the station-local “top of the next hour” as a timezone-aware datetime. This is used as end_date for every ingestion run unless overridden.
- Parameters:
station_link – The StationLink instance whose timezone is used to localise the result.
- Returns:
The top of the next hour in the station’s local timezone, as an aware datetime.
- Return type:
datetime
Override this method if your data source reports at a different resolution, for example to return the end of the current day for a daily-data source.
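As an illustration of the arithmetic involved, the sketch below computes both the hourly default and a daily variant. The helper names `top_of_next_hour` and `end_of_current_day` are hypothetical, and a fixed UTC+3 offset stands in for the station timezone; this is not ADL's implementation.

```python
from datetime import datetime, timedelta, timezone


def top_of_next_hour(now, station_tz):
    """Return the start of the next hour, expressed in the station timezone."""
    local_now = now.astimezone(station_tz)
    return local_now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)


def end_of_current_day(now, station_tz):
    """Daily-source variant: midnight at the end of the station-local day."""
    local_now = now.astimezone(station_tz)
    return local_now.replace(hour=0, minute=0, second=0, microsecond=0) + timedelta(days=1)


station_tz = timezone(timedelta(hours=3))  # fixed offset, for illustration only
now = datetime(2025, 1, 1, 10, 25, tzinfo=timezone.utc)
hourly_end = top_of_next_hour(now, station_tz)
daily_end = end_of_current_day(now, station_tz)
```

With a timezone that observes daylight saving time, adding a timedelta across a transition needs extra care; the fixed offset above sidesteps that.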
- get_default_start_date(station_link)¶
Return the default start date for a station’s ingestion window.
Used as the start_date fallback when no prior observations exist in the database and no custom start date is set on the station link. The base implementation returns one hour before get_default_end_date().
- Parameters:
station_link – The StationLink instance whose timezone is used to localise the result.
- Returns:
One hour before the default end date, in the station’s local timezone, as an aware datetime.
- Return type:
datetime
Override this method when your source’s natural polling window differs from one hour. For example, to fall back to the previous 24 hours:
    from datetime import timedelta

    def get_default_start_date(self, station_link):
        return self.get_default_end_date(station_link) - timedelta(days=1)
- get_start_date_from_db(station_link)¶
Return the latest saved observation timestamp for this station and connection.
Queries ObservationRecord for the most recent time value matching (station, network_connection). This is the primary mechanism by which ingestion resumes from where it left off after a restart or gap.
- Parameters:
station_link – The StationLink instance identifying the station and connection to query.
- Returns:
The latest saved observation time as a UTC-aware datetime, or None if no records exist yet for this station and connection.
- Return type:
datetime or None
Override this method when your API uses an inclusive end bound (that is, it returns the record at exactly the timestamp you request) to avoid re-fetching the boundary record. For example, to add a one-minute offset:
    from datetime import timedelta

    def get_start_date_from_db(self, station_link):
        start_date = super().get_start_date_from_db(station_link)
        if start_date:
            # Step past the last saved record.
            start_date += timedelta(minutes=1)
        return start_date
- get_dates_for_station(station_link, latest=False)¶
Resolve the (start_date, end_date) window to pass to get_station_data().
You should not normally need to override this method. Override the individual helpers (get_default_start_date(), get_default_end_date(), get_start_date_from_db()) instead.
start_date is resolved in the following priority order:
1. get_start_date_from_db(): resume from the latest saved observation (normal incremental ingestion).
2. station_link.get_first_collection_date(): the custom backfill start date set on the station link, localised to the station timezone.
3. get_default_start_date(): final fallback when no prior data exists and no custom date is configured.
When latest=True, steps 1 and 2 are skipped and the default start date is always used. This mode is used when fetching the most recent data on demand rather than resuming normal ingestion.
Both dates are always localised to the station’s timezone before being returned. If start_date equals end_date after resolution, one hour is added to end_date to ensure a non-zero window.
- Parameters:
station_link – The StationLink instance to resolve dates for.
latest (bool) – If True, skip DB and first-collection-date lookups and always use the default start date. Defaults to False.
- Returns:
A (start_date, end_date) tuple, both timezone-aware and expressed in the station’s local timezone.
- Return type:
Tuple[datetime, datetime]
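The priority order described for get_dates_for_station() can be sketched as a pure function. `resolve_window` and its arguments are hypothetical names chosen for this illustration; the real method obtains each candidate by calling the helpers on the plugin and station link and also handles timezone localisation, which is omitted here.

```python
from datetime import datetime, timedelta, timezone


def resolve_window(db_start, first_collection, default_start, default_end, latest=False):
    """Pick start_date by priority and guarantee a non-zero window."""
    if latest:
        start_date = default_start  # DB and first-collection lookups are skipped
    elif db_start is not None:
        start_date = db_start  # 1. resume from the latest saved observation
    elif first_collection is not None:
        start_date = first_collection  # 2. custom backfill start date
    else:
        start_date = default_start  # 3. final fallback
    end_date = default_end
    if start_date == end_date:
        end_date = end_date + timedelta(hours=1)
    return start_date, end_date


end = datetime(2025, 1, 1, 14, 0, tzinfo=timezone.utc)
default_start = end - timedelta(hours=1)
saved = datetime(2025, 1, 1, 12, 0, tzinfo=timezone.utc)
backfill = datetime(2024, 6, 1, 0, 0, tzinfo=timezone.utc)

incremental = resolve_window(saved, backfill, default_start, end)
on_demand = resolve_window(saved, backfill, default_start, end, latest=True)
```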
- save_records(station_link, station_records, start_date, end_date, chunk_size=None)¶
Normalize and upsert raw observation records into ObservationRecord.
Processes station_records in chunks of SAVE_CHUNK_SIZE (or chunk_size if provided) to keep memory usage bounded. It works correctly with both lists and generators.
For each record the method:
1. Validates and normalizes observation_time to a timezone-aware station-local datetime. Records with a missing, non-datetime, out-of-window, or future timestamp are silently dropped.
2. Iterates the station link’s variable mappings and looks up record[mapping.source_parameter_name] for each one.
3. Converts the value from mapping.source_parameter_unit to the ADL parameter’s canonical unit if they differ.
4. Runs any configured QC checks against the value.
5. Upserts an ObservationRecord row keyed on (time, station, connection, parameter), updating value, is_daily, qc_status, qc_bits, and qc_version on conflict.
You do not normally need to override or call this method directly. It is called automatically by process_station().
- Parameters:
station_link – The StationLink instance whose variable mappings and timezone are used for normalization.
station_records (Iterable[Dict[str, Any]]) – An iterable (list or generator) of raw record dicts as returned by get_station_data().
start_date (datetime) – The inclusive start of the accepted time window. Records before this are silently dropped.
end_date (datetime) – The inclusive end of the accepted time window. Records after this are silently dropped.
chunk_size (int, optional) – Number of records to process per database batch. Defaults to SAVE_CHUNK_SIZE.
- Returns:
A three-tuple of (total_saved, earliest_time, latest_time) where total_saved is the number of rows upserted, and earliest_time / latest_time are the observation timestamps of the first and last saved records, or None if no records were saved.
- Return type:
Tuple[int, Optional[datetime], Optional[datetime]]
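To show why chunked processing works equally well for lists and generators, here is one common way to batch an arbitrary iterable. It is only a sketch: `iter_chunks` and `fake_upstream` are hypothetical, the value of `SAVE_CHUNK_SIZE` is made up for the demo, and ADL's own chunking code may differ.

```python
from itertools import islice

SAVE_CHUNK_SIZE = 2  # illustrative value only; not ADL's actual default


def iter_chunks(records, chunk_size=None):
    """Yield lists of at most chunk_size items from any iterable."""
    size = chunk_size or SAVE_CHUNK_SIZE
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def fake_upstream():
    """Generator standing in for get_station_data() output."""
    for hour in range(5):
        yield {"hour": hour}


chunk_sizes = [len(chunk) for chunk in iter_chunks(fake_upstream())]
```

Because only one chunk is materialised at a time, memory use stays bounded by the chunk size rather than by the size of the upstream response.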
- process_station(station_link, initial_start_date=None, initial_end_date=None)¶
Run the full ingestion pipeline for a single station link.
Resolves the date window, calls get_station_data(), passes the results to save_records(), and writes a StationLinkActivityLog entry regardless of outcome. Any exception raised during fetching or saving is caught, logged, and recorded on the activity log without re-raising, so that a failure on one station does not abort the rest of the connection’s run.
Called by run_process() for each enabled station link. You do not normally need to call or override this method directly.
- Parameters:
station_link – The StationLink instance to process.
initial_start_date (datetime, optional) – Override the resolved start_date with this value if provided. Useful for manual or backfill invocations.
initial_end_date (datetime, optional) – Override the resolved end_date with this value if provided.
- Returns:
The number of ObservationRecord rows upserted, or 0 if no data was available or an error occurred.
- Return type:
int
- run_process(network_connection, initial_start_date=None)¶
Run the ingestion pipeline for all enabled station links on a connection.
Iterates network_connection.station_links.all(), skips any link where enabled is False, and calls process_station() for each remaining link. This is the entry point called by the Celery beat task on the configured schedule.
- Parameters:
network_connection – The NetworkConnection instance whose station links should be processed.
initial_start_date (datetime, optional) – If provided, passed through to every process_station() call as initial_start_date, overriding the normal DB-resume logic for all stations. Useful for bulk backfills.
- Returns:
A dict mapping each processed station.id to the number of ObservationRecord rows upserted for that station.
- Return type:
dict
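The per-station error isolation and the shape of the returned dict can be illustrated with stand-ins. Everything in this sketch (`FakeStationLink`, `process_one`, `run_all`, `flaky_fetch_and_save`) is hypothetical and only mirrors the behaviour described for process_station() and run_process(); the activity-log write is omitted.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("plugin_sketch")


@dataclass
class FakeStationLink:
    """Hypothetical stand-in for a StationLink row."""
    station_id: int
    enabled: bool = True


def process_one(link, fetch_and_save):
    """Return the upserted row count, or 0 if fetching or saving fails."""
    try:
        return fetch_and_save(link)
    except Exception:
        # Log and swallow so one bad station does not abort the others.
        logger.exception("Ingestion failed for station %s", link.station_id)
        return 0


def run_all(links, fetch_and_save):
    """Map station id -> saved count for every enabled link."""
    return {
        link.station_id: process_one(link, fetch_and_save)
        for link in links
        if link.enabled
    }


def flaky_fetch_and_save(link):
    if link.station_id == 2:
        raise RuntimeError("upstream timeout")
    return 10 * link.station_id


links = [FakeStationLink(1), FakeStationLink(2), FakeStationLink(3, enabled=False)]
results = run_all(links, flaky_fetch_and_save)
```

Station 2 fails but still appears in the result with a count of 0, while the disabled station 3 is absent altogether.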
- perform_qc_checks_with_pipeline(value, variable_mapping, adl_param, station_link, obs_time)¶
Run the configured QC pipeline against a single observation value.
Looks up the QC checks from variable_mapping.qc_checks if present, falling back to adl_param.qc_checks. If no checks are configured, returns immediately with a NOT_EVALUATED status.
QC pipelines are cached per (parameter_id, modified_at) to avoid rebuilding them on every record. The cache is invalidated automatically when a parameter’s modified_at timestamp changes.
Called internally by save_records() for each mapping row. You do not normally need to call this method directly.
- Parameters:
value (float) – The converted observation value to check (in the ADL canonical unit for adl_param).
variable_mapping – The variable mapping instance for this parameter, used to look up per-mapping QC overrides.
adl_param – The DataParameter instance, used as the fallback QC check source and for logging.
station_link – The StationLink instance, passed to the QC context builder for station-level metadata.
obs_time (datetime) – The timezone-aware observation timestamp, used to fetch recent history for checks that require it (e.g. step, persistence).
- Returns:
A three-tuple of (qc_bits, qc_status, qc_messages) where qc_bits is a QCBits flag value, qc_status is a QCStatus choice, and qc_messages is a list of failure message dicts (empty on pass).
- Return type:
tuple
What you must implement¶
The only method your plugin must override is get_station_data().
Everything else has a working default.
Date-window override chain¶
ADL resolves the (start_date, end_date) window through a chain of calls.
Override the individual helpers rather than get_dates_for_station()
itself:
| Method | Override when… |
|---|---|
| get_default_end_date() | Your source reports at daily (or other) resolution rather than hourly |
| get_default_start_date() | Your source’s natural fallback window is larger or smaller than one hour |
| get_start_date_from_db() | Your API uses an inclusive end bound and you need to offset the boundary to avoid re-fetching the last saved record |
Silent failure modes¶
save_records() drops records silently in the following cases. No exception is raised and nothing appears in the saved count:
- observation_time is missing, not a datetime, before start_date, after end_date, or in the future.
- A variable mapping row is missing source_parameter_name, source_parameter_unit, or adl_parameter.
- The key returned in your record dict does not exactly match mapping.source_parameter_name (case-sensitive).
- The value for a mapping key is None or not numeric (int or float).
- Unit conversion raises an exception for a specific mapping row.
If data is disappearing without errors, work through this list first.
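When debugging, it can help to run a few sample records through a small checker that reproduces the record-level rules from the list above. `explain_drop` is a hypothetical helper, not an ADL function; it covers the timestamp, key, and value checks only and assumes naive datetimes are station-local, as described for get_station_data().

```python
from datetime import datetime, timedelta, timezone


def explain_drop(record, source_names, start_date, end_date, now, station_tz):
    """Return the reasons a record or its fields would be silently skipped."""
    observed_at = record.get("observation_time")
    if not isinstance(observed_at, datetime):
        return ["observation_time missing or not a datetime"]
    if observed_at.tzinfo is None:
        observed_at = observed_at.replace(tzinfo=station_tz)  # naive = station-local
    reasons = []
    if observed_at < start_date or observed_at > end_date:
        reasons.append("observation_time outside the accepted window")
    if observed_at > now:
        reasons.append("observation_time in the future")
    for name in source_names:
        if name not in record:
            reasons.append(f"no key {name!r} (keys are case-sensitive)")
        elif not isinstance(record[name], (int, float)):
            reasons.append(f"value for {name!r} is None or not numeric")
    return reasons


utc = timezone.utc
start = datetime(2025, 1, 1, 10, 0, tzinfo=utc)
end = start + timedelta(hours=1)
problems = explain_drop({"observation_time": start, "TE": 22.4}, ["te"], start, end, end, utc)
```

Here `problems` reports the key mismatch, since the record uses "TE" while the mapping expects "te".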
NetworkConnection Base Class¶
- class adl.core.models.NetworkConnection(*args, **kwargs)¶
Bases: PolymorphicModel, ClusterableModel
Configuration for one upstream data integration: credentials, schedule, timezone defaults, and the plugin type that performs ingestion.
NetworkConnection is a Django-polymorphic base class. Each plugin defines a concrete subclass that adds its own credential fields (API keys, FTP details, database URLs, etc.). ADL uses the plugin field to look up the registered Plugin instance at runtime.
Celery Beat triggers collect_data() on the interval set by plugin_processing_interval. Setting plugin_processing_enabled to False pauses ingestion for the entire connection without deleting any configuration.
Important
A plugin’s NetworkConnection subclass must set station_link_model_string_label to the "app_label.ModelName" string of its StationLink subclass. Without this, station links will not appear in the admin and ingestion will silently do nothing.
- Parameters:
id (BigAutoField) – Primary key: ID
name (CharField) – Name
plugin (CharField) – Plugin. Plugin to use for this network
plugin_processing_enabled (BooleanField) – Active. If unchecked, the plugin will NOT run automatically
plugin_processing_interval (PositiveIntegerField) – Interval. How often the plugin should run, in minutes
stations_timezone (TimeZoneField) – Stations Timezone. Default Timezone of the stations in this network connection
batch_size (PositiveIntegerField) – Processing Batch Size. Number of stations to process in a single batch
is_daily_data (BooleanField) – Is Daily Data. Check to mark data from this connection as daily data
sort_order (PositiveIntegerField) – Sort Order. Order in which the connections are displayed
Relationship fields:
- Parameters:
polymorphic_ctype (ForeignKey to ContentType) – Polymorphic ctype (related name: networkconnection_set+)
network (ForeignKey to Network) – Network (related name: networkconnection)
Reverse relationships:
- Parameters:
station_links (Reverse ForeignKey from StationLink) – All station links of this Network Connection (related name of network_connection)
observation_records (Reverse ForeignKey from ObservationRecord) – All observation records of this Network Connection (related name of connection)
hourlyobsagg (Reverse ForeignKey from HourlyObsAgg) – All hourly obs aggs of this Network Connection (related name of connection)
dispatch_channels (Reverse ManyToManyField from DispatchChannel) – All dispatch channels of this Network Connection (related name of network_connections)
Extend the standard model constructor to allow child object lists to be passed in via kwargs
Required class attribute¶
Important
Every NetworkConnection subclass must set:
    station_link_model_string_label = "your_app.YourStationLink"
ADL uses this string to find the correct StationLink subclass when rendering the station link form and running ingestion. If it is missing, station links will not appear in the admin and data collection will silently do nothing.
Extending the admin form¶
Always extend NetworkConnection.panels rather than replacing it, to keep
the standard fields (name, network, plugin selector, interval, timezone) in
the form:
    panels = NetworkConnection.panels + [
        MultiFieldPanel([
            FieldPanel("api_key"),
            FieldPanel("api_secret"),
        ], heading=_("API Credentials")),
    ]
StationLink Base Class¶
- class adl.core.models.StationLink(*args, **kwargs)¶
Bases: PolymorphicModel, ClusterableModel
Binds one Station to one NetworkConnection and holds all per-station ingestion configuration.
StationLink is a Django-polymorphic base class. Each plugin defines a concrete subclass that adds the upstream station identifier (e.g. a TAHMO station code, an FTP filename pattern) and per-station variable mappings.
The effective timezone for a station link is resolved by the timezone property: it returns the connection’s stations_timezone when use_connection_timezone is True, otherwise the per-station timezone_info. ADL uses this timezone when computing date windows and normalizing observation timestamps.
Setting enabled to False causes run_process() to skip this station without affecting any other stations on the same connection.
- Parameters:
id (BigAutoField) – Primary key: ID
enabled (BooleanField) – Enabled. If unchecked, this station will not be processed
use_connection_timezone (BooleanField) – Use Connection Timezone. If checked, the station will use the timezone from the network connection. If unchecked, it will use the station’s own timezone, which is set below
timezone_info (TimeZoneField) – Station Timezone. Timezone used by the station for recording observations
aggregate_from_date (DateTimeField) – Aggregation Start Date. Date to start aggregation from. Leave empty to use the current date and time
modified_at (DateTimeField) – Modified at
Relationship fields:
- Parameters:
polymorphic_ctype (ForeignKey to ContentType) – Polymorphic ctype (related name: stationlink_set+)
network_connection (ForeignKey to NetworkConnection) – Network Connection (related name: station_links)
station (ForeignKey to Station) – Station (related name: stationlink)
Reverse relationships:
- Parameters:
dispatch_channel_links (Reverse ForeignKey from DispatchChannelStationLink) – All dispatch channel links of this station link (related name of station_link)
stationlinkactivitylog (Reverse ForeignKey from StationLinkActivityLog) – All station link activity logs of this station link (related name of station_link)
Extend the standard model constructor to allow child object lists to be passed in via kwargs
- property timezone¶
Returns the timezone for the station link. If use_connection_timezone is True, it returns the timezone from the network connection. Otherwise, it returns the station’s timezone.
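The resolution rule is small enough to sketch with plain dataclasses. `FakeConnection` and `FakeStationLink` are hypothetical stand-ins for the Django models, and fixed UTC offsets replace real timezone objects.

```python
import datetime as dt
from dataclasses import dataclass


@dataclass
class FakeConnection:
    stations_timezone: dt.tzinfo


@dataclass
class FakeStationLink:
    network_connection: FakeConnection
    timezone_info: dt.tzinfo
    use_connection_timezone: bool = True

    @property
    def timezone(self):
        """Connection timezone when the flag is set, else the station's own."""
        if self.use_connection_timezone:
            return self.network_connection.stations_timezone
        return self.timezone_info


connection = FakeConnection(stations_timezone=dt.timezone(dt.timedelta(hours=3)))
link = FakeStationLink(connection, timezone_info=dt.timezone(dt.timedelta(hours=1)))
inherited = link.timezone
link.use_connection_timezone = False
own = link.timezone
```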
- get_variable_mappings()¶
Returns the variable mappings for the station link. This method should be overridden in subclasses to provide specific mappings.
- get_first_collection_date()¶
Returns the first collection date for the station link. This method should be overridden in subclasses to provide specific logic.
Variable mapping contract¶
Your per-variable mapping model (an Orderable with a ParentalKey on
the station link) must expose these three attributes. They can be model
fields, properties, or a mix:
| Attribute | Type | Description |
|---|---|---|
| source_parameter_name | | The key in the record dict returned by get_station_data() |
| source_parameter_unit | | The unit the upstream value is expressed in |
| adl_parameter | | The ADL canonical parameter to store the value under |
If any of the three is missing or None on a mapping row, that mapping is
skipped silently for every record.
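The skipping rule can be made concrete with a toy mapping class. In this sketch `FakeMapping`, `usable_mappings`, and `extract_values` are hypothetical, and plain strings stand in for whatever unit and parameter objects a real mapping model would hold.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeMapping:
    """Hypothetical mapping row; only the three contract attributes matter."""
    source_parameter_name: Optional[str]
    source_parameter_unit: Optional[str]
    adl_parameter: Optional[str]


REQUIRED = ("source_parameter_name", "source_parameter_unit", "adl_parameter")


def usable_mappings(mappings):
    """Keep rows where all three contract attributes are present."""
    return [
        m for m in mappings
        if all(getattr(m, attr, None) is not None for attr in REQUIRED)
    ]


def extract_values(record, mappings):
    """Map adl_parameter -> raw value for each usable mapping found in the record."""
    return {
        m.adl_parameter: record[m.source_parameter_name]
        for m in usable_mappings(mappings)
        if m.source_parameter_name in record
    }


mappings = [
    FakeMapping("te", "degC", "air_temperature"),
    FakeMapping("rh", None, "relative_humidity"),  # unit missing: row is skipped
]
values = extract_values({"te": 22.4, "rh": 78.0}, mappings)
```

The humidity value is present in the record but never extracted, which is exactly the kind of quiet loss the contract warns about.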
Extending the admin form¶
Always extend StationLink.panels to keep the standard fields (connection,
station, enabled toggle, timezone options):
    panels = StationLink.panels + [
        FieldPanel("my_station_code", widget=MyStationSelectWidget),
        FieldPanel("start_date"),
        InlinePanel("variable_mappings", label=_("Variable Mappings")),
    ]
plugin_registry¶
- class adl.core.registries.PluginRegistry(allow_instance_override=False)¶
Bases: Registry
Plugin registry for ADL data-source plugins.
The module-level singleton plugin_registry is the instance you interact
with directly. Import it and call register inside AppConfig.ready():
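    from django.apps import AppConfig

    from adl.core.registries import plugin_registry


    class MyPluginConfig(AppConfig):
        name = "my_plugin"

        def ready(self):
            # Import inside ready() so the app registry is fully loaded first.
            from .plugins import MyPlugin

            plugin_registry.register(MyPlugin())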
Important
The type string on your Plugin subclass must be globally unique
across all installed plugins. Registering two plugins with the same type
will raise an error at startup. Using the Python package name as the type
(e.g. "adl_tahmo_plugin") is a safe convention.
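To illustrate why duplicate type strings fail, here is a toy registry that rejects a second registration under the same key. `SketchRegistry` and `DuplicateTypeError` are invented for this example; ADL's PluginRegistry has its own implementation and exception types.

```python
class DuplicateTypeError(Exception):
    """Raised by this sketch when a type key is registered twice."""


class SketchRegistry:
    def __init__(self):
        self._instances = {}

    def register(self, instance):
        if not getattr(instance, "type", None):
            raise ValueError("plugin instances need a non-empty type")
        if instance.type in self._instances:
            raise DuplicateTypeError(f"type {instance.type!r} is already registered")
        self._instances[instance.type] = instance

    def get(self, type_name):
        return self._instances[type_name]


class FirstPlugin:
    type = "my_plugin"


class ClashingPlugin:
    type = "my_plugin"  # same key as FirstPlugin


registry = SketchRegistry()
registry.register(FirstPlugin())
try:
    registry.register(ClashingPlugin())
    clash_detected = False
except DuplicateTypeError:
    clash_detected = True
```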
DataParameter.convert_value_from_units¶
- DataParameter.convert_value_from_units(value, from_unit)¶
Called automatically by save_records() for
every mapping row where the source unit differs from the parameter’s canonical
unit. You do not call this directly, but understanding it helps when debugging
unexpected values.
If the parameter has a custom_unit_context configured (for conversions
that pint cannot resolve dimensionally — for example, precipitation from
mm to kg/m²), that context is applied automatically.
Settings Hook¶
If your plugin needs to modify Django settings at startup, implement the
setup function in src/<plugin_name>/config/settings/settings.py:
    def setup(settings):
        """
        Called after ADL has configured its own settings, before Django starts.
        Modify the ``settings`` object as you would a normal Django settings file.
        """
        settings.INSTALLED_APPS += ["some_required_dependency"]
ADL discovers and calls this function automatically on startup. The file path
must follow the convention exactly:
src/<plugin_name>/config/settings/settings.py.