# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2008-21 pywws contributors
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Routines to perform common tasks such as plotting gaphs or uploading files."""
from __future__ import absolute_import

from ast import literal_eval
from datetime import datetime, timedelta
import importlib
import logging
import os
import sys
import time

from pywws.calib import Calib
import pywws.plot
import pywws.process
import pywws.template
from pywws.timezone import time_zone
import pywws.windrose

logger = logging.getLogger(__name__)
class RegularTasks(object):
    """Run the tasks configured in ``weather.ini``.

    Tasks are read from the ``live``, ``logged``, ``hourly``,
    ``12 hourly`` and ``daily`` sections, plus any section whose name
    starts with the word ``cron``.  Each section may list ``plot``,
    ``text`` and ``services`` entries, which are handled by
    :py:meth:`do_plot`, :py:meth:`do_template` and the service uploader
    objects respectively.
    """

    def __init__(self, context):
        """Create the plotters, templater, cron schedules and uploaders.

        :param context: application context.  This class reads its
            ``params``, ``status``, ``raw_data``, ``calib_data``,
            ``hourly_data``, ``daily_data``, ``monthly_data``,
            ``work_dir``, ``output_dir`` and ``live_logging`` attributes
            and calls its ``flush()`` method.
        """
        self.context = context
        self.params = context.params
        self.status = context.status
        self.raw_data = context.raw_data
        self.calib_data = context.calib_data
        self.hourly_data = context.hourly_data
        self.daily_data = context.daily_data
        self.monthly_data = context.monthly_data
        self.flush = literal_eval(
            self.params.get('config', 'frequent writes', 'False'))
        # get directories
        self.template_dir = self.params.get(
            'paths', 'templates', os.path.expanduser('~/weather/templates/'))
        self.graph_template_dir = self.params.get(
            'paths', 'graph_templates',
            os.path.expanduser('~/weather/graph_templates/'))
        self.module_dir = self.params.get(
            'paths', 'modules', os.path.expanduser('~/weather/modules/'))
        # create calibration object
        self.calibrator = Calib(self.params, self.raw_data)
        # create templater object
        self.templater = pywws.template.Template(context)
        # create plotter objects
        self.plotter = pywws.plot.GraphPlotter(context, self.context.work_dir)
        self.roseplotter = pywws.windrose.RosePlotter(
            context, self.context.work_dir)
        # get daytime end hour
        self.day_end_hour, self.use_dst = pywws.process.get_day_end_hour(
            self.params)
        # parse "cron" sections
        self.cron = {}
        for section in self.params._config.sections():
            words = section.split()
            # a whitespace-only section name has no first word to test
            if not words or words[0] != 'cron':
                continue
            # imported here so croniter is only needed when a cron
            # section is actually configured
            import croniter
            last_update = self.status.get_datetime('last update', section)
            last_update = last_update or datetime.utcnow()
            self.cron[section] = croniter.croniter(
                self.params.get(section, 'format', ''),
                start_time=time_zone.utc_to_local(last_update))
            self.cron[section].get_next()
        # create service uploader objects
        self.services = {}
        for section in list(self.cron.keys()) + [
                'live', 'logged', 'hourly', '12 hourly', 'daily']:
            for name, _options in self._parse_templates(section, 'services'):
                if name in self.services:
                    continue
                if os.path.exists(os.path.join(self.module_dir, name + '.py')):
                    # a module in the user's modules directory takes
                    # precedence over the built in pywws.service one
                    sys.path.insert(0, self.module_dir)
                    try:
                        mod = importlib.import_module(name)
                    finally:
                        # restore sys.path even if the import fails, and
                        # remove by value in case the imported module
                        # changed sys.path itself
                        if self.module_dir in sys.path:
                            sys.path.remove(self.module_dir)
                else:
                    mod = importlib.import_module('pywws.service.' + name)
                self.services[name] = mod.ToService(context)
            # check for obsolete entries
            if self.params.get(section, 'twitter'):
                logger.error(
                    'Obsolete twitter entry in weather.ini [%s]', section)
            if self.params.get(section, 'yowindow'):
                logger.error(
                    'Obsolete yowindow entry in weather.ini [%s]', section)

    def has_live_tasks(self):
        """Return ``True`` if there is anything to do with live data.

        That is the case when any cron section exists, or when the
        ``live`` section has a non-empty ``services``, ``plot`` or
        ``text`` entry.
        """
        if self.cron:
            return True
        for option in ('services', 'plot', 'text'):
            if literal_eval(self.params.get('live', option, '[]')):
                return True
        return False

    def _parse_templates(self, section, option):
        """Yield ``(name, options)`` for each entry of a task list.

        The ``option`` value of ``section`` is evaluated as a Python
        literal.  An entry that is a list or tuple yields its first
        element as the name and the remaining elements as the options;
        any other entry yields itself with an empty options tuple.
        """
        for template in literal_eval(self.params.get(section, option, '[]')):
            if isinstance(template, (list, tuple)):
                yield template[0], template[1:]
            else:
                yield template, ()

    def _do_common(self, now, sections, live_data=None):
        """Run every plot, text and service task of the given sections.

        :param now: datetime recorded as each section's 'last update'.
        :param sections: names of the ``weather.ini`` sections to run.
        :param live_data: optional data passed on to the text templates
            and to each service's ``upload`` method.
        """
        logger.info('doing task sections %r', sections)
        # make lists of tasks from all sections, avoiding repeats
        service_tasks = []
        text_tasks = []
        plot_tasks = []
        for section in sections:
            for task in self._parse_templates(section, 'services'):
                if task not in service_tasks:
                    service_tasks.append(task)
            for task in self._parse_templates(section, 'text'):
                if task not in text_tasks:
                    text_tasks.append(task)
            for task in self._parse_templates(section, 'plot'):
                if task not in plot_tasks:
                    plot_tasks.append(task)
        # do plot templates
        for template, _flags in plot_tasks:
            self.do_plot(template)
        # do text templates
        for template, _flags in text_tasks:
            self.do_template(template, data=live_data)
        # do service tasks
        for name, options in service_tasks:
            self.services[name].upload(live_data=live_data, options=options)
        # allow all services to send some catchup records, for at most
        # 20 seconds in total
        catchup = list(self.services.keys())
        stop = time.time() + 20.0
        while catchup and time.time() < stop:
            # iterate over a copy as names are removed inside the loop
            for name in list(catchup):
                if self.services[name].do_catchup():
                    catchup.remove(name)
        # update status
        for section in sections:
            self.status.set('last update', section, now.isoformat(' '))
        # save any unsaved data
        if self.flush or 'hourly' in sections:
            self.context.flush()

    def _cron_due(self, now):
        """Return the cron sections that are due at ``now``.

        A section is due when its current scheduled time is not later
        than ``now``.  Each due section's schedule is then advanced
        until its next time is later than ``now``.

        :param now: datetime compared with ``time_zone.local_to_utc``
            results, so presumably UTC -- confirm against callers.
        """
        if not self.cron:
            return []
        # make list of due sections
        sections = []
        for section in self.cron:
            if time_zone.local_to_utc(
                    self.cron[section].get_current(datetime)) > now:
                continue
            sections.append(section)
            # skip over any scheduled times that have already passed
            while time_zone.local_to_utc(
                    self.cron[section].get_next(datetime)) <= now:
                pass
        return sections

    def _periodic_due(self, now):
        """Return which of 'hourly', 'daily' and '12 hourly' are due.

        A section is due if it has no recorded 'last update' or if that
        update is earlier than the section's threshold, as computed by
        ``time_zone.hour_start`` or ``time_zone.day_start``.
        """
        # make list of due sections
        sections = []
        # hourly
        threshold = time_zone.hour_start(now)
        last_update = self.status.get_datetime('last update', 'hourly')
        if not last_update or last_update < threshold:
            sections.append('hourly')
        # daily
        threshold = time_zone.day_start(
            now, self.day_end_hour, use_dst=self.use_dst)
        last_update = self.status.get_datetime('last update', 'daily')
        if not last_update or last_update < threshold:
            sections.append('daily')
        # 12 hourly: the later of the daily threshold and the day start
        # computed with the day end hour shifted by 12 hours
        threshold = max(threshold, time_zone.day_start(
            now, (self.day_end_hour + 12) % 24, use_dst=self.use_dst))
        last_update = self.status.get_datetime('last update', '12 hourly')
        if not last_update or last_update < threshold:
            sections.append('12 hourly')
        return sections

    def do_live(self, data):
        """Calibrate one live data record and run all tasks now due.

        The 'live' section is always run, together with any cron and
        periodic sections due at the record's ``idx`` time.
        """
        calib_data = self.calibrator.calib(data)
        now = calib_data['idx']
        sections = ['live'] + self._cron_due(now) + self._periodic_due(now)
        self._do_common(now, sections, live_data=calib_data)

    def do_tasks(self):
        """Run the 'logged' tasks and any cron or periodic tasks now due.

        :raises RuntimeError: if there is no calibrated data.
        """
        now = self.calib_data.before(datetime.max)
        if not now:
            raise RuntimeError('No processed data available')
        if not self.context.live_logging:
            # do periodic tasks if they would be due by next logging time
            now += timedelta(minutes=self.calib_data[now]['delay'])
        sections = ['logged'] + self._cron_due(now) + self._periodic_due(now)
        self._do_common(now, sections)
        if not self.context.live_logging:
            # cleanly shut down upload threads
            for name in self.services:
                self.services[name].stop()

    def do_plot(self, template):
        """Draw the graph described by a graph template file.

        The template is looked up in the graph templates directory and
        is tried with the graph plotter first, then the wind rose
        plotter.

        :param template: file name of the graph template.
        :return: path of the output file (the template name without its
            extension, in the context's output directory), or ``None``
            if neither plotter produced anything.
        """
        logger.info("Graphing %s", template)
        input_file = os.path.join(self.graph_template_dir, template)
        output_file = os.path.join(
            self.context.output_dir, os.path.splitext(template)[0])
        input_xml = pywws.plot.GraphFileReader(input_file)
        if (input_xml.get_children(self.plotter.plot_name) and
                self.plotter.do_plot(input_xml, output_file) == 0):
            return output_file
        if (input_xml.get_children(self.roseplotter.plot_name) and
                self.roseplotter.do_plot(input_xml, output_file) == 0):
            return output_file
        logger.warning('nothing to graph in %s', input_file)
        return None

    def do_template(self, template, data=None):
        """Process a text template file.

        :param template: file name of the template, looked up in the
            templates directory and reused as the output file name.
        :param data: optional live data passed to the templater.
        :return: path of the output file in the context's output
            directory.
        """
        logger.info("Templating %s", template)
        input_file = os.path.join(self.template_dir, template)
        output_file = os.path.join(self.context.output_dir, template)
        self.templater.make_file(input_file, output_file, live_data=data)
        return output_file