Source code for pywws.weatherstation

# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2008-20  pywws contributors

# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

"""Get data from WH1080/WH3080 compatible weather stations.

Derived from wwsr.c by Michael Pendec (michael.pendec@gmail.com),
wwsrdump.c by Svend Skafte (svend@skafte.net), modified by Dave Wells,
and other sources.

Introduction
------------

This is the module that actually talks to the weather station base
unit. I don't have much understanding of USB, so copied a lot from
Michael Pendec's C program wwsr.

The weather station memory has two parts: a "fixed block" of 256 bytes
and a circular buffer of 65280 bytes. As each weather reading takes 16
bytes the station can store 4080 readings, or 14 days of 5-minute
interval readings. (The 3080 type stations store 20 bytes per reading,
so store a maximum of 3264.) As data is read in 32-byte chunks, but
each weather reading is 16 or 20 bytes, a small cache is used to
reduce USB traffic. The caching behaviour can be over-ridden with the
``unbuffered`` parameter to ``get_data`` and ``get_raw_data``.

Decoding the data is controlled by the static dictionaries
``_reading_format``, ``lo_fix_format`` and ``fixed_format``. The keys
are names of data items and the values can be an ``(offset, class,
kwds)`` tuple or another dictionary. So, for example, the
_reading_format dictionary entry ``'rain' : (13, WSFloat, {'signed':
False, 'scale': 0.3})`` means that the rain value is an unsigned short
(two bytes), 13 bytes from the start of the block, and should be
multiplied by 0.3 to get a useful value.

The use of nested dictionaries in the ``fixed_format`` dictionary
allows useful subsets of data to be decoded. For example, to decode
the entire block ``get_fixed_block`` is called with no parameters::

  ws = pywws.weatherstation.WeatherStation()
  print(ws.get_fixed_block())

To get the stored minimum external temperature, ``get_fixed_block`` is
called with a sequence of keys::

  ws = pywws.weatherstation.WeatherStation()
  print(ws.get_fixed_block(['min', 'temp_out', 'val']))

Often there is no requirement to read and decode the entire fixed
block, as its first 64 bytes contain the most useful data: the
interval between stored readings, the buffer address where the current
reading is stored, and the current date & time. The
``get_lo_fix_block`` method provides easy access to these.

For more examples of using the pywws.weatherstation module, see the
pywws.testweatherstation module.

Detailed API
------------

"""

from __future__ import absolute_import

__docformat__ = "restructuredtext en"

from ast import literal_eval
from datetime import datetime
import logging
import sys
import time

logger = logging.getLogger(__name__)


[docs]class WSBits(dict):
[docs] @staticmethod def from_int(value, keys): # convert byte to list of 8 booleans mask = 1 values = [] for i in range(8): values.append(value & mask != 0) mask = mask << 1 # merge with keys to make a dict return WSBits(zip(keys, values))
[docs] @staticmethod def from_raw(raw, pos, keys=[]): return WSBits.from_int(raw[pos], keys)
# don't display unknown bits def __repr__(self): result = {} for key in self: if self[key] or not key.startswith('bit'): result[key] = self[key] return repr(result)
[docs]class WSStatus(WSBits): keys = ('bit0', 'bit1', 'bit2', 'bit3', 'bit4', 'bit5', 'lost_connection', 'rain_overflow')
[docs] @classmethod def from_raw(cls, raw, pos): return WSStatus(WSBits.from_int(raw[pos], cls.keys))
# convert to stringified int
[docs] def to_csv(self): mask = 1 value = 0 for key in self.keys: if self[key]: value += mask mask = mask << 1 return str(value)
# convert from stringified int
[docs] @classmethod def from_csv(cls, value): if not value: return None return WSStatus(WSBits.from_int(int(value), cls.keys))
[docs]class WSInt(int):
[docs] @staticmethod def from_1(raw, pos, signed=False): # decode one byte to an int value = raw[pos] if value == 0xFF: return None if signed and value >= 0x80: value = 0x80 - value return WSInt(value)
[docs] @staticmethod def wind_dir(raw, pos): # decode one byte to an int value = raw[pos] # if bit 7 is 1, value is invalid if value & 0x80: return None return WSInt(value)
[docs] @staticmethod def from_2(raw, pos, signed=False): # decode two bytes to an int value = raw[pos] + (raw[pos+1] << 8) if value == 0xFFFF: return None if signed and value >= 0x8000: value = 0x8000 - value return WSInt(value)
[docs] @staticmethod def from_3(raw, pos, signed=False): # decode three bytes to an int value = raw[pos] + (raw[pos+1] << 8) + (raw[pos+2] << 16) if value == 0xFFFFFF: return None if signed and value >= 0x800000: value = 0x800000 - value return WSInt(value)
def _nibble_value(raw, base_shift, nibble_pos=None, nibble_high=False): if nibble_pos is None: return 0 mask = 0x0F shift = base_shift if nibble_high: mask = 0xF0 shift = base_shift-4 return ((raw[nibble_pos] & mask) << shift)
[docs]class WSFloat(float):
[docs] @staticmethod def from_1(raw, pos, signed=False, scale=1.0, nibble_pos=None, nibble_high=False): # decode one byte to an int value = WSInt.from_1(raw, pos, signed=signed) if value is None: return None value += _nibble_value(raw, 8, nibble_pos=nibble_pos, nibble_high=nibble_high) # convert to float return WSFloat(float(value) * scale)
[docs] @staticmethod def from_2(raw, pos, signed=False, scale=1.0, nibble_pos=None, nibble_high=False): # decode two bytes to an int value = WSInt.from_2(raw, pos, signed=signed) if value is None: return None value += _nibble_value(raw, 16, nibble_pos=nibble_pos, nibble_high=nibble_high) # convert to float return WSFloat(float(value) * scale)
[docs] @staticmethod def from_3(raw, pos, signed=False, scale=1.0): # decode three bytes to an int value = WSInt.from_3(raw, pos, signed=signed) if value is None: return None # convert to float return WSFloat(float(value) * scale)
# don't display excessive precision def __str__(self): return '{:.12g}'.format(self) def __repr__(self): return '{:.12g}'.format(self)
def _bcd_decode(byte): hi = (byte & 0xF0) >> 4 lo = byte & 0x0F return (hi * 10) + lo
[docs]class WSTime(str):
[docs] @staticmethod def from_raw(raw, pos): hour = _bcd_decode(raw[pos]) minute = _bcd_decode(raw[pos+1]) try: return WSTime('{:02d}:{:02d}'.format(hour, minute)) except ValueError: return None
[docs]class WSDateTime(datetime): # only save string representation in 'fixed block' in status.ini def __repr__(self): return repr(self.strftime('%Y-%m-%d %H:%M:%S'))
[docs] def to_csv(self): return self.strftime('%Y-%m-%d %H:%M:%S')
[docs] @staticmethod def from_csv(date_string): return WSDateTime(*map(int, (date_string[0:4], date_string[5:7], date_string[8:10], date_string[11:13], date_string[14:16], date_string[17:19])))
[docs] @staticmethod def from_raw(raw, pos): year = _bcd_decode(raw[pos]) month = _bcd_decode(raw[pos+1]) day = _bcd_decode(raw[pos+2]) hour = _bcd_decode(raw[pos+3]) minute = _bcd_decode(raw[pos+4]) try: return WSDateTime(year + 2000, month, day, hour, minute) except ValueError: return None
def _decode(raw, format_): if not raw: return None if isinstance(format_, dict): result = {} for key, value in format_.items(): result[key] = _decode(raw, value) else: pos, factory, kwds = format_ result = factory(raw, pos, **kwds) return result
[docs]class CUSBDrive(object): """Low level interface to weather station via USB. Loosely modeled on a C++ class obtained from http://site.ambientweatherstore.com/easyweather/ws_1080_2080_protocol.zip. I don't know the provenance of this, but it looks as if it may have come from the manufacturer. """ EndMark = 0x20 ReadCommand = 0xA1 WriteCommand = 0xA0 WriteCommandWord = 0xA2 def __init__(self): for module_name in ('device_libusb1', 'device_pyusb1', 'device_pyusb', 'device_ctypes_hidapi', 'device_cython_hidapi'): logger.debug('trying USB module %s', module_name) try: module = __import__(module_name, globals(), locals(), level=1) USBDevice = getattr(module, 'USBDevice') break except ImportError: pass else: raise ImportError('No USB library found') logger.info('using %s', module.__name__) self.dev = USBDevice(0x1941, 0x8021)
[docs] def read_block(self, address): """Read 32 bytes from the weather station. If the read fails for any reason, :obj:`None` is returned. :param address: address to read from. :type address: int :return: the data from the weather station. :rtype: list(int) """ buf = [ self.ReadCommand, address // 256, address % 256, self.EndMark, self.ReadCommand, address // 256, address % 256, self.EndMark, ] if not self.dev.write_data(buf): return None return self.dev.read_data(32)
[docs] def write_byte(self, address, data): """Write a single byte to the weather station. :param address: address to write to. :type address: int :param data: the value to write. :type data: int :return: success status. :rtype: bool """ buf = [ self.WriteCommandWord, address // 256, address % 256, self.EndMark, self.WriteCommandWord, data, 0, self.EndMark, ] if not self.dev.write_data(buf): return False buf = self.dev.read_data(8) if buf is None: return False for byte in buf: if byte != 0xA5: return False return True
[docs]class DriftingClock(object): def __init__(self, name, status, period, margin): self.name = name self.status = status self.period = period self.margin = margin if self.status: self.clock = literal_eval( self.status.get('clock', self.name, 'None')) self.drift = literal_eval( self.status.get('clock', self.name + ' drift', '0.0')) else: self.clock = None self.drift = 0.0 self._set_real_period() self.old_clock = self.clock def _set_real_period(self): self._real_period = self.period * (1.0 + (self.drift / (24.0 * 3600.0)))
[docs] def before(self, now): if not self.clock: return None error = (now - self.clock) % self._real_period return now - error
[docs] def avoid(self): if not self.clock: return 1000.0 now = time.time() phase = now - self.clock if phase > 24 * 3600: # clock was last measured a day ago, so reset it self.clock = None return 1000.0 return (self.margin - phase) % self._real_period
[docs] def set_clock(self, now): if self.clock: diff = (now - self.clock) % self._real_period if diff < 2.0 or diff > self._real_period - 2.0: return logger.error('unexpected %s clock change', self.name) self.clock = now logger.warning('setting %s clock %g', self.name, now % self.period) if self.status: self.status.set('clock', self.name, str(self.clock)) if self.old_clock: diff = now - self.old_clock if diff < 8 * 3600: # drift measurement needs more than 8 hours gap return drift = diff % self.period if drift > self.period / 2: drift -= self.period drift = (float(drift) * 24.0 * 3600.0 / float(diff)) self.drift += max(min(drift - self.drift, 3.0), -3.0) / 4.0 self._set_real_period() logger.warning( '%s clock drift %g %g', self.name, drift, self.drift) if self.status: self.status.set( 'clock', '%s drift' % self.name, str(self.drift)) self.old_clock = self.clock
[docs] def invalidate(self): self.clock = None
[docs]class WeatherStation(object): """Class that represents the weather station to user program.""" # minimum interval between polling for data change min_pause = 0.5 # margin of error for various decisions margin = (min_pause * 2.0) - 0.1 def __init__(self, context=None): """Connect to weather station and prepare to read data.""" # create basic IO object self.cusb = CUSBDrive() # init variables if context: self.status = context.status self.avoid = float( context.params.get('config', 'usb activity margin', '3.0')) self.avoid = max(self.avoid, 0.0) self.ws_type = context.params.get('config', 'ws type', 'Unknown') else: self.status = None self.avoid = 3.0 self.ws_type = '1080' self._fixed_block = None self._data_block = None self._data_pos = None self._current_ptr = None self._station_clock = DriftingClock( 'station', self.status, 60, self.avoid) self._sensor_clock = DriftingClock( 'sensor', self.status, 48, self.avoid) if self.ws_type == '3080': self._solar_clock = DriftingClock( 'solar', self.status, 60, self.avoid) else: self._solar_clock = None self.last_status = {} # check ws_type if self.ws_type not in ('1080', '3080'): if self.get_fixed_block(['lux_wm2_coeff']) == 0.0: guess = '1080' else: guess = '3080' raise ValueError(""" Unknown weather station type. Please edit weather.ini and set 'ws type' to '1080' or '3080', as appropriate. Your station is probably a '{:s}' type. """.format(guess))
[docs] def live_data(self, logged_only=False): # There are two things we want to synchronise to - the data is # updated every 48 seconds and the address is incremented # every 5 minutes (or 10, 15, ..., 30). Rather than getting # data every second or two, we sleep until one of the above is # due. (During initialisation we get data every half second # anyway.) read_period = self.get_fixed_block(['read_period']) logger.debug('read period %d', read_period) log_interval = float(read_period * 60) live_interval = 48.0 old_ptr = self.current_pos() old_data = self.get_data(old_ptr, unbuffered=True) now = time.time() next_live = self._sensor_clock.before(now + live_interval) if next_live: now = next_live - live_interval else: now -= live_interval last_log = now - (old_data['delay'] * 60.0) next_log = self._station_clock.before(last_log + log_interval) ptr_time = 0 data_time = 0 not_logging = False while True: # sleep until just before next reading is due now = time.time() advance = now + max(self.avoid, self.min_pause) + self.min_pause if next_live: pause = next_live - advance else: pause = self.min_pause if not_logging: pass elif next_log: pause = min(pause, next_log - advance) elif old_data['delay'] >= read_period - 1: pause = self.min_pause if (self._solar_clock and self._solar_clock.clock is None and old_data['illuminance'] is not None): pause = self.min_pause pause = max(pause, self.min_pause) logger.debug( 'delay %s, pause %g', str(old_data['delay']), pause) time.sleep(pause) # get new pointer last_ptr_time = ptr_time new_ptr = self.current_pos() ptr_time = time.time() # get new data last_data_time = data_time new_data = self.get_data(old_ptr, unbuffered=True) data_time = time.time() # when ptr changes, internal sensor data gets updated if new_ptr != old_ptr: for key in ('hum_in', 'temp_in', 'abs_pressure'): old_data[key] = new_data[key] # log any change of status if new_data['status'] != self.last_status: logger.warning('status %s', str(new_data['status'])) self.last_status = new_data['status'] if (new_data['status']['lost_connection'] and not old_data['status']['lost_connection']): # 'lost connection' decision can happen at any time old_data = new_data # has data changed? if any(new_data[key] != old_data[key] for key in ( 'hum_in', 'temp_in', 'hum_out', 'temp_out', 'abs_pressure', 'wind_ave', 'wind_gust', 'wind_dir', 'rain', 'status')): logger.debug('live_data new data') if data_time - last_data_time < self.margin: # data has just changed, so definitely at a 48s update time self._sensor_clock.set_clock(data_time) elif next_live and data_time < next_live - self.margin: logger.warning( 'live_data lost sync %g', data_time - next_live) logger.warning('old data %s', str(old_data)) logger.warning('new data %s', str(new_data)) self._sensor_clock.invalidate() next_live = self._sensor_clock.before(data_time + self.margin) if next_live: if not logged_only: result = dict(new_data) result['idx'] = datetime.utcfromtimestamp(int(next_live)) yield result, old_ptr, False next_live += live_interval if not_logging: # simulate logging if station is not logging if not next_log: next_log = next_live if next_live > next_log: result = dict(new_data) result['idx'] = datetime.utcfromtimestamp(int(next_log)) yield result, old_ptr, True next_log += log_interval elif next_live and data_time > next_live + 6.0: logger.info('live_data missed') next_live += live_interval # has solar data changed? elif self._solar_clock and ( new_data['illuminance'] != old_data['illuminance'] or new_data['uv'] != old_data['uv']): logger.debug('live_data new solar data') if data_time - last_data_time < self.margin: # data has just changed, so at a solar update time self._solar_clock.set_clock(data_time) old_data = new_data # has ptr changed? if new_ptr != old_ptr: logger.info('live_data new ptr: %06x', new_ptr) if not_logging: logger.error('station is logging data') not_logging = False last_log = ptr_time - self.margin if ptr_time - last_ptr_time < self.margin: # pointer has just changed, so definitely at a logging time self._station_clock.set_clock(ptr_time) elif next_log: if ptr_time < next_log - self.margin: logger.warning( 'live_data lost log sync %g', ptr_time - next_log) self._station_clock.invalidate() else: logger.info('missed ptr change time') if read_period > new_data['delay']: read_period = new_data['delay'] logger.warning('reset read period %d', read_period) log_interval = float(read_period * 60) result = dict(new_data) next_log = self._station_clock.before(ptr_time + self.margin) if next_log: result['idx'] = datetime.utcfromtimestamp(int(next_log)) next_log += log_interval else: # use best guess of logging time result['idx'] = datetime.utcfromtimestamp( int(ptr_time - (self.avoid / 2))) yield result, old_ptr, True old_ptr = new_ptr old_data['delay'] = 0 data_time = 0 elif ptr_time > last_log + log_interval + 180.0: # if station stops logging data, don't keep reading # USB until it locks up logger.error('station is not logging data') not_logging = True elif next_log and ptr_time > next_log + 6.0: logger.warning('live_data log extended') next_log += 60.0
[docs] def inc_ptr(self, ptr): """Get next circular buffer data pointer.""" result = ptr + self.reading_len[self.ws_type] if result >= 0x10000: result = self.data_start return result
[docs] def dec_ptr(self, ptr): """Get previous circular buffer data pointer.""" result = ptr - self.reading_len[self.ws_type] if result < self.data_start: result = 0x10000 - self.reading_len[self.ws_type] return result
[docs] def get_raw_data(self, ptr, unbuffered=False): """Get raw data from circular buffer. If unbuffered is false then a cached value that was obtained earlier may be returned.""" if unbuffered: self._data_pos = None # round down ptr to a 'block boundary' idx = ptr - (ptr % 0x20) ptr -= idx count = self.reading_len[self.ws_type] if self._data_pos == idx: # cache contains useful data result = self._data_block[ptr:ptr + count] if len(result) >= count: return result else: result = list() if ptr + count > 0x20: # need part of next block, which may be in cache if self._data_pos != idx + 0x20: self._data_pos = idx + 0x20 self._data_block = self._read_block(self._data_pos) result += self._data_block[0:ptr + count - 0x20] if len(result) >= count: return result # read current block self._data_pos = idx self._data_block = self._read_block(self._data_pos) result = self._data_block[ptr:ptr + count] + result return result
[docs] def get_data(self, ptr, unbuffered=False): """Get decoded data from circular buffer. If unbuffered is false then a cached value that was obtained earlier may be returned.""" result = _decode(self.get_raw_data(ptr, unbuffered), self._reading_format[self.ws_type]) return result
[docs] def current_pos(self): """Get circular buffer location where current data is being written.""" new_ptr = _decode( self._read_fixed_block(0x0020), self.lo_fix_format['current_pos']) if new_ptr == self._current_ptr: return self._current_ptr if self._current_ptr and new_ptr != self.inc_ptr(self._current_ptr): logger.error( 'unexpected ptr change %06x -> %06x', self._current_ptr, new_ptr) self._current_ptr = new_ptr return self._current_ptr
[docs] def get_raw_fixed_block(self, unbuffered=False): """Get the raw "fixed block" of settings and min/max data.""" if unbuffered or not self._fixed_block: self._fixed_block = self._read_fixed_block() return self._fixed_block
[docs] def get_fixed_block(self, keys=[], unbuffered=False): """Get the decoded "fixed block" of settings and min/max data. A subset of the entire block can be selected by keys.""" if unbuffered or not self._fixed_block: self._fixed_block = self._read_fixed_block() format = self.fixed_format # navigate down list of keys to get to wanted data for key in keys: format = format[key] return _decode(self._fixed_block, format)
def _wait_for_station(self): # avoid times when station is writing to memory while True: pause = min(self._station_clock.avoid(), self._sensor_clock.avoid()) if self._solar_clock: pause = min(pause, self._solar_clock.avoid()) if pause >= self.avoid * 2.0: return logger.debug('avoid %s', str(pause)) time.sleep(pause) def _read_block(self, ptr, retry=True): # Read block repeatedly until it's stable. This avoids getting corrupt # data when the block is read as the station is updating it. old_block = None while True: self._wait_for_station() new_block = self.cusb.read_block(ptr) if new_block: if (new_block == old_block) or not retry: break if old_block: logger.debug('_read_block changing %06x', ptr) old_block = new_block return new_block def _read_fixed_block(self, hi=0x0100): result = [] for mempos in range(0x0000, hi, 0x0020): result += self._read_block(mempos) return result def _write_byte(self, ptr, value): self._wait_for_station() if not self.cusb.write_byte(ptr, value): raise IOError('_write_byte failed')
[docs] def write_data(self, data): """Write a set of single bytes to the weather station. Data must be an array of (ptr, value) pairs.""" # send data for ptr, value in data: self._write_byte(ptr, value) # set 'data changed' self._write_byte(self.fixed_format['data_changed'][0], 0xAA) # wait for station to clear 'data changed' while True: ack = _decode( self._read_fixed_block(0x0020), self.fixed_format['data_changed']) if ack == 0: break logger.debug('write_data waiting for ack') time.sleep(6)
# Tables of "meanings" for raw weather station data. Each key # specifies an (offset, factory, kwds) tuple that is understood # by _decode. # depends on weather station type _reading_format = {} _reading_format['1080'] = { 'delay' : (0, WSInt.from_1, {'signed': False}), 'hum_in' : (1, WSInt.from_1, {'signed': False}), 'temp_in' : (2, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'hum_out' : (4, WSInt.from_1, {'signed': False}), 'temp_out' : (5, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'abs_pressure' : (7, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'wind_ave' : (9, WSFloat.from_1, {'scale': 0.1, 'nibble_pos':11, 'nibble_high':False}), 'wind_gust' : (10, WSFloat.from_1, {'scale': 0.1, 'nibble_pos':11, 'nibble_high':True}), 'wind_dir' : (12, WSInt.wind_dir, {}), 'rain' : (13, WSFloat.from_2, {'signed': False, 'scale': 0.3}), 'status' : (15, WSStatus.from_raw, {}), } _reading_format['3080'] = { 'illuminance' : (16, WSFloat.from_3, {'signed': False, 'scale': 0.1}), 'uv' : (19, WSInt.from_1, {'signed': False}), } _reading_format['3080'].update(_reading_format['1080']) lo_fix_format = { 'read_period' : (16, WSInt.from_1, {'signed': False}), 'settings_1' : (17, WSBits.from_raw, {'keys': ( 'temp_in_F', 'temp_out_F', 'rain_in', 'bit3', 'bit4', 'pressure_hPa', 'pressure_inHg', 'pressure_mmHg')}), 'settings_2' : (18, WSBits.from_raw, {'keys': ( 'wind_mps', 'wind_kmph', 'wind_knot', 'wind_mph', 'wind_bft', 'bit5', 'bit6', 'bit7')}), 'display_1' : (19, WSBits.from_raw, {'keys': ( 'pressure_rel', 'wind_gust', 'clock_12hr', 'date_mdy', 'time_scale_24', 'show_year', 'show_day_name', 'alarm_time')}), 'display_2' : (20, WSBits.from_raw, {'keys': ( 'temp_out_temp', 'temp_out_chill', 'temp_out_dew', 'rain_hour', 'rain_day', 'rain_week', 'rain_month', 'rain_total')}), 'alarm_1' : (21, WSBits.from_raw, {'keys': ( 'bit0', 'time', 'wind_dir', 'bit3', 'hum_in_lo', 'hum_in_hi', 'hum_out_lo', 'hum_out_hi')}), 'alarm_2' : (22, WSBits.from_raw, {'keys': ( 'wind_ave', 'wind_gust', 'rain_hour', 'rain_day', 'pressure_abs_lo', 'pressure_abs_hi', 'pressure_rel_lo', 'pressure_rel_hi')}), 'alarm_3' : (23, WSBits.from_raw, {'keys': ( 'temp_in_lo', 'temp_in_hi', 'temp_out_lo', 'temp_out_hi', 'wind_chill_lo', 'wind_chill_hi', 'dew_point_lo', 'dew_point_hi')}), 'timezone' : (24, WSInt.from_1, {'signed': True}), 'unknown_01' : (25, WSInt.from_1, {'signed': False}), 'data_changed' : (26, WSInt.from_1, {'signed': False}), 'data_count' : (27, WSInt.from_2, {'signed': False}), 'display_3' : (29, WSBits.from_raw, {'keys': ( 'illuminance_fc', 'alarm_illuminance_hi', 'alarm_uv_hi', 'bit3', 'bit4', 'illuminance_wm2', 'bit6', 'bit7')}), 'current_pos' : (30, WSInt.from_2, {'signed': False}), } fixed_format = { 'magic_0' : (0, WSInt.from_1, {'signed': False}), 'magic_1' : (1, WSInt.from_1, {'signed': False}), 'rain_factor_raw': (2, WSFloat.from_2, {'signed': False}), 'wind_factor_raw': (4, WSFloat.from_2, {'signed': False}), 'rel_pressure' : (32, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'abs_pressure' : (34, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'lux_wm2_coeff' : (36, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date_time' : (43, WSDateTime.from_raw, {}), 'unknown_18' : (97, WSInt.from_1, {'signed': False}), 'alarm' : { 'hum_in' : {'hi' : (48, WSInt.from_1, {'signed': False}), 'lo' : (49, WSInt.from_1, {'signed': False})}, 'temp_in' : {'hi' : (50, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'lo' : (52, WSFloat.from_2, {'signed': True, 'scale': 0.1})}, 'hum_out' : {'hi' : (54, WSInt.from_1, {'signed': False}), 'lo' : (55, WSInt.from_1, {'signed': False})}, 'temp_out' : {'hi' : (56, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'lo' : (58, WSFloat.from_2, {'signed': True, 'scale': 0.1})}, 'windchill' : {'hi' : (60, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'lo' : (62, WSFloat.from_2, {'signed': True, 'scale': 0.1})}, 'dewpoint' : {'hi' : (64, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'lo' : (66, WSFloat.from_2, {'signed': True, 'scale': 0.1})}, 'abs_pressure' : {'hi' : (68, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'lo' : (70, WSFloat.from_2, {'signed': False, 'scale': 0.1})}, 'rel_pressure' : {'hi' : (72, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'lo' : (74, WSFloat.from_2, {'signed': False, 'scale': 0.1})}, 'wind_ave' : {'bft': (76, WSInt.from_1, {'signed': False}), 'ms' : (77, WSFloat.from_1, {'signed': False, 'scale': 0.1})}, 'wind_gust' : {'bft': (79, WSInt.from_1, {'signed': False}), 'ms' : (80, WSFloat.from_1, {'signed': False, 'scale': 0.1})}, 'wind_dir' : (82, WSInt.wind_dir, {}), 'rain' : {'hour' : (83, WSFloat.from_2, {'signed': False, 'scale': 0.3}), 'day' : (85, WSFloat.from_2, {'signed': False, 'scale': 0.3})}, 'time' : (87, WSTime.from_raw, {}), 'illuminance' : (89, WSFloat.from_3, {'signed': False, 'scale': 0.1}), 'uv' : (92, WSInt.from_1, {'signed': False}), }, 'max' : { 'uv' : {'val' : (93, WSInt.from_1, {'signed': False}), 'date': (6, WSDateTime.from_raw, {})}, 'illuminance' : {'val' : (94, WSFloat.from_3, {'signed': False, 'scale': 0.1}), 'date': (11, WSDateTime.from_raw, {})}, 'hum_in' : {'val' : (98, WSInt.from_1, {'signed': False}), 'date': (141, WSDateTime.from_raw, {})}, 'hum_out' : {'val' : (100, WSInt.from_1, {'signed': False}), 'date': (151, WSDateTime.from_raw, {})}, 'temp_in' : {'val' : (102, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (161, WSDateTime.from_raw, {})}, 'temp_out' : {'val' : (106, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (171, WSDateTime.from_raw, {})}, 'windchill' : {'val' : (110, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (181, WSDateTime.from_raw, {})}, 'dewpoint' : {'val' : (114, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (191, WSDateTime.from_raw, {})}, 'abs_pressure' : {'val' : (118, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (201, WSDateTime.from_raw, {})}, 'rel_pressure' : {'val' : (122, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (211, WSDateTime.from_raw, {})}, 'wind_ave' : {'val' : (126, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (221, WSDateTime.from_raw, {})}, 'wind_gust' : {'val' : (128, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (226, WSDateTime.from_raw, {})}, 'rain' : { 'hour' : {'val' : (130, WSFloat.from_2, {'signed': False, 'scale': 0.3}), 'date': (231, WSDateTime.from_raw, {})}, 'day' : {'val' : (132, WSFloat.from_2, {'signed': False, 'scale': 0.3}), 'date': (236, WSDateTime.from_raw, {})}, 'week' : {'val' : (134, WSFloat.from_2, {'signed': False, 'scale': 0.3}), 'date': (241, WSDateTime.from_raw, {})}, 'month' : {'val' : (136, WSFloat.from_2, {'signed': False, 'scale': 0.3, 'nibble_pos':140, 'nibble_high':True}), 'date': (246, WSDateTime.from_raw, {})}, 'total' : {'val' : (138, WSFloat.from_2, {'signed': False, 'scale': 0.3, 'nibble_pos':140, 'nibble_high':False}), 'date': (251, WSDateTime.from_raw, {})}, }, }, 'min' : { 'hum_in' : {'val' : (99, WSInt.from_1, {'signed': False}), 'date': (146, WSDateTime.from_raw, {})}, 'hum_out' : {'val' : (101, WSInt.from_1, {'signed': False}), 'date': (156, WSDateTime.from_raw, {})}, 'temp_in' : {'val' : (104, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (166, WSDateTime.from_raw, {})}, 'temp_out' : {'val' : (108, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (176, WSDateTime.from_raw, {})}, 'windchill' : {'val' : (112, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (186, WSDateTime.from_raw, {})}, 'dewpoint' : {'val' : (116, WSFloat.from_2, {'signed': True, 'scale': 0.1}), 'date': (196, WSDateTime.from_raw, {})}, 'abs_pressure' : {'val' : (120, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (206, WSDateTime.from_raw, {})}, 'rel_pressure' : {'val' : (124, WSFloat.from_2, {'signed': False, 'scale': 0.1}), 'date': (216, WSDateTime.from_raw, {})}, }, } fixed_format.update(lo_fix_format) # start of readings / end of fixed block data_start = 0x0100 # 256 # bytes per reading, depends on weather station type reading_len = { '1080' : 16, '3080' : 20, }