# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2018-20 pywws contributors
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Base classes for "service" uploaders.
.. inheritance-diagram:: CatchupDataService FileService LiveDataService
:top-classes: pywws.service.ServiceBase
"""
from __future__ import absolute_import, print_function, unicode_literals
from ast import literal_eval
from collections import deque
from datetime import datetime, timedelta
import os
import sys
import threading
if sys.version_info[0] >= 3:
from io import StringIO
else:
from StringIO import StringIO
import pywws
from pywws.constants import SECOND
import pywws.logger
import pywws.storage
import pywws.template
class Queue(deque):
    """A :py:class:`~collections.deque` that starts its consumer lazily.

    The ``start`` callable passed to the constructor is invoked the
    first time a value other than :py:obj:`None` is appended. From then
    on the ordinary :py:meth:`deque.append` is used directly.
    """
    def __init__(self, start, *arg, **kw):
        super(Queue, self).__init__(*arg, **kw)
        # called exactly once, when the first real item arrives
        self._start = start

    def append(self, x):
        plain_append = super(Queue, self).append
        plain_append(x)
        if x is not None:
            # Shadow this override with the base method on the instance
            # so that the start callable can never be invoked again.
            self.append = plain_append
            self._start()

    def full(self):
        """Return True when 50 or more uploads are already waiting."""
        return len(self) >= 50
class ServiceBase(threading.Thread):
    """Base class for all service uploaders.

    Uploaders use a separate thread to allow the main program thread to
    continue even if a service is slow to respond. Items to upload are
    passed to the thread via a thread safe queue. The thread is started
    when the first item is put on the queue. To shut down the thread put
    :py:obj:`None` on the queue, e.g. by calling :py:meth:`stop`.

    There are two types of uploader derived from this class.
    :py:class:`DataServiceBase` is used by uploaders that send defined
    sets of data, typically as an HTML "post" or "get" operation.
    :py:class:`FileService` is used to upload files, including free form
    text such as a Twitter message.

    All service classes must provide a :py:attr:`logger` object so that
    logging messages carry the right module name, and define a
    :py:attr:`service_name` string. They must also define a
    :py:meth:`session` method.
    """

    config = {}
    """Defines the user configuration of the uploader. Each item must be
    of the form ``name: (default (str), required (bool), fixed_key (str
    or None))``. ``name`` is the ``weather.ini`` value name, ``default``
    is a default value, ``required`` defines whether a value must be
    supplied at run time, and ``fixed_key`` defines if and to where in
    :py:attr:`~DataServiceBase.fixed_data` the value should be copied.
    """

    interval = timedelta(seconds=40)
    """Sets the minimum period between the timestamps of uploaded data.
    For some services this can be less than the weather station's "live"
    data period (48 seconds) whereas others may require 5 or 15 minutes
    between readings.
    """

    logger = None
    """A :py:class:`logging.Logger` object created with the module name.
    This is typically done as follows::

        logger = logging.getLogger(__name__)
    """

    service_name = ''
    """A short name used to refer to the service in weather.ini. It
    should be all lower case. The best name to use is the last part of
    the module's file name, as follows::

        service_name = os.path.splitext(os.path.basename(__file__))[0]
    """

    def __init__(self, context, check_params=True):
        """Read this service's settings from ``context.params``.

        :param context: the pywws context object; its ``params`` are
            read with ``get(section, key, default)``.
        :param bool check_params: if true, raise :py:exc:`RuntimeError`
            when a ``required`` config value is not set.
        """
        super(ServiceBase, self).__init__()
        self.context = context
        # the uploader thread is started when the first item is queued
        self.queue = Queue(self.start)
        # last message passed to log(); initialised here so that log()
        # works even if it is called before run()
        self.old_message = ''
        # get user configuration
        self.params = {}
        check = []
        for key, (default, required, fixed_key) in self.config.items():
            self.params[key] = context.params.get(
                self.service_name, key, default)
            if required:
                check.append(key)
            if fixed_key and self.params[key]:
                # copy fixed_data to avoid changing class definition;
                # services without a fixed_data attribute start empty
                self.fixed_data = dict(getattr(self, 'fixed_data', {}))
                self.fixed_data[fixed_key] = self.params[key]
        # check values
        if check_params:
            self.check_params(*check)

    def check_params(self, *keys):
        """Ensure user has set required values in weather.ini.

        Normally the :py:data:`~ServiceBase.config` names with
        ``required`` set are checked, but if your uploader has a
        ``register`` method you may need to check for other data.

        :param str keys: the :py:data:`~ServiceBase.config` names to
            verify.
        :raises RuntimeError: if any of the values is empty or missing.
        """
        for key in keys:
            # .get() so an unknown key reports the same clear error
            # instead of a bare KeyError
            if not self.params.get(key):
                raise RuntimeError('"{}" not set in weather.ini'.format(key))

    def session(self):
        """Context manager factory function for a batch of one or more
        uploads.

        This makes it easy to ensure any resources such as an internet
        connection are properly closed after a batch of uploads. Use the
        :py:func:`contextlib.contextmanager` decorator when you
        implement this method.

        For a typical example, see the source code of the
        :py:mod:`pywws.service.openweathermap` module. If your upload
        can't benefit from a session object yield :py:obj:`None`, as in
        :py:mod:`pywws.service.copy`.
        """
        raise NotImplementedError()

    def run(self):
        """Thread body: call ``upload_batch`` whenever items are queued.

        Loops until ``context.shutdown`` is set. In live logging mode a
        failed batch is retried after 40 seconds. Otherwise the thread
        exits after a failed batch; the subclasses' ``upload_batch``
        returns false when it reaches the :py:obj:`None` stop marker.
        """
        self.logger.debug('thread started ' + self.name)
        self.old_message = ''
        if self.context.live_logging:
            # poll faster for services with short upload intervals
            polling_interval = self.interval.total_seconds() / 20
            polling_interval = min(max(polling_interval, 4.0), 40.0)
        else:
            polling_interval = 4.0
        while not self.context.shutdown.is_set():
            OK = True
            if self.queue:
                try:
                    OK = self.upload_batch()
                except Exception as ex:
                    self.logger.exception(ex)
                    OK = False
            if OK:
                pause = polling_interval
            elif self.context.live_logging:
                # upload failed, wait before trying again
                pause = 40.0
            else:
                # upload failed or nothing more to do
                break
            self.context.shutdown.wait(pause)

    def stop(self):
        """Ask a running uploader thread to finish.

        Puts the :py:obj:`None` stop marker on the queue. Does nothing if
        the thread is not alive.
        """
        if self.is_alive():
            self.logger.debug('stopping thread ' + self.name)
            self.queue.append(None)

    def log(self, message):
        """Log an upload result message.

        A message that differs from the previous one is logged at error
        level; an identical repeat is logged at debug level so that a
        persistent failure does not flood the log.
        """
        if message == self.old_message:
            self.logger.debug(message)
        else:
            self.logger.error(message)
            self.old_message = message
class DataServiceBase(ServiceBase):
    """Base class for "data" services.

    A "data" service uploader sends defined sets of data, typically as
    an HTML "post" or "get" operation. Service classes should be based
    on :py:class:`CatchupDataService` or :py:class:`LiveDataService`,
    depending on whether the service allows uploading of past data, for
    example to fill in gaps if the server (or pywws client) goes down
    for a few hours or days.

    Data service classes must provide a :py:attr:`template` string to
    define how to convert pywws data before uploading. Required methods
    are :py:meth:`~ServiceBase.session` and :py:meth:`upload_data`. If
    the service has a separate authorisation or registration process
    this can be done in a :py:meth:`~pywws.service.mastodon.register`
    method. See :py:mod:`pywws.service.mastodon` for an example.
    """

    template = ''
    """Defines the conversion of pywws data to key, value pairs required
    by the service. The template string is passed to
    :py:mod:`pywws.template`, then the result is passed to
    :py:func:`~ast.literal_eval` to create a :py:obj:`dict`. This rather
    complex process allows great flexibility, but you do have to be
    careful with use of quotation marks. """

    fixed_data = {}
    """Defines a set of ``key: value`` pairs that are the same for every
    data upload. This might include the station's location or the
    software name & version. Values set by the user should be included
    in the weather.ini config defined in :py:data:`~ServiceBase.config`.
    """

    def __init__(self, context, check_params=True):
        super(DataServiceBase, self).__init__(context, check_params)
        # check config
        template = context.params.get(self.service_name, 'template')
        if template == 'default':
            context.params.unset(self.service_name, 'template')
        elif template:
            self.logger.critical(
                'obsolete item "template" found in weather.ini '
                'section [{}]'.format(self.service_name))
        # create templater
        if self.template:
            self.templater = pywws.template.Template(context, use_locale=False)
        # StringIO of self.template, created on first use by prepare_data
        self.template_file = None
        # get time stamp of last uploaded data
        self.last_update = context.status.get_datetime(
            'last update', self.service_name)
        if not self.last_update:
            # ``catchup`` is defined by the concrete subclasses; treat a
            # missing value like None (no past data)
            catchup = getattr(self, 'catchup', None)
            if catchup:
                self.last_update = datetime.utcnow() - timedelta(
                    days=catchup)
            else:
                self.last_update = datetime.min

    def upload_data(self, session, prepared_data=None):
        """Upload one data set to the service.

        Every data service class must implement this method.

        :param object session: the object created by
            :py:meth:`~ServiceBase.session`. This is typically used to
            communicate with the server and is automatically closed when
            a batch of uploads has finished.
        :param dict prepared_data: a set of key: value pairs to upload.
            The keys and values must all be text strings.
        :rtype: (bool, str)
        :return: a success flag and a status message, which is passed
            to :py:meth:`~ServiceBase.log`.
        """
        raise NotImplementedError()

    def queue_data(self, timestamp, data):
        """Convert ``data`` and put it on the upload queue.

        :param timestamp: time stamp stored with the queued item, or
            :py:obj:`None`.
        :param data: one pywws data record.
        :return: :py:obj:`True` if the data was queued,
            :py:obj:`False` if :py:meth:`valid_data` rejected it.
        """
        if not self.valid_data(data):
            return False
        prepared_data = self.prepare_data(data)
        prepared_data.update(self.fixed_data)
        self.logger.debug('data: %s', str(prepared_data))
        self.queue.append((timestamp, prepared_data))
        return True

    def prepare_data(self, data):
        """Convert one data record to a :py:obj:`dict` using
        :py:attr:`template`.

        Returns an empty dict if the service has no template.
        """
        if not self.template:
            # no template means no templater; an empty template would
            # evaluate to an empty dict anyway
            return {}
        if not self.template_file:
            self.template_file = StringIO(self.template)
        # Rewind before rendering, so an exception part-way through a
        # previous conversion can't leave the shared file mid-stream.
        self.template_file.seek(0)
        data_str = self.templater.make_text(self.template_file, data)
        return literal_eval('{' + data_str + '}')

    def valid_data(self, data):
        """Hook for subclasses to reject unsuitable data records.

        The default implementation accepts everything.
        """
        return True
class CatchupDataService(DataServiceBase):
    """Data service that can also upload stored past data.

    Records are queued in time stamp order starting just after the most
    recently queued one, so gaps left by an outage can be filled in.
    """
    catchup = 7
    """Sets the number of days of past data that can be uploaded when a
    service is first used.
    """

    def queue_data(self, timestamp, data):
        """Queue a record unless it is too close to the previous one.

        Returns :py:obj:`True` if the record was queued.
        """
        if timestamp:
            if timestamp < self.last_update + self.interval:
                # closer than ``interval`` to the last queued record
                return False
        accepted = super(CatchupDataService, self).queue_data(timestamp, data)
        if accepted and timestamp:
            # advance the high-water mark so this record isn't re-queued
            self.last_update = timestamp
        return accepted

    def do_catchup(self, do_all=False):
        """Queue stored records newer than the last queued one.

        With ``do_all`` every remaining record is queued, waiting for
        space on the queue as needed (returns early on shutdown).
        Otherwise at most one record is queued.

        :return: :py:obj:`False` if a single record was queued (so more
            may remain), otherwise :py:obj:`True`.
        """
        records = self.context.calib_data[self.last_update + self.interval:]
        for data in records:
            if do_all:
                # block until the uploader thread has made room
                while self.queue.full():
                    self.context.shutdown.wait(4.0)
                    if self.context.shutdown.is_set():
                        return True
                self.queue_data(data['idx'], data)
            elif self.queue.full():
                return True
            elif self.queue_data(data['idx'], data):
                return False
        return True

    def upload(self, live_data=None, test_mode=False, options=()):
        """Queue at most one stored record, else the live record.

        In ``test_mode`` the search starts at
        ``calib_data.before(datetime.max)`` (presumably the newest
        stored record) and the item is queued without a time stamp.
        ``options`` is ignored.
        """
        if self.queue.full():
            return
        if test_mode:
            first = self.context.calib_data.before(datetime.max)
        else:
            first = self.last_update + self.interval
        records = self.context.calib_data[first:]
        for data in records:
            stamp = data['idx']
            if test_mode:
                stamp = None
            if self.queue_data(stamp, data):
                return
        if live_data:
            self.queue_data(live_data['idx'], live_data)

    def upload_batch(self):
        """Send queued records, oldest first, in one session.

        Each item is only removed from the queue after it was uploaded
        successfully. Stops at the first failure, at the :py:obj:`None`
        stop marker, or on shutdown.

        :return: :py:obj:`False` if an upload failed or the stop marker
            was reached, otherwise :py:obj:`True`.
        """
        ok = True
        sent = 0
        with self.session() as session:
            while self.queue and not self.context.shutdown.is_set():
                # peek; the item stays queued until it has been sent
                item = self.queue[0]
                if item is None:
                    ok = False
                    break
                timestamp, prepared_data = item
                ok, message = self.upload_data(
                    session, prepared_data=prepared_data)
                self.log(message)
                if not ok:
                    break
                sent += 1
                if timestamp:
                    self.context.status.set(
                        'last update', self.service_name, str(timestamp))
                self.queue.popleft()
        if sent == 1:
            self.logger.info('1 record sent')
        elif sent > 1:
            self.logger.warning('{:d} records sent'.format(sent))
        return ok
class LiveDataService(DataServiceBase):
    """Data service that only ever uploads the most recent data.

    Past data is never uploaded: if several items are waiting on the
    queue only the newest one is sent.
    """
    # falsy catchup: with no saved "last update", the base class starts
    # from datetime.min
    catchup = None

    def do_catchup(self, do_all=False):
        """Nothing to catch up on; always reports completion."""
        return True

    def upload(self, live_data=None, test_mode=False, options=()):
        """Queue ``live_data``, or the stored record found by
        ``calib_data.before(datetime.max)`` if none is given.

        In ``test_mode`` the item is queued without a time stamp.
        ``options`` is ignored.
        """
        if self.queue.full():
            return
        data = live_data
        if not data:
            idx = self.context.calib_data.before(datetime.max)
            if not idx:
                return
            data = self.context.calib_data[idx]
        timestamp = data['idx']
        if test_mode:
            timestamp = None
        self.queue_data(timestamp, data)

    def upload_batch(self):
        """Upload the newest queued item, discarding older ones.

        :return: :py:obj:`False` on reaching the :py:obj:`None` stop
            marker or when the upload fails, otherwise :py:obj:`True`.
        """
        # drain the queue up to (not including) any stop marker,
        # keeping only the last item taken
        newest = self.queue.popleft()
        while self.queue and self.queue[0] is not None:
            newest = self.queue.popleft()
        if newest is None:
            return False
        timestamp, prepared_data = newest
        if timestamp and timestamp < self.last_update + self.interval:
            # too soon after the last successful upload; drop silently
            return True
        with self.session() as session:
            ok, message = self.upload_data(
                session, prepared_data=prepared_data)
        self.log(message)
        if not ok:
            return ok
        self.logger.info('1 record sent')
        if timestamp:
            self.last_update = timestamp
            self.context.status.set(
                'last update', self.service_name, str(timestamp))
        return ok
class FileService(ServiceBase):
    """Base class for "file" services.

    Items on the upload queue are file names, either absolute or
    relative to the context's ``output_dir``. Subclasses must implement
    :py:meth:`~ServiceBase.session` and :py:meth:`upload_file`. Names of
    files that failed to upload are saved as a "pending" list in
    ``context.status`` and are re-queued by :py:meth:`do_catchup`.
    """

    def do_catchup(self, do_all=False):
        """Re-queue files left pending by earlier failed uploads.

        ``do_all`` is accepted for compatibility with the data services
        and is ignored. Always returns :py:obj:`True`.
        """
        self.upload(options=literal_eval(
            self.context.status.get('pending', self.service_name, '[]')))
        return True

    def upload(self, live_data=None, options=()):
        """Put file names on the upload queue.

        :param live_data: ignored.
        :param options: iterable of file names. Names already on the
            queue are skipped, as is everything once the queue is full.
        """
        for item in options:
            if self.queue.full() or (item in self.queue):
                continue
            self.queue.append(item)

    def upload_file(self, session, path):
        """Upload one file to the service.

        Every file service class must implement this method.

        :param object session: the object created by
            :py:meth:`~ServiceBase.session`.
        :param str path: path of the file to upload. Relative queue
            entries have already been joined to ``context.output_dir``.
        :rtype: (bool, str)
        :return: a success flag and a status message, which is passed
            to :py:meth:`~ServiceBase.log`.
        """
        raise NotImplementedError()

    def upload_batch(self):
        """Upload queued files in one session.

        Missing files are dropped. A file is removed from the queue only
        after a successful upload; on failure it is added to the pending
        list and the batch stops. The pending list is saved afterwards.

        :return: :py:obj:`False` if an upload failed or the
            :py:obj:`None` stop marker was reached, otherwise
            :py:obj:`True`.
        """
        pending = literal_eval(
            self.context.status.get('pending', self.service_name, '[]'))
        OK = True
        count = 0
        with self.session() as session:
            while self.queue and not self.context.shutdown.is_set():
                # peek; the item stays queued until it has been handled
                upload = self.queue[0]
                if upload is None:
                    OK = False
                    break
                if os.path.isabs(upload):
                    path = upload
                else:
                    path = os.path.join(self.context.output_dir, upload)
                if not os.path.isfile(path):
                    # nothing to upload; forget about it
                    if upload in pending:
                        pending.remove(upload)
                    self.queue.popleft()
                    continue
                self.logger.debug('file: %s', path)
                OK, message = self.upload_file(session, path)
                self.log(message)
                if OK:
                    if upload in pending:
                        pending.remove(upload)
                    count += 1
                else:
                    if upload not in pending:
                        pending.append(upload)
                    break
                self.queue.popleft()
        self.context.status.set('pending', self.service_name, repr(pending))
        if count > 1:
            self.logger.info('{:d} uploads'.format(count))
        elif count:
            self.logger.info('1 upload')
        return OK
def main(class_, argv=None):
    """Command line entry point shared by the service modules.

    The first paragraph of the service module's docstring becomes the
    argparse description and the second paragraph (if any) the epilog.

    :param class_: the service class to run.
    :param argv: command line arguments, defaulting to ``sys.argv``.
    :return: process exit status (always 0).
    """
    import argparse
    import inspect
    if argv is None:
        argv = sys.argv
    # a module without a docstring gives None; treat it as empty text
    docstring = inspect.getdoc(sys.modules[class_.__module__]) or ''
    if sys.version_info[0] < 3:
        docstring = docstring.decode('utf-8')
    paragraphs = docstring.split('\n\n')
    # a one-paragraph docstring has no epilog (previously IndexError)
    epilog = paragraphs[1] if len(paragraphs) > 1 else None
    parser = argparse.ArgumentParser(
        description=paragraphs[0], epilog=epilog)
    if hasattr(class_, 'register'):
        parser.add_argument('-r', '--register', action='store_true',
                            help='register (or update) with service')
    if issubclass(class_, CatchupDataService):
        parser.add_argument('-c', '--catchup', action='store_true',
                            help='upload all data since last upload')
    parser.add_argument('-v', '--verbose', action='count',
                        help='increase amount of reassuring messages')
    parser.add_argument('data_dir', help='root directory of the weather data')
    if issubclass(class_, FileService):
        parser.add_argument('file', nargs='*', help='file to be uploaded')
    args = parser.parse_args(argv[1:])
    pywws.logger.setup_handler(args.verbose or 0)
    with pywws.storage.pywws_context(args.data_dir) as context:
        if 'register' in args and args.register:
            uploader = class_(context, check_params=False)
            uploader.register()
            context.flush()
            return 0
        uploader = class_(context)
        if issubclass(class_, FileService):
            uploader.upload(options=map(os.path.abspath, args.file))
        elif issubclass(class_, CatchupDataService) and args.catchup:
            uploader.do_catchup(do_all=True)
        else:
            uploader.upload(test_mode=True)
        uploader.stop()
    return 0