Source code for aisdb.track_gen

''' generation, segmentation, and filtering of vessel trajectories '''

import sqlite3
import types
import warnings
from datetime import timedelta
from functools import reduce

import numpy as np
from aisdb.aisdb import simplify_linestring_idx

from aisdb import Domain
from aisdb.gis import delta_knots
from aisdb.proc_util import _segment_rng, _segment_rng_all

staticcols = set([
    'mmsi', 'vessel_name', 'ship_type', 'ship_type_txt', 'dim_bow', 'maneuver',
    'dim_stern', 'dim_port', 'dim_star', 'imo', 'draught', 'heading', 'rot',
    'destination', 'eta_month', 'eta_day', 'eta_hour', 'eta_minute',
])


def _segment_longitude(track, tolerance=300):
    ''' segment track vectors where difference in longitude exceeds 300 degrees
    '''

    if len(track['time']) == 1:
        yield track
        return

    diff = np.nonzero(
        np.abs(track['lon'][1:] - track['lon'][:-1]) > tolerance)[0] + 1

    if diff.size == 0:
        assert 'time' in track.keys()
        yield track
        return

    segments_idx = reduce(np.append, ([0], diff, [track['time'].size]))
    for i in range(segments_idx.size - 1):
        tracksplit = dict(
            **{k: track[k]
               for k in track['static']},
            **{
                k: track[k][segments_idx[i]:segments_idx[i + 1]]
                for k in track['dynamic']
            },
            static=track['static'],
            dynamic=track['dynamic'],
        )
        assert 'time' in tracksplit.keys()
        yield tracksplit


def _yieldsegments(rows, staticcols, dynamiccols, decimate=0.0001):
    if decimate is True:
        decimate = 0.0001
    lon = np.array([r['longitude'] for r in rows], dtype=float)
    lat = np.array([r['latitude'] for r in rows], dtype=float)
    time = np.array([r['time'] for r in rows], dtype=np.uint32)
    
    if decimate is not False:
        idx = simplify_linestring_idx(lon, lat, precision=decimate)
    else:
        idx = np.array(range(len(lon)))
    trackdict = dict(
        **{col: rows[0][col]
           for col in staticcols},
        dynamic=dynamiccols,
        static=staticcols,
        time=time[idx],
        lon=lon[idx].astype(np.float32),
        lat=lat[idx].astype(np.float32),
        cog=np.array([r['cog'] for r in rows], dtype=np.uint32)[idx],
        sog=np.array([r['sog'] for r in rows], dtype=np.float32)[idx],
        utc_second=np.array([r['utc_second'] for r in rows], dtype=np.uint32)[idx],
    )

    assert 'time' in trackdict.keys()

    for segment in _segment_longitude(trackdict):
        for key in segment['dynamic']:
            assert len(segment[key]) == len(segment['time'])
        yield segment


[docs] class EmptyRowsException(Exception): pass
[docs] def TrackGen(rowgen: iter, decimate: False) -> dict: ''' generator converting sets of rows sorted by MMSI to a dictionary containing track column vectors. each row contains columns from database: mmsi time lon lat name ... rows must be sorted by first by mmsi, then time args: rowgen (aisdb.database.dbqry.DBQuery.gen_qry()) DBQuery rows generator. Yields rows returned by a database query decimate (bool) if True, linear curve decimation will be applied to reduce the number of unnecessary datapoints yields: dictionary containing track column vectors. static data (e.g. mmsi, name, geometry) will be stored as scalar values >>> import os >>> import numpy as np >>> from datetime import datetime >>> from aisdb import SQLiteDBConn, DBQuery, TrackGen, decode_msgs >>> from aisdb.database import sqlfcn_callbacks >>> # create example database file >>> dbpath = 'track_gen_test.db' >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv', ... 'aisdb/tests/testdata/test_data_20211101.nm4'] >>> with SQLiteDBConn(dbpath) as dbconn: ... decode_msgs(filepaths, dbconn=dbconn, source='TESTING', verbose=False) ... q = DBQuery(callback=sqlfcn_callbacks.in_timerange_validmmsi, ... dbconn=dbconn, ... start=datetime(2021, 7, 1), ... end=datetime(2021, 7, 7)) ... rowgen = q.gen_qry() ... for track in TrackGen(rowgen, decimate=True): ... result = (track['mmsi'], track['lon'], track['lat'], track['time']) ... assert result == (204242000, np.array([-8.931666], dtype=np.float32), ... np.array([41.45], dtype=np.float32), np.array([1625176725], dtype=np.uint32)) ... break ''' ''' >>> os.remove(dbpath) ''' firstrow = True assert isinstance(rowgen, types.GeneratorType) for rows in rowgen: if (rows is None or len(rows) == 0): warnings.warn('No results for query!') return dict() # raise EmptyRowsException('rows cannot be empty') assert isinstance( rows[0], (sqlite3.Row, dict)), f'unknown row type: {type(rows[0])}' if firstrow: keys = set(rows[0].keys()) static = keys.intersection(set(staticcols)) dynamiccols = keys ^ static dynamiccols = dynamiccols.difference(set(['longitude', 'latitude'])) dynamiccols = dynamiccols.union(set(['lon', 'lat'])) firstrow = False for track in _yieldsegments(rows, static, dynamiccols, decimate): yield track
[docs] def split_timedelta(tracks, maxdelta=timedelta(weeks=2)): ''' partitions tracks where delta time exceeds maxdelta args: tracks (aisdb.track_gen.TrackGen) track vectors generator maxdelta (datetime.timedelta) threshold at which tracks should be partitioned ''' mmsi_count = {} # Dictionary to keep track of MMSI indices for track in tracks: for rng in _segment_rng(track, maxdelta): assert len(rng) > 0 # Create the segmented track dictionary segmented_track = dict( **{k: track[k] for k in track['static']}, **{ k: np.array(track[k], dtype=type(track[k][0]))[rng] for k in track['dynamic'] }, static=track['static'], dynamic=track['dynamic'], ) # Handle MMSI indexing after segmentation mmsi_value = segmented_track.get("mmsi") if mmsi_value: if mmsi_value not in mmsi_count: mmsi_count[mmsi_value] = 0 else: mmsi_count[mmsi_value] += 1 # Modify the mmsi value to attach an index # segmented_track["mmsi"] = f"{mmsi_value}-{mmsi_count[mmsi_value]}" segmented_track["idx"] = mmsi_count[mmsi_value] # Yield the segmented track with modified mmsi yield segmented_track
[docs] def split_tracks(tracks, max_distance=25000, max_time=timedelta(hours=24), max_speed=50, min_speed=0.2, min_segment_length=15, min_direction_change=45): """ Segments AIS tracks based on multiple criteria such as course changes, speed, distance, and time gaps. Args: tracks (aisdb.track_gen.TrackGen): track vectors generator max_distance (float): Maximum allowable distance (meters) between points in a segment. max_time (timedelta): Maximum allowable time difference between points in a segment. max_speed (float): Maximum allowable speed (knots). min_speed (float): Minimum allowable speed (knots). min_segment_length (int): Minimum number of points required in a segment. min_direction_change (float): Minimum course change (degrees) to start a new segment. """ mmsi_count = {} # Dictionary to keep track of MMSI indices for track in tracks: for rng in _segment_rng_all(track, max_distance, max_time, max_speed, min_speed, min_segment_length, min_direction_change): assert len(rng) > 0 # Create the segmented track dictionary segmented_track = dict( **{k: track[k] for k in track['static']}, **{ k: np.array(track[k], dtype=type(track[k][0]))[rng] for k in track['dynamic'] }, static=track['static'], dynamic=track['dynamic'], ) # Handle MMSI indexing after segmentation mmsi_value = segmented_track.get("mmsi") if mmsi_value: if mmsi_value not in mmsi_count: mmsi_count[mmsi_value] = 0 else: mmsi_count[mmsi_value] += 1 # Modify the mmsi value to attach an index segmented_track["mmsi"] = f"{mmsi_value}-{mmsi_count[mmsi_value]}" # Yield the segmented track with modified mmsi yield segmented_track
[docs] def fence_tracks(tracks, domain): ''' compute points-in-polygons for vessel positions within domain polygons yields track dictionaries Also see zone_mask() ''' assert isinstance(domain, Domain), 'Not a domain object' for track in tracks: assert isinstance(track, dict) if 'in_zone' not in track.keys(): track['in_zone'] = np.array( [ domain.point_in_polygon(x, y) for x, y in zip(track['lon'], track['lat']) ], dtype=object, ) track['dynamic'] = set(track['dynamic']).union(set(['in_zone'])) yield track
[docs] def zone_mask(tracks, domain): ''' compute points-in-polygons for track positions, and filter results to positions within domain. yields track dictionaries. also see fence_tracks() ''' for track in fence_tracks(tracks, domain): mask = track['in_zone'] != 'Z0' yield dict( **{k: track[k] for k in track['static']}, **{k: track[k][mask] for k in track['dynamic']}, static=track['static'], dynamic=track['dynamic'], )
[docs] def min_speed_filter(tracks, minspeed): for track in tracks: if len(track['time']) == 1: yield track continue deltas = delta_knots(track) deltas = np.append(deltas, [deltas[-1]]) mask = deltas >= minspeed yield dict( **{k: track[k] for k in track['static']}, **{k: track[k][mask] for k in track['dynamic']}, static=track['static'], dynamic=track['dynamic'], )
[docs] def min_track_length_filter(tracks, min_length=300): for track in tracks: if len(track['time']) >= min_length: yield track