Source code for pywws.filedata

# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2008-19  pywws contributors

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Store weather data in easy to access files

Introduction
------------

This module is at the core of pywws file-based storage.
It stores data on disc, but without the overhead of a full-scale
database system. I have designed it to run on a low-memory machine
such as a Raspberry Pi or even a router. To minimise memory usage it
loads only one day's worth of raw data into memory at a time.

From a "user" point of view, the data is accessed as a cross between a
list and a dictionary. Each data record is indexed by a
:py:class:`datetime.datetime` object (dictionary behaviour), but
records are stored in order and can be accessed as slices (list
behaviour).

For example, to access the hourly data for Christmas day 2009, one
might do the following::

  from datetime import datetime, timedelta
  import pywws.filedata
  hourly = pywws.filedata.HourlyStore('weather_data')
  for data in hourly[datetime(2009, 12, 25):datetime(2009, 12, 26)]:
      print(data['idx'], data['temp_out'])

Some more examples of data access, where ``data`` is any of the store
objects::

  # get value nearest 9:30 on Christmas day 2008
  data[data.nearest(datetime(2008, 12, 25, 9, 30))]
  # get entire array, equivalent to data[:]
  data[datetime.min:datetime.max]
  # get last 12 hours worth of data
  data[datetime.utcnow() - timedelta(hours=12):]
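
The :py:meth:`CoreStore.before` and :py:meth:`CoreStore.after` methods
are useful when the exact timestamp of a record is unknown. A minimal
sketch::

  # get the newest record from before Christmas day 2009, if any
  idx = data.before(datetime(2009, 12, 25))
  if idx is not None:
      print(data[idx])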

Note that the :py:class:`datetime.datetime` index is in UTC. You may
need to apply an offset to convert to local time.
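
For example, this sketch (not part of pywws) shifts the index by a
fixed offset for display; the ``utc_offset`` value here is an
assumption, to be replaced with your own local offset::

  from datetime import timedelta
  utc_offset = timedelta(hours=1)  # assumed local offset from UTC
  for data in hourly[datetime(2009, 12, 25):datetime(2009, 12, 26)]:
      print(data['idx'] + utc_offset, data['temp_out'])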

The module provides five classes to store different data.
:py:class:`RawStore` takes "raw" data from the weather station;
:py:class:`CalibStore`, :py:class:`HourlyStore`, :py:class:`DailyStore`
and :py:class:`MonthlyStore` store processed data (see
:py:mod:`pywws.process`). All are derived from the same ``CoreStore``
class, they only differ in the keys and types of data stored in each
record.
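
As the stores share one interface, code written for one works with the
others; only the record keys differ. A minimal sketch that lists daily
temperature extremes, assuming ``weather_data`` already holds
processed data::

  import pywws.filedata
  daily = pywws.filedata.DailyStore('weather_data')
  for data in daily[datetime(2009, 12, 1):datetime(2010, 1, 1)]:
      print(data['idx'], data['temp_out_min'], data['temp_out_max'])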

Detailed API
------------

"""

import csv
from datetime import date, datetime, timedelta, MAXYEAR
import logging
import os
import sys
import time

from pywws.constants import DAY
from pywws.weatherstation import WSDateTime, WSFloat, WSInt, WSStatus

logger = logging.getLogger(__name__)


class _Cache(object):
    def __init__(self):
        self.data = []
        self.path = ''
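        # start with an empty date range, so any date forces a load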
        self.lo = date.max
        self.hi = date.min
        self.dirty = False

    def copy(self, other):
        self.data = other.data
        self.path = other.path
        self.lo = other.lo
        self.hi = other.hi
        self.dirty = False

    def get_ptr(self, idx):
        """Return index at which to insert a record with timestamp idx
        and boolean indicating if there is already data with that
        timestamp.

        """
        hi = len(self.data) - 1
        if hi < 0 or self.data[0]['idx'] > idx:
            return 0, False
        if self.data[0]['idx'] == idx:
            return 0, True
        if self.data[hi]['idx'] < idx:
            return hi + 1, False
        lo = 0
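        # binary search: narrow the range until lo and hi are adjacent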
        while hi - lo > 1:
            mid = (lo + hi) // 2
            if self.data[mid]['idx'] < idx:
                lo = mid
            elif self.data[mid]['idx'] == idx:
                return mid, True
            else:
                hi = mid
        return hi, self.data[hi]['idx'] == idx



class CoreStore(object):
    def __init__(self, root_dir):
        self._root_dir = os.path.abspath(
            os.path.join(root_dir, self.dir_name))
        if not os.path.isdir(self._root_dir):
            os.mkdir(self._root_dir)
        # initialise caches
        self._wr_cache = _Cache()
        self._rd_cache = _Cache()
        # get conservative first and last days for which data (might) exist
        self._lo_limit = date.max - timedelta(days=500)
        self._hi_limit = date.min + timedelta(days=500)
        for root, dirs, files in os.walk(self._root_dir):
            files.sort()
            for file in files:
                if file[0] == '.':
                    continue
                path, self._lo_limit, hi = self._get_cache_path(
                    datetime.strptime(file, "%Y-%m-%d.txt").date())
                break
            else:
                dirs.sort()
                continue
            break
        for root, dirs, files in os.walk(self._root_dir):
            files.sort()
            files.reverse()
            for file in files:
                if file[0] == '.':
                    continue
                path, lo, self._hi_limit = self._get_cache_path(
                    datetime.strptime(file, "%Y-%m-%d.txt").date())
                break
            else:
                dirs.sort()
                dirs.reverse()
                continue
            break
        self._lo_limit_dt = datetime(
            self._lo_limit.year, self._lo_limit.month, self._lo_limit.day)
        self._hi_limit_dt = datetime(
            self._hi_limit.year, self._hi_limit.month, self._hi_limit.day)

    def _slice(self, i):
        if i.step is not None:
            raise TypeError("slice step not permitted")
        a = i.start
        if a is None:
            a = self._lo_limit_dt
        elif not isinstance(a, datetime):
            raise TypeError("slice indices must be %s or None" % (datetime))
        elif a < self._lo_limit_dt:
            a = self._lo_limit_dt
        b = i.stop
        if b is None:
            b = self._hi_limit_dt
        elif not isinstance(b, datetime):
            raise TypeError("slice indices must be %s or None" % (datetime))
        elif b > self._hi_limit_dt:
            b = self._hi_limit_dt
        return a, b

    def _get_slice(self, i):
        a, b = self._slice(i)
        if a > b:
            return
        # use separate cache as something might change self._rd_cache
        # during yield
        cache = _Cache()
        # go to start of slice
        start, exact = self._get_cache_ptr(cache, a)
        # iterate over complete caches
        while cache.hi <= b.date():
            for data in cache.data[start:]:
                yield data
            if cache.hi >= self._hi_limit:
                return
            self._load(cache, cache.hi)
            start = 0
        # iterate over part of cache
        for data in cache.data[start:]:
            if data['idx'] >= b:
                return
            yield data

    def __getitem__(self, i):
        """Return the data item or items with index i.

        i must be a datetime object or a slice. If i is a single
        datetime then a value with that index must exist."""
        if isinstance(i, slice):
            return self._get_slice(i)
        if not isinstance(i, datetime):
            raise TypeError("list indices must be %s" % (datetime))
        cache = self._rd_cache
        ptr, exact = self._get_cache_ptr(cache, i)
        if not exact:
            raise KeyError(i)
        return cache.data[ptr]

    def __setitem__(self, i, x):
        """Store a value x with index i.

        i must be a datetime object. If there is already a value with
        index i, it is overwritten.
        """
        if not isinstance(i, datetime):
            raise TypeError("index '%s' is not %s" % (i, datetime))
        x['idx'] = i
        cache = self._wr_cache
        ptr, exact = self._get_cache_ptr(cache, i)
        if exact:
            cache.data[ptr] = x
        else:
            cache.data.insert(ptr, x)
        cache.dirty = True

    def _del_slice(self, i):
        a, b = self._slice(i)
        if a > b:
            return
        # go to start of slice
        cache = self._wr_cache
        start, exact = self._get_cache_ptr(cache, a)
        # delete to end of cache
        while cache.hi <= b.date():
            del cache.data[start:]
            cache.dirty = True
            if cache.hi >= self._hi_limit:
                return
            self._load(cache, cache.hi)
            start = 0
        # delete part of cache
        stop, exact = cache.get_ptr(b)
        del cache.data[start:stop]
        cache.dirty = True

    def __delitem__(self, i):
        """Delete the data item or items with index i.

        i must be a datetime object or a slice. If i is a single
        datetime then a value with that index must exist."""
        if isinstance(i, slice):
            return self._del_slice(i)
        if not isinstance(i, datetime):
            raise TypeError("list indices must be %s" % (datetime))
        cache = self._wr_cache
        ptr, exact = self._get_cache_ptr(cache, i)
        if not exact:
            raise KeyError(i)
        del cache.data[ptr]
        cache.dirty = True

    def before(self, idx):
        """Return datetime of the newest existing data record whose
        datetime is < idx.

        Might not even be in the same year! If no such record exists,
        return None."""
        if not isinstance(idx, datetime):
            raise TypeError("'%s' is not %s" % (idx, datetime))
        day = min(idx.date(), self._hi_limit - DAY)
        cache = self._rd_cache
        while day >= self._lo_limit:
            if day < cache.lo or day >= cache.hi:
                self._load(cache, day)
            ptr, exact = cache.get_ptr(idx)
            if ptr > 0:
                return cache.data[ptr - 1]['idx']
            day = cache.lo - DAY
        return None

    def after(self, idx):
        """Return datetime of the oldest existing data record whose
        datetime is >= idx.

        Might not even be in the same year! If no such record exists,
        return None."""
        if not isinstance(idx, datetime):
            raise TypeError("'%s' is not %s" % (idx, datetime))
        day = max(idx.date(), self._lo_limit)
        cache = self._rd_cache
        while day < self._hi_limit:
            if day < cache.lo or day >= cache.hi:
                self._load(cache, day)
            ptr, exact = cache.get_ptr(idx)
            if ptr < len(cache.data):
                return cache.data[ptr]['idx']
            day = cache.hi
        return None

    def nearest(self, idx):
        """Return datetime of the record whose datetime is nearest idx."""
        hi = self.after(idx)
        if hi == idx:
            return hi
        lo = self.before(idx)
        if hi is None:
            return lo
        if lo is None:
            return hi
        if (hi - idx) < (idx - lo):
            return hi
        return lo

    def _get_cache_ptr(self, cache, i):
        day = i.date()
        if day < cache.lo or day >= cache.hi:
            self._load(cache, day)
            if cache.lo < self._lo_limit:
                self._lo_limit = cache.lo
                self._lo_limit_dt = datetime(
                    cache.lo.year, cache.lo.month, cache.lo.day)
            if cache.hi > self._hi_limit:
                self._hi_limit = cache.hi
                self._hi_limit_dt = datetime(
                    cache.hi.year, cache.hi.month, cache.hi.day)
        return cache.get_ptr(i)

    def _load(self, cache, target_date):
        self._flush(cache)
        new_path, new_lo, new_hi = self._get_cache_path(target_date)
        if new_path == self._wr_cache.path:
            cache.copy(self._wr_cache)
            return
        if new_path == self._rd_cache.path:
            cache.copy(self._rd_cache)
            return
        cache.data = []
        cache.path, cache.lo, cache.hi = new_path, new_lo, new_hi
        if not os.path.exists(cache.path):
            return
        if sys.version_info[0] >= 3:
            kwds = {'mode': 'r', 'newline': ''}
        else:
            kwds = {'mode': 'rb'}
        row_lengths = (len(self.key_list),
                       len(self.key_list) - self.solar_items)
        with open(cache.path, **kwds) as csvfile:
            reader = csv.reader(csvfile, quoting=csv.QUOTE_NONE)
            for row in reader:
                if len(row) not in row_lengths:
                    logger.error(
                        'Invalid %s data at %s', self.dir_name, row[0])
                    continue
                result = {}
                for key, value in zip(self.key_list, row):
                    if value == '':
                        result[key] = None
                    else:
                        result[key] = self.conv[key](value)
                cache.data.append(result)

    def flush(self):
        self._flush(self._wr_cache)
        self._flush(self._rd_cache)

    def _flush(self, cache):
        if not cache.dirty:
            return
        cache.dirty = False
        if len(cache.data) == 0:
            if os.path.exists(cache.path):
                # existing data has been wiped, so delete file
                os.unlink(cache.path)
            return
        dir = os.path.dirname(cache.path)
        if not os.path.isdir(dir):
            os.makedirs(dir)
        if sys.version_info[0] >= 3:
            kwds = {'mode': 'w', 'newline': ''}
        else:
            kwds = {'mode': 'wb'}
        conv = {
            datetime  : str,
            float     : lambda x: '{:.12g}'.format(x),
            int       : str,
            type(None): lambda x: '',
            WSDateTime: WSDateTime.to_csv,
            WSFloat   : str,
            WSInt     : str,
            WSStatus  : WSStatus.to_csv,
            }
        with open(cache.path, **kwds) as csvfile:
            for data in cache.data:
                row = []
                for key in self.key_list[:len(data)]:
                    value = data[key]
                    row.append(conv[type(value)](value))
                csvfile.write(','.join(row) + '\n')

    def _get_cache_path(self, target_date):
        # default implementation - one file per day
        path = os.path.join(self._root_dir,
                            target_date.strftime("%Y"),
                            target_date.strftime("%Y-%m"),
                            target_date.strftime("%Y-%m-%d.txt"))
        lo = min(target_date, date.max - DAY)
        hi = lo + DAY
        return path, lo, hi

    def __iter__(self):
        """Return an iterator which yields all items in the data store
        sequentially. Equivalent to::

            for item in dataset[:]:
                yield item

        """
        for item in self[:]:
            yield item

    def update(self, E):
        """D.update(E) -> None. Equivalent to: for k in E: D[k['idx']] = k

        Update D from the list-like iterable E of dicts. Pre-existing
        items are overwritten. Each dict is assumed to contain all the
        appropriate keys and values."""
        for k in E:
            self[k['idx']] = k

    def clear(self):
        """Clear all data from the data store, permanently."""
        for root, dirs, files in os.walk(self._root_dir, topdown=False):
            for file in files:
                os.unlink(os.path.join(root, file))
            os.rmdir(root)
        # get the root dir back and re-initialise to start again
        root_dir = os.path.abspath(os.path.join(self._root_dir, os.pardir))
        self.__init__(root_dir)


class RawStore(CoreStore):
    """Stores raw weather station data."""
    dir_name = 'raw'
    key_list = [
        'idx', 'delay', 'hum_in', 'temp_in', 'hum_out', 'temp_out',
        'abs_pressure', 'wind_ave', 'wind_gust', 'wind_dir', 'rain',
        'status', 'illuminance', 'uv',
        ]
    solar_items = 2
    conv = {
        'idx'          : WSDateTime.from_csv,
        'delay'        : int,
        'hum_in'       : int,
        'temp_in'      : float,
        'hum_out'      : int,
        'temp_out'     : float,
        'abs_pressure' : float,
        'wind_ave'     : float,
        'wind_gust'    : float,
        'wind_dir'     : int,
        'rain'         : float,
        'status'       : WSStatus.from_csv,
        'illuminance'  : float,
        'uv'           : int,
        }


class CalibStore(CoreStore):
    """Stores 'calibrated' weather station data."""
    dir_name = 'calib'
    key_list = [
        'idx', 'delay', 'hum_in', 'temp_in', 'hum_out', 'temp_out',
        'abs_pressure', 'rel_pressure', 'wind_ave', 'wind_gust',
        'wind_dir', 'rain', 'status', 'illuminance', 'uv',
        ]
    solar_items = 2
    conv = {
        'idx'          : WSDateTime.from_csv,
        'delay'        : int,
        'hum_in'       : int,
        'temp_in'      : float,
        'hum_out'      : int,
        'temp_out'     : float,
        'abs_pressure' : float,
        'rel_pressure' : float,
        'wind_ave'     : float,
        'wind_gust'    : float,
        'wind_dir'     : float,
        'rain'         : float,
        'status'       : WSStatus.from_csv,
        'illuminance'  : float,
        'uv'           : int,
        }


class HourlyStore(CoreStore):
    """Stores hourly summary weather station data."""
    dir_name = 'hourly'
    key_list = [
        'idx', 'hum_in', 'temp_in', 'hum_out', 'temp_out',
        'abs_pressure', 'rel_pressure', 'pressure_trend',
        'wind_ave', 'wind_gust', 'wind_dir', 'rain',
        'illuminance', 'uv',
        ]
    solar_items = 2
    conv = {
        'idx'            : WSDateTime.from_csv,
        'hum_in'         : int,
        'temp_in'        : float,
        'hum_out'        : int,
        'temp_out'       : float,
        'abs_pressure'   : float,
        'rel_pressure'   : float,
        'pressure_trend' : float,
        'wind_ave'       : float,
        'wind_gust'      : float,
        'wind_dir'       : float,
        'rain'           : float,
        'illuminance'    : float,
        'uv'             : int,
        }


class DailyStore(CoreStore):
    """Stores daily summary weather station data."""
    dir_name = 'daily'
    key_list = [
        'idx', 'start',
        'hum_out_ave', 'hum_out_min', 'hum_out_min_t',
        'hum_out_max', 'hum_out_max_t',
        'temp_out_ave', 'temp_out_min', 'temp_out_min_t',
        'temp_out_max', 'temp_out_max_t',
        'hum_in_ave', 'hum_in_min', 'hum_in_min_t',
        'hum_in_max', 'hum_in_max_t',
        'temp_in_ave', 'temp_in_min', 'temp_in_min_t',
        'temp_in_max', 'temp_in_max_t',
        'abs_pressure_ave', 'abs_pressure_min', 'abs_pressure_min_t',
        'abs_pressure_max', 'abs_pressure_max_t',
        'rel_pressure_ave', 'rel_pressure_min', 'rel_pressure_min_t',
        'rel_pressure_max', 'rel_pressure_max_t',
        'wind_ave', 'wind_gust', 'wind_gust_t', 'wind_dir', 'rain',
        'illuminance_ave', 'illuminance_max', 'illuminance_max_t',
        'uv_ave', 'uv_max', 'uv_max_t',
        ]
    solar_items = 6
    conv = {
        'idx'                 : WSDateTime.from_csv,
        'start'               : WSDateTime.from_csv,
        'hum_out_ave'         : float,
        'hum_out_min'         : int,
        'hum_out_min_t'       : WSDateTime.from_csv,
        'hum_out_max'         : int,
        'hum_out_max_t'       : WSDateTime.from_csv,
        'temp_out_ave'        : float,
        'temp_out_min'        : float,
        'temp_out_min_t'      : WSDateTime.from_csv,
        'temp_out_max'        : float,
        'temp_out_max_t'      : WSDateTime.from_csv,
        'hum_in_ave'          : float,
        'hum_in_min'          : int,
        'hum_in_min_t'        : WSDateTime.from_csv,
        'hum_in_max'          : int,
        'hum_in_max_t'        : WSDateTime.from_csv,
        'temp_in_ave'         : float,
        'temp_in_min'         : float,
        'temp_in_min_t'       : WSDateTime.from_csv,
        'temp_in_max'         : float,
        'temp_in_max_t'       : WSDateTime.from_csv,
        'abs_pressure_ave'    : float,
        'abs_pressure_min'    : float,
        'abs_pressure_min_t'  : WSDateTime.from_csv,
        'abs_pressure_max'    : float,
        'abs_pressure_max_t'  : WSDateTime.from_csv,
        'rel_pressure_ave'    : float,
        'rel_pressure_min'    : float,
        'rel_pressure_min_t'  : WSDateTime.from_csv,
        'rel_pressure_max'    : float,
        'rel_pressure_max_t'  : WSDateTime.from_csv,
        'wind_ave'            : float,
        'wind_gust'           : float,
        'wind_gust_t'         : WSDateTime.from_csv,
        'wind_dir'            : float,
        'rain'                : float,
        'illuminance_ave'     : float,
        'illuminance_max'     : float,
        'illuminance_max_t'   : WSDateTime.from_csv,
        'uv_ave'              : float,
        'uv_max'              : int,
        'uv_max_t'            : WSDateTime.from_csv,
        }

    def _get_cache_path(self, target_date):
        # one file per month
        path = os.path.join(self._root_dir,
                            target_date.strftime("%Y"),
                            target_date.strftime("%Y-%m-01.txt"))
        lo = target_date.replace(day=1)
        if lo.month < 12:
            hi = lo.replace(month=lo.month+1)
        elif lo.year < MAXYEAR:
            hi = lo.replace(year=lo.year+1, month=1)
        else:
            hi = lo
            lo = hi.replace(month=hi.month-1)
        return path, lo, hi


class MonthlyStore(CoreStore):
    """Stores monthly summary weather station data."""
    dir_name = 'monthly'
    key_list = [
        'idx', 'start',
        'hum_out_ave', 'hum_out_min', 'hum_out_min_t',
        'hum_out_max', 'hum_out_max_t',
        'temp_out_ave',
        'temp_out_min_lo', 'temp_out_min_lo_t',
        'temp_out_min_hi', 'temp_out_min_hi_t', 'temp_out_min_ave',
        'temp_out_max_lo', 'temp_out_max_lo_t',
        'temp_out_max_hi', 'temp_out_max_hi_t', 'temp_out_max_ave',
        'hum_in_ave', 'hum_in_min', 'hum_in_min_t',
        'hum_in_max', 'hum_in_max_t',
        'temp_in_ave',
        'temp_in_min_lo', 'temp_in_min_lo_t',
        'temp_in_min_hi', 'temp_in_min_hi_t', 'temp_in_min_ave',
        'temp_in_max_lo', 'temp_in_max_lo_t',
        'temp_in_max_hi', 'temp_in_max_hi_t', 'temp_in_max_ave',
        'abs_pressure_ave', 'abs_pressure_min', 'abs_pressure_min_t',
        'abs_pressure_max', 'abs_pressure_max_t',
        'rel_pressure_ave', 'rel_pressure_min', 'rel_pressure_min_t',
        'rel_pressure_max', 'rel_pressure_max_t',
        'wind_ave', 'wind_gust', 'wind_gust_t', 'wind_dir',
        'rain', 'rain_days',
        'illuminance_ave',
        'illuminance_max_lo', 'illuminance_max_lo_t',
        'illuminance_max_hi', 'illuminance_max_hi_t',
        'illuminance_max_ave',
        'uv_ave',
        'uv_max_lo', 'uv_max_lo_t',
        'uv_max_hi', 'uv_max_hi_t', 'uv_max_ave',
        ]
    solar_items = 12
    conv = {
        'idx'                   : WSDateTime.from_csv,
        'start'                 : WSDateTime.from_csv,
        'hum_out_ave'           : float,
        'hum_out_min'           : int,
        'hum_out_min_t'         : WSDateTime.from_csv,
        'hum_out_max'           : int,
        'hum_out_max_t'         : WSDateTime.from_csv,
        'temp_out_ave'          : float,
        'temp_out_min_lo'       : float,
        'temp_out_min_lo_t'     : WSDateTime.from_csv,
        'temp_out_min_hi'       : float,
        'temp_out_min_hi_t'     : WSDateTime.from_csv,
        'temp_out_min_ave'      : float,
        'temp_out_max_lo'       : float,
        'temp_out_max_lo_t'     : WSDateTime.from_csv,
        'temp_out_max_hi'       : float,
        'temp_out_max_hi_t'     : WSDateTime.from_csv,
        'temp_out_max_ave'      : float,
        'hum_in_ave'            : float,
        'hum_in_min'            : int,
        'hum_in_min_t'          : WSDateTime.from_csv,
        'hum_in_max'            : int,
        'hum_in_max_t'          : WSDateTime.from_csv,
        'temp_in_ave'           : float,
        'temp_in_min_lo'        : float,
        'temp_in_min_lo_t'      : WSDateTime.from_csv,
        'temp_in_min_hi'        : float,
        'temp_in_min_hi_t'      : WSDateTime.from_csv,
        'temp_in_min_ave'       : float,
        'temp_in_max_lo'        : float,
        'temp_in_max_lo_t'      : WSDateTime.from_csv,
        'temp_in_max_hi'        : float,
        'temp_in_max_hi_t'      : WSDateTime.from_csv,
        'temp_in_max_ave'       : float,
        'abs_pressure_ave'      : float,
        'abs_pressure_min'      : float,
        'abs_pressure_min_t'    : WSDateTime.from_csv,
        'abs_pressure_max'      : float,
        'abs_pressure_max_t'    : WSDateTime.from_csv,
        'rel_pressure_ave'      : float,
        'rel_pressure_min'      : float,
        'rel_pressure_min_t'    : WSDateTime.from_csv,
        'rel_pressure_max'      : float,
        'rel_pressure_max_t'    : WSDateTime.from_csv,
        'wind_ave'              : float,
        'wind_gust'             : float,
        'wind_gust_t'           : WSDateTime.from_csv,
        'wind_dir'              : float,
        'rain'                  : float,
        'rain_days'             : int,
        'illuminance_ave'       : float,
        'illuminance_max_lo'    : float,
        'illuminance_max_lo_t'  : WSDateTime.from_csv,
        'illuminance_max_hi'    : float,
        'illuminance_max_hi_t'  : WSDateTime.from_csv,
        'illuminance_max_ave'   : float,
        'uv_ave'                : float,
        'uv_max_lo'             : int,
        'uv_max_lo_t'           : WSDateTime.from_csv,
        'uv_max_hi'             : int,
        'uv_max_hi_t'           : WSDateTime.from_csv,
        'uv_max_ave'            : float,
        }

    def _get_cache_path(self, target_date):
        # one file per year
        path = os.path.join(self._root_dir,
                            target_date.strftime("%Y-01-01.txt"))
        lo = target_date.replace(month=1, day=1)
        if lo.year < MAXYEAR:
            hi = lo.replace(year=lo.year+1)
        else:
            hi = lo
            lo = hi.replace(year=hi.year-1)
        return path, lo, hi