#!/usr/bin/env python
# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2008-16 pywws contributors
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Post weather update to services such as Weather Underground
::
%s
Introduction
------------
There are an increasing number of web sites around the world that
encourage amateur weather station owners to upload data over the
internet.
This module enables pywws to upload readings to these organisations.
It is highly customisable using configuration files. Each 'service'
requires a configuration file and one or two templates in
``pywws/services`` (that should not need to be edited by the user) and
a section in ``weather.ini`` containing user specific data such as
your site ID and password.
See :ref:`How to integrate pywws with various weather services
<guides-integration-other>` for details of the available services.
Configuration
-------------
If you haven't already done so, visit the organisation's web site and
create an account for your weather station. Make a note of any site ID
and password details you are given.
Stop any pywws software that is running and then run ``toservice`` to
create a section in ``weather.ini``::
python -m pywws.toservice data_dir service_name
``service_name`` is the single word service name used by pywws, such
as ``metoffice``, ``data_dir`` is your weather data directory, as
usual.
Edit ``weather.ini`` and find the section corresponding to the service
name, e.g. ``[underground]``. Copy your site details into this
section, for example::
[underground]
password = secret
station = ABCDEFG1A
Now you can test your configuration::
python -m pywws.toservice -vvv data_dir service_name
This should show you the data string that is uploaded. Any failure
should generate an error message.
Upload old data
---------------
Now you can upload your last 7 days' data, if the service supports it.
Run ``toservice`` with the catchup option::
python -m pywws.toservice -cvv data_dir service_name
This may take 20 minutes or more, depending on how much data you have.
Add service(s) upload to regular tasks
--------------------------------------
Edit your ``weather.ini`` again, and add a list of services to the
``[live]``, ``[logged]``, ``[hourly]``, ``[12 hourly]`` or ``[daily]``
section, depending on how often you want to send data. For example::
[live]
twitter = []
plot = []
text = []
services = ['underground_rf', 'cwop']
[logged]
twitter = []
plot = []
text = []
services = ['metoffice', 'cwop']
[hourly]
twitter = []
plot = []
text = []
services = ['underground']
Note that the ``[live]`` section is only used when running
:py:mod:`pywws.LiveLog`. It is a good idea to repeat any
service selected in ``[live]`` in the ``[logged]`` or ``[hourly]``
section in case you switch to running :py:mod:`pywws.Hourly`.
Restart your regular pywws program (:py:mod:`pywws.Hourly` or
:py:mod:`pywws.LiveLog`) and visit the appropriate web site to
see regular updates from your weather station.
Using a different template
--------------------------
For some services (mainly MQTT) you might want to write your own
template to give greater control over the uploaded data. Copy the
default template file from ``pywws/services`` to your template directory
and then edit it to do what you want. Now edit ``weather.ini`` and
change the ``template`` value from ``default`` to the name of your
custom template.
API
---
"""
__docformat__ = "restructuredtext en"
__usage__ = """
usage: python -m pywws.toservice [options] data_dir service_name
options are:
-h or --help display this help
-c or --catchup upload all data since last upload
-v or --verbose increase amount of reassuring messages
data_dir is the root directory of the weather data
service_name is the service to upload to, e.g. underground
"""
__doc__ %= __usage__
__usage__ = __doc__.split('\n')[0] + __usage__
import base64
from configparser import SafeConfigParser
from datetime import datetime, timedelta
import getopt
import logging
import os
import pkg_resources
import re
import socket
import io
import sys
import urllib.request, urllib.parse, urllib.error
import urllib.request, urllib.error, urllib.parse
import urllib.parse
from pywws import DataStore
from pywws.Logger import ApplicationLogger
from pywws import Template
from pywws import __version__
PARENT_MARGIN = timedelta(minutes=2)
[docs]class ToService(object):
"""Upload weather data to weather services such as Weather
Underground.
"""
def __init__(self, params, status, calib_data, service_name):
"""
:param params: pywws configuration.
:type params: :class:`pywws.DataStore.params`
:param status: pywws status store.
:type status: :class:`pywws.DataStore.status`
:param calib_data: 'calibrated' data.
:type calib_data: :class:`pywws.DataStore.calib_store`
:param service_name: name of service to upload to.
:type service_name: string
"""
self.logger = logging.getLogger('pywws.ToService(%s)' % service_name)
self.params = params
self.status = status
self.data = calib_data
self.service_name = service_name
# 'derived' services such as 'underground_rf' share their
# parent's config and templates
config_section = self.service_name.split('_')[0]
if config_section == self.service_name:
self.parent = None
else:
self.parent = config_section
self.old_response = None
self.old_ex = None
self.http_headers = None
# set default socket timeout, so urlopen calls don't hang forever
if eval(self.params.get('config', 'asynchronous', 'False')):
socket.setdefaulttimeout(60)
else:
socket.setdefaulttimeout(20)
# open params file
service_params = SafeConfigParser()
service_params.optionxform = str
param_string = pkg_resources.resource_string(
'pywws', 'services/%s.ini' % (self.service_name))
if sys.version_info[0] >= 3:
param_string = param_string.decode('utf-8')
service_params.readfp(io.StringIO(param_string))
# get URL
self.server = service_params.get('config', 'url')
parsed_url = urllib.parse.urlsplit(self.server)
if parsed_url.scheme == 'aprs':
self.send_data = self.aprs_send_data
server, port = parsed_url.netloc.split(':')
self.server = (server, int(port))
elif parsed_url.scheme == 'mqtt':
self.send_data = self.mqtt_send_data
else:
self.send_data = self.http_send_data
self.use_get = eval(service_params.get('config', 'use get'))
# get fixed part of upload data
self.fixed_data = dict()
for name, value in service_params.items('fixed'):
if value[0] == '*':
value = self.params.get(config_section, value[1:], 'unknown')
self.fixed_data[name] = value
# create templater
self.templater = Template.Template(
self.params, self.status, self.data, self.data, None, None,
use_locale=False)
template_name = self.params.get(config_section, 'template', 'default')
if template_name != 'default':
template_dir = self.params.get(
'paths', 'templates', os.path.expanduser('~/weather/templates/'))
self.template_file = open(
os.path.join(template_dir, template_name), 'rb')
else:
template_name = 'services/%s_template_%s.txt' % (
config_section, self.params.get('config', 'ws type'))
if not pkg_resources.resource_exists('pywws', template_name):
template_name = 'services/%s_template_1080.txt' % (config_section)
self.template_file = pkg_resources.resource_stream(
'pywws', template_name)
# get other parameters
self.auth_type = service_params.get('config', 'auth_type')
if self.auth_type == 'basic':
user = self.params.get(config_section, 'user', 'unknown')
password = self.params.get(config_section, 'password', 'unknown')
details = user + ':' + password
self.auth = 'Basic ' + base64.b64encode(details.encode('utf-8')).decode('utf-8')
elif self.auth_type == 'mqtt':
self.user = self.params.get(config_section, 'user', 'unknown')
self.password = self.params.get(config_section, 'password', 'unknown')
self.catchup = eval(service_params.get('config', 'catchup'))
self.expected_result = eval(service_params.get('config', 'result'))
self.interval = eval(service_params.get('config', 'interval'))
self.interval = max(self.interval, 40)
self.interval = timedelta(seconds=self.interval)
if service_params.has_option('config', 'http_headers'):
self.http_headers = eval(service_params.get('config', 'http_headers'))
# move 'last update' from params to status
last_update = self.params.get_datetime(self.service_name, 'last update')
if last_update:
self.params.unset(self.service_name, 'last update')
self.status.set(
'last update', self.service_name, last_update.isoformat(' '))
# set timestamp of first data to upload
self.next_update = datetime.utcnow() - max(
timedelta(days=self.catchup), self.interval)
self.next_update = min(self.next_update, self.data.before(datetime.max))
[docs] def prepare_data(self, data):
"""Prepare a weather data record.
The :obj:`data` parameter contains the data to be encoded. It
should be a 'calibrated' data record, as stored in
:class:`pywws.DataStore.calib_store`. The relevant data items
are extracted and converted to strings using a template, then
merged with the station's "fixed" data.
:param data: the weather data record.
:type data: dict
:return: dict.
:rtype: string
"""
# check we have external data
if data['temp_out'] is None:
return None
# convert data
data_str = self.templater.make_text(self.template_file, data)
self.template_file.seek(0)
if not data_str:
return None
prepared_data = eval(data_str)
prepared_data.update(self.fixed_data)
return prepared_data
[docs] def mqtt_send_data(self, timestamp, prepared_data, ignore_last_update=False):
import paho.mqtt.client as mosquitto
import time
import json
topic = prepared_data['topic']
hostname = prepared_data['hostname']
port = prepared_data['port']
client_id = prepared_data['client_id']
retain = prepared_data['retain'] == 'True'
auth = prepared_data['auth'] == 'True'
multi_topic = prepared_data['multi_topic'] == 'True'
# clean up the object
del prepared_data['topic']
del prepared_data['hostname']
del prepared_data['port']
del prepared_data['client_id']
del prepared_data['retain']
del prepared_data['auth']
del prepared_data['multi_topic']
mosquitto_client = mosquitto.Client(client_id, protocol=mosquitto.MQTTv31)
if auth:
self.logger.debug("Username and password configured")
if(self.password == "unknown"):
mosquitto_client.username_pw_set(self.user)
else:
mosquitto_client.username_pw_set(self.user, self.password)
else:
self.logger.debug("Username and password unconfigured, ignoring")
self.logger.debug(
"timestamp: %s. publishing on topic [%s] to hostname [%s] and " +
"port [%s] with a client_id [%s] and retain is %s",
timestamp.isoformat(' '), topic, hostname, port, client_id, retain)
mosquitto_client.connect(hostname, int(port))
mosquitto_client.publish(topic, json.dumps(prepared_data), retain=retain)
if multi_topic:
#Publish a messages, one for each item in prepared_data to separate Subtopics.
for item in prepared_data:
if prepared_data[item] == '':
prepared_data[item] = 'None'
mosquitto_client.publish(topic + "/" + item, prepared_data[item], retain=retain)
#Need to make sure the messages have been flushed to the server.
mosquitto_client.loop(timeout=0.5)
self.logger.debug("published data: %s", prepared_data)
mosquitto_client.disconnect()
return True
[docs] def aprs_send_data(self, timestamp, prepared_data, ignore_last_update=False):
"""Upload a weather data record using APRS.
The :obj:`prepared_data` parameter contains the data to be uploaded.
It should be a dictionary of string keys and string values.
:param timestamp: the timestamp of the data to upload.
:type timestamp: datetime
:param prepared_data: the data to upload.
:type prepared_data: dict
:param ignore_last_update: don't get or set the 'last update'
status.ini entry.
:type ignore_last_update: bool
:return: success status
:rtype: bool
"""
login = 'user %s pass %s vers pywws %s\n' % (
prepared_data['designator'], prepared_data['passcode'], __version__)
packet = '%s>APRS,TCPIP*:@%sz%s/%s_%s/%sg%st%sr%sP%sb%sh%s.pywws-%s\n' % (
prepared_data['designator'], prepared_data['idx'],
prepared_data['latitude'], prepared_data['longitude'],
prepared_data['wind_dir'], prepared_data['wind_ave'],
prepared_data['wind_gust'], prepared_data['temp_out'],
prepared_data['rain_hour'], prepared_data['rain_day'],
prepared_data['rel_pressure'], prepared_data['hum_out'],
__version__
)
self.logger.debug('packet: "%s"', packet)
login = login.encode('ASCII')
packet = packet.encode('ASCII')
sock = socket.socket()
try:
sock.connect(self.server)
try:
response = sock.recv(4096)
self.logger.debug('server software: %s', response.strip())
sock.sendall(login)
response = sock.recv(4096)
self.logger.debug('server login ack: %s', response.strip())
sock.sendall(packet)
sock.shutdown(socket.SHUT_RDWR)
finally:
sock.close()
except Exception as ex:
new_ex = str(ex)
if new_ex == self.old_ex:
log = self.logger.debug
else:
log = self.logger.error
self.old_ex = new_ex
log('exc: %s', new_ex)
return False
if not ignore_last_update:
self.set_last_update(timestamp)
return True
[docs] def http_send_data(self, timestamp, prepared_data, ignore_last_update=False):
"""Upload a weather data record using HTTP.
The :obj:`prepared_data` parameter contains the data to be uploaded.
It should be a dictionary of string keys and string values.
:param timestamp: the timestamp of the data to upload.
:type timestamp: datetime
:param prepared_data: the data to upload.
:type prepared_data: dict
:param ignore_last_update: don't get or set the 'last update'
status.ini entry.
:type ignore_last_update: bool
:return: success status
:rtype: bool
"""
coded_data = urllib.parse.urlencode(prepared_data)
self.logger.debug(coded_data)
new_ex = self.old_ex
ex_info = []
success = False
try:
if self.use_get:
request = urllib.request.Request(self.server + '?' + coded_data)
else:
request = urllib.request.Request(self.server, coded_data.encode('ASCII'))
if self.auth_type == 'basic':
request.add_header('Authorization', self.auth)
if self.http_headers is not None:
for header in self.http_headers:
request.add_header(header[0], header[1])
rsp = urllib.request.urlopen(request)
response = rsp.readlines()
rsp.close()
if response == self.old_response:
log = self.logger.debug
else:
log = self.logger.error
self.old_response = response
for line in response:
log('rsp: %s', line.strip())
for n, expected in enumerate(self.expected_result):
if n < len(response):
actual = response[n].decode('utf-8')
if not re.match(expected, actual):
break
else:
self.old_response = response
if not ignore_last_update:
self.set_last_update(timestamp)
return True
return False
except urllib.error.HTTPError as ex:
if ex.code == 429 and self.service_name == 'metoffice':
# UK Met Office server uses 429 to signal duplicate data
success = True
if sys.version_info >= (2, 7):
new_ex = '[%d]%s' % (ex.code, ex.reason)
else:
new_ex = str(ex)
ex_info = str(ex.info()).split('\n')
try:
for line in ex.readlines():
line = line.decode('utf-8')
ex_info.append(re.sub('<.+?>', '', line))
except Exception:
pass
except urllib.error.URLError as ex:
new_ex = str(ex.reason)
except Exception as ex:
new_ex = str(ex)
if new_ex == self.old_ex:
log = self.logger.debug
else:
log = self.logger.error
self.old_ex = new_ex
log('exc: %s', new_ex)
for extra in ex_info:
extra = extra.strip()
if extra:
log('info: %s', extra)
if success and not ignore_last_update:
self.set_last_update(timestamp)
return success
[docs] def next_data(self, catchup, live_data, ignore_last_update=False):
"""Get weather data records to upload.
This method returns either the most recent weather data
record, or all records since the last upload, according to
the value of :obj:`catchup`.
:param catchup: ``True`` to get all records since last upload,
or ``False`` to get most recent data only.
:type catchup: boolean
:param live_data: a current 'live' data record, or ``None``.
:type live_data: dict
:param ignore_last_update: don't get the 'last update'
status.ini entry.
:type ignore_last_update: bool
:return: yields weather data records.
:rtype: dict
"""
if ignore_last_update:
last_update = None
else:
last_update = self.status.get_datetime(
'last update', self.service_name)
if last_update:
self.next_update = max(self.next_update,
last_update + self.interval)
if catchup:
start = self.next_update
else:
start = self.data.before(datetime.max)
if live_data:
stop = live_data['idx'] - self.interval
else:
stop = None
for data in self.data[start:stop]:
if data['idx'] >= self.next_update:
self.next_update = data['idx'] + self.interval
yield data
if live_data and live_data['idx'] >= self.next_update:
self.next_update = live_data['idx'] + self.interval
yield live_data
[docs] def set_last_update(self, timestamp):
self.status.set(
'last update', self.service_name, timestamp.isoformat(' '))
if self.parent:
last_update = self.status.get_datetime('last update', self.parent)
if last_update and last_update >= timestamp - PARENT_MARGIN:
self.status.set('last update', self.parent,
(timestamp + PARENT_MARGIN).isoformat(' '))
[docs] def Upload(self, catchup=True, live_data=None, ignore_last_update=False):
"""Upload one or more weather data records.
This method uploads either the most recent weather data
record, or all records since the last upload (up to 7 days),
according to the value of :obj:`catchup`.
It sets the ``last update`` configuration value to the time
stamp of the most recent record successfully uploaded.
:param catchup: upload all data since last upload.
:type catchup: bool
:param live_data: current 'live' data. If not present the most
recent logged data is uploaded.
:type live_data: dict
:param ignore_last_update: don't get or set the 'last update'
status.ini entry.
:type ignore_last_update: bool
:return: success status
:rtype: bool
"""
count = 0
for data in self.next_data(catchup, live_data, ignore_last_update):
prepared_data = self.prepare_data(data)
if not prepared_data:
continue
if not self.send_data(data['idx'], prepared_data, ignore_last_update):
return False
count += 1
if count > 1:
self.logger.info('%d records sent', count)
return True
[docs]def main(argv=None):
if argv is None:
argv = sys.argv
try:
opts, args = getopt.getopt(
argv[1:], "hcv", ['help', 'catchup', 'verbose'])
except getopt.error as msg:
print('Error: %s\n' % msg, file=sys.stderr)
print(__usage__.strip(), file=sys.stderr)
return 1
# process options
catchup = False
verbose = 0
for o, a in opts:
if o == '-h' or o == '--help':
print(__usage__.strip())
return 0
elif o == '-c' or o == '--catchup':
catchup = True
elif o == '-v' or o == '--verbose':
verbose += 1
# check arguments
if len(args) != 2:
print("Error: 2 arguments required", file=sys.stderr)
print(__usage__.strip(), file=sys.stderr)
return 2
logger = ApplicationLogger(verbose)
return ToService(
DataStore.params(args[0]), DataStore.status(args[0]),
DataStore.calib_store(args[0]), args[1]).Upload(
catchup=catchup, ignore_last_update=not catchup)
if __name__ == "__main__":
sys.exit(main())