Source code for aisdb.denoising_encoder

import os
import warnings
from functools import reduce
import numpy as np
from aisdb.aisdb import encoder_score_fcn
from aisdb.gis import delta_knots, delta_meters
from aisdb.webdata.shore_dist import download_unzip

import geopandas as gpd
from shapely.geometry import Point, MultiPoint
from shapely import prepare
import pickle

def _score_idx(scores):
    ''' Returns the index of the score array where the value equals the
        highest score. In tie cases, the last such index is selected
    '''
    assert len(scores) > 0
    return np.where(scores == np.max(scores))[0][-1]
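
# A minimal sketch (not part of aisdb) illustrating the tie-breaking rule of
# _score_idx: when several scores share the maximum, the last index wins, so
# the most recently created pathway is preferred.
def _example_score_idx_tiebreak():
    scores = np.array([0.4, 0.9, 0.9], dtype=np.float32)
    assert _score_idx(scores) == 2  # last of the tied maxima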


def _segments_idx(track, distance_threshold, speed_threshold, **_):
    segments_idx1 = reduce(
        np.append, ([0], np.where(delta_knots(track) > speed_threshold)[0] + 1,
                    [track['time'].size]))
    segments_idx2 = reduce(
        np.append,
        ([0], np.where(delta_meters(track) > distance_threshold)[0] + 1,
         [track['time'].size]))

    return reduce(np.union1d, (segments_idx1, segments_idx2))
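
# A hedged illustration (not part of aisdb): the returned indices always
# include 0 and track['time'].size, so consecutive pairs of indices delimit
# half-open segments [start, end) of the track. A track whose deltas never
# exceed either threshold yields the single segment [0, n).
def _example_segments_idx_bounds(track, distance_threshold, speed_threshold):
    idx = _segments_idx(track, distance_threshold=distance_threshold,
                        speed_threshold=speed_threshold)
    assert idx[0] == 0 and idx[-1] == track['time'].size
    return list(zip(idx[:-1], idx[1:]))  # (start, end) pairs per segment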


def _scoresarray(track, *, pathways, i, segments_idx, distance_threshold,
                 speed_threshold, minscore):
    scores = np.array(
        [
            encoder_score_fcn(
                x1=pathway['lon'][-1],
                y1=pathway['lat'][-1],
                t1=pathway['time'][-1],
                x2=track['lon'][segments_idx[i]],
                y2=track['lat'][segments_idx[i]],
                t2=track['time'][segments_idx[i]],
                dist_thresh=distance_threshold,
                speed_thresh=speed_threshold,
            ) for pathway in pathways
        ],
        dtype=np.float32,
    )
    highscore = scores.max() if scores.size > 0 else minscore
    return scores, highscore
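
# A minimal sketch (not part of aisdb) of the fallback behaviour: with no
# existing pathways the scores array is empty and highscore falls back to
# minscore, signalling the caller to start a new pathway instead of appending.
def _example_scoresarray_empty():
    track = dict(time=np.array([0, 60]),
                 lon=np.array([-63.0, -63.1]),
                 lat=np.array([44.0, 44.1]))
    scores, highscore = _scoresarray(track, pathways=[], i=0,
                                     segments_idx=np.array([0, 2]),
                                     distance_threshold=250000,
                                     speed_threshold=50, minscore=1e-6)
    assert scores.size == 0 and highscore == 1e-6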


def _append_highscore(track, *, highscoreidx, pathways, i, segments_idx):
    return dict(
        **{k: track[k]
           for k in track['static']},
        **{
            k:
                np.append(pathways[highscoreidx][k],
                          track[k][segments_idx[i]:segments_idx[i + 1]])
            for k in track['dynamic']
        },
        static=track['static'],
        dynamic=track['dynamic'],
    )


def _split_pathway(track, *, i, segments_idx):
    path = dict(
        **{k: track[k]
           for k in track['static']},
        **{
            k: track[k][segments_idx[i]:segments_idx[i + 1]]
            for k in track['dynamic']
        },
        static=track['static'],
        dynamic=track['dynamic'],
    )
    return path
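
# A minimal sketch (hypothetical track layout, not part of aisdb) of the dict
# convention shared by _split_pathway and _append_highscore: keys listed in
# 'static' are copied whole, keys listed in 'dynamic' are sliced to the
# segment bounds given by consecutive entries of segments_idx.
def _example_split_pathway():
    track = dict(
        mmsi=316000000,
        time=np.array([0, 60, 7200]),
        lon=np.array([-63.0, -63.1, -50.0]),
        lat=np.array([44.0, 44.1, 40.0]),
        static={'mmsi'},
        dynamic={'time', 'lon', 'lat'},
    )
    path = _split_pathway(track, i=0, segments_idx=np.array([0, 2, 3]))
    assert path['mmsi'] == 316000000 and path['time'].size == 2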


[docs]
def encode_score(track, distance_threshold, speed_threshold, minscore):
    ''' Encodes likelihood of persistent track membership when given distance,
        speed, and score thresholds, using track speed deltas computed as
        haversine distance divided by elapsed time.

        A higher distance threshold will increase the maximum distance in
        meters allowed between pings for same trajectory membership. A higher
        speed threshold will allow vessels travelling up to this value in
        knots to be considered for persistent track membership. The minscore
        assigns a minimum score needed to be considered for membership,
        typically 0 or very close to 0 such as 1e-5.

        For example: a vessel travelling at a lower speed with short intervals
        between pings will have a higher likelihood of persistence. A
        trajectory with a higher average speed or long intervals between pings
        may indicate two separate trajectories and will be segmented, forming
        alternate trajectories according to the highest likelihood of
        membership.
    '''
    assert 'time' in track.keys()
    assert len(track['time']) > 0
    params = dict(distance_threshold=distance_threshold,
                  speed_threshold=speed_threshold,
                  minscore=minscore)
    segments_idx = _segments_idx(track, **params)
    pathways = []
    warned = False
    for i in range(segments_idx.size - 1):
        if len(pathways) == 0:
            path = _split_pathway(track, i=i, segments_idx=segments_idx)
            assert path is not None
            pathways.append(path)
            continue
        elif not warned and len(pathways) > 100:
            warnings.warn(
                f'excessive number of pathways! mmsi={track["mmsi"]}')
            warned = True
        assert len(track['time']) > 0, f'{track=}'
        scores, highscore = _scoresarray(track,
                                         pathways=pathways,
                                         i=i,
                                         segments_idx=segments_idx,
                                         **params)
        assert len(scores) > 0, f'{track}'
        if highscore >= minscore:
            highscoreidx = _score_idx(scores)
            pathways[highscoreidx] = _append_highscore(
                track,
                highscoreidx=highscoreidx,
                pathways=pathways,
                i=i,
                segments_idx=segments_idx)
        else:
            path = _split_pathway(track, i=i, segments_idx=segments_idx)
            assert path is not None
            pathways.append(path.copy())

    for label, pathway in enumerate(pathways):
        pathway['label'] = label
        pathway['static'] = set(pathway['static']).union({'label'})
        assert 'label' in pathway.keys()
        assert 'time' in pathway.keys(), f'{pathway=}'
        yield pathway
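

# A hedged usage sketch (hypothetical coordinates; assumes track['time'] holds
# epoch-second timestamps, which may differ from aisdb's internal convention):
# encode_score is a generator, so segmented pathways are produced lazily. The
# large positional jump between the second and third ping should force a split.
def _example_encode_score():
    track = dict(
        mmsi=316000000,
        time=np.array([0, 3600, 7200]),
        lon=np.array([-63.0, -63.1, -20.0]),
        lat=np.array([44.0, 44.1, 10.0]),
        static={'mmsi'},
        dynamic={'time', 'lon', 'lat'},
    )
    for pathway in encode_score(track, distance_threshold=250000,
                                speed_threshold=50, minscore=1e-6):
        print(pathway['label'], pathway['time'])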


[docs]
def encode_greatcircledistance(
    tracks,
    *,
    distance_threshold,
    speed_threshold=50,
    minscore=1e-6,
):
    ''' Partitions tracks where delta speeds exceed speed_threshold or delta
        meters exceed distance_threshold, then concatenates the track segments
        with the highest likelihood of being sequential, as scored by the
        encode_score function.

        args:
            tracks (aisdb.track_gen.TrackGen)
                track vectors generator
            distance_threshold (int)
                distance in meters that will be used as a speed score
                numerator
            speed_threshold (float)
                maximum speed in knots that should be considered a continuous
                trajectory
            minscore (float)
                minimum score threshold at which to allow track segments to be
                linked. Value range: (0, 1). A minscore closer to 0 will be
                less restrictive towards trajectory grouping. A reasonable
                value for this is 1e-6. This score is computed by the function
                :func:`aisdb.denoising_encoder.encode_score`

        >>> import os
        >>> from datetime import datetime, timedelta
        >>> from aisdb import SQLiteDBConn, DBQuery, TrackGen
        >>> from aisdb import decode_msgs, encode_greatcircledistance, sqlfcn_callbacks

        >>> # create example database file
        >>> dbpath = 'encoder_test.db'
        >>> filepaths = ['aisdb/tests/testdata/test_data_20210701.csv',
        ...              'aisdb/tests/testdata/test_data_20211101.nm4']
        >>> with SQLiteDBConn(dbpath) as dbconn:
        ...     decode_msgs(filepaths=filepaths, dbconn=dbconn,
        ...                 source='TESTING', verbose=False)

        >>> with SQLiteDBConn(dbpath) as dbconn:
        ...     q = DBQuery(callback=sqlfcn_callbacks.in_timerange_validmmsi,
        ...                 dbconn=dbconn,
        ...                 start=datetime(2021, 7, 1),
        ...                 end=datetime(2021, 7, 7))
        ...     tracks = TrackGen(q.gen_qry(), decimate=True)
        ...     for track in encode_greatcircledistance(
        ...             tracks,
        ...             distance_threshold=250000,  # metres
        ...             speed_threshold=50,         # knots
        ...             minscore=0,
        ...         ):
        ...         print(track['mmsi'])
        ...         print(track['lon'], track['lat'])
        ...         break
    '''
    '''
    >>> os.remove(dbpath)
    '''
    for track in tracks:
        assert isinstance(track, dict), f'got {type(track)} {track}'
        for path in encode_score(track, distance_threshold, speed_threshold,
                                 minscore):
            yield path


[docs]
def remove_pings_wrt_speed(tracks, speed_threshold):
    ''' Remove pings from tracks where the speed of a vessel is less than or
        equal to speed_threshold. In most cases this removes the pings of
        anchored vessels.

        args:
            tracks (aisdb.track_gen.TrackGen)
                track vectors generator

        returns:
            generator
    '''
    tr1_ = next(tracks)
    columns_ = [ky for ky, val_ in tr1_.items()
                if isinstance(val_, np.ndarray)]

    def update_dict_(tr_):
        indexes_ = np.where(tr_['sog'] <= speed_threshold)[0]
        # only drop rows when some (but not all) pings fall at or below the
        # threshold, so a fully-anchored track is passed through unchanged
        if 0 < len(indexes_) < len(tr_['sog']):
            for col in columns_:
                tr_[col] = np.delete(tr_[col], indexes_)
        yield tr_

    yield from update_dict_(tr1_)
    for tr__ in tracks:
        assert isinstance(tr__, dict), f'got {type(tr__)} {tr__}'
        yield from update_dict_(tr__)
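

# A short usage sketch (assumes each track dict carries a 'sog' ndarray, as
# the function above requires): drop pings at or below 0.5 knots, which
# typically removes anchored or moored intervals.
def _example_remove_slow_pings(tracks):
    for track in remove_pings_wrt_speed(tracks, speed_threshold=0.5):
        print(track['mmsi'], len(track['time']))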


[docs]
class InlandDenoising:
    data_url = "http://bigdata5.research.cs.dal.ca/geo_land_water_NorthAmerica.7z"

    def __init__(self, data_dir, land_cache='land.pkl',
                 water_cache='water.pkl'):
        download_unzip(self.data_url, data_dir, bytesize=337401807)
        self.land_path = os.path.join(data_dir, land_cache)
        self.water_path = os.path.join(data_dir, water_cache)
        assert os.path.isfile(self.land_path), \
            f"Land file not found at {self.land_path}"
        assert os.path.isfile(self.water_path), \
            f"Water file not found at {self.water_path}"

        # load geometries during initialization and prepare them for fast
        # repeated spatial predicates
        with open(self.land_path, 'rb') as f1, \
                open(self.water_path, 'rb') as f2:
            self.land_geom = pickle.load(f1)
            self.water_geom = pickle.load(f2)
        prepare(self.land_geom)
        prepare(self.water_geom)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # no resources to release; context manager support only
        pass

[docs]
    def filter_noisy_points(self, tracks):
        """ Filter out points that fall within land features but not within
            water features.
        """
        print("Processing trajectories...")
        for i, traj in enumerate(tracks):
            # create points for batch processing
            try:
                points = MultiPoint([(lon, lat) for lon, lat
                                     in zip(traj['lon'], traj['lat'])])
            except Exception as e:
                print(f"Error creating MultiPoint at trajectory {i}: {e}")
                continue

            # find points in land but not in water
            try:
                if hasattr(points, 'geoms'):
                    noisy_mask = [
                        self.land_geom.contains(point)
                        and not self.water_geom.contains(point)
                        for point in points.geoms
                    ]
                else:
                    noisy_mask = [
                        self.land_geom.contains(points)
                        and not self.water_geom.contains(points)
                    ]
            except Exception as e:
                print(f"Error checking points at trajectory {i}: {e}")
                continue

            noisy_indices = np.where(noisy_mask)[0]

            # create boolean mask for clean points
            clean_mask = np.ones(len(traj['time']), dtype=bool)
            clean_mask[noisy_indices] = False

            # create cleaned trajectory
            cleaned_traj = dict(
                **{k: traj[k] for k in traj['static']},
                **{k: traj[k][clean_mask] for k in traj['dynamic']},
                static=traj['static'],
                dynamic=traj['dynamic'],
            )
            if len(cleaned_traj['time']) > 0:
                yield cleaned_traj
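

# A hedged usage sketch (data_dir is a hypothetical path; __init__ downloads
# and unpacks a roughly 337 MB archive on first use): yield tracks with pings
# removed where they fall on land geometry but outside water geometry.
def _example_inland_denoising(tracks):
    with InlandDenoising(data_dir='/tmp/aisdb_geo') as cleaner:
        for track in cleaner.filter_noisy_points(tracks):
            print(track['mmsi'], len(track['time']))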