import asyncio
import http.server
import logging
import multiprocessing
import os
import socketserver
import webbrowser
from datetime import datetime
from functools import partial
from tempfile import SpooledTemporaryFile
import orjson
import websockets.server
# Only warnings and above are emitted by these third-party loggers.
for _logger_name in ("websockets", "shapely"):
    logging.getLogger(_logger_name).setLevel(logging.WARNING)

# Both web client builds live in the 'aisdb_web' directory, which sits inside
# the parent of the directory containing this module.
_aisdb_web_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)),
                              'aisdb_web')

# Absolute path of the default web client build.
wwwpath = os.path.abspath(os.path.join(_aisdb_web_dir, 'dist_map'))

# Absolute path of the web client build used when visualearth is enabled.
wwwpath_visualearth = os.path.abspath(
    os.path.join(_aisdb_web_dir, 'dist_map_bingmaps'))
def _start_webclient(visualearth=False):
    ''' Serve the static web client files over HTTP on localhost:3000.

        Blocks in ``serve_forever()`` until interrupted. A KeyboardInterrupt
        stops the server quietly; any other exception propagates to the
        caller after the listening socket has been closed.

        args:
            visualearth (bool)
                if True, files are served from ``wwwpath_visualearth``
                instead of ``wwwpath``
    '''
    path = wwwpath_visualearth if visualearth else wwwpath

    class AISDB_HTML(http.server.SimpleHTTPRequestHandler):
        ''' static file handler rooted at the selected asset directory '''
        extensions_map = {
            '': 'application/octet-stream',
            '.css': 'text/css',
            '.html': 'text/html',
            # 'image/jpeg' is the registered media type; 'image/jpg' is not
            '.jpg': 'image/jpeg',
            '.js': 'application/x-javascript',
            '.json': 'application/json',
            '.png': 'image/png',
            '.wasm': 'application/wasm',
        }

        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=path, **kwargs)

    class ReusableTCPServer(socketserver.TCPServer):
        # set on a subclass so the shared socketserver.TCPServer class is
        # not modified for other users in the same process
        allow_reuse_address = True

    # leaving the ``with`` block closes the server socket, so no explicit
    # server_close()/shutdown() calls are needed in the error paths
    with ReusableTCPServer(("localhost", 3000), AISDB_HTML) as httpd:
        print('Serving HTTP assets on localhost:3000')
        try:
            httpd.serve_forever()
        except KeyboardInterrupt:
            # Ctrl-C is the expected way to stop serving
            pass
def serialize_zone_json(name, zone) -> bytes:
    ''' serializes a single zone to a JSON message of type 'zone'

        args:
            name
                zone name, stored under the 'meta' entry of the message
            zone (dict)
                must contain a 'geometry' entry exposing ``boundary.xy``
                (presumably a shapely geometry -- TODO confirm)

        returns:
            the ``orjson.dumps`` output for a dictionary holding the
            boundary x and y coordinates as tuples, plus an empty 't' list
    '''
    boundary = zone['geometry'].boundary
    message = dict(
        msgtype='zone',
        meta=dict(name=name),
        x=tuple(boundary.xy[0]),
        y=tuple(boundary.xy[1]),
        t=[],
    )
    return orjson.dumps(message)
def serialize_track_json(track) -> "tuple[bytes, bytes]":
    ''' serializes a single track dictionary to JSON format encoded as UTF8

        args:
            track (dict)
                must contain 'mmsi', 'time', 'lon' and 'lat'.
                Optional entries: 'static' (an iterable naming the track
                keys to copy into the vessel metadata), 'color', and
                'marinetraffic_info' (a dictionary merged into the
                vessel metadata)

        returns:
            (vector_json, meta_json) tuple: a 'track_vector' message with
            the track positions, and a 'vesselinfo' message with the
            vessel metadata
    '''
    vector = {
        'msgtype': 'track_vector',
        # currently, database_server sends all metadata as strings
        # reproduce this behaviour by coercion to string type, even for int
        'meta': {
            'mmsi': str(track['mmsi'])
        },
        't': track['time'],
        'x': track['lon'],
        'y': track['lat'],
    }

    # 'static' is optional: minimal track dictionaries such as the ones in
    # the visualize() docstring carry no static metadata at all
    static_keys = track.get('static', ())

    # 'marinetraffic_info' is skipped here because its contents are merged
    # into the metadata as individual entries below
    meta = {k: track[k] for k in static_keys if k != 'marinetraffic_info'}
    if 'color' in track:
        meta['color'] = track['color']
    meta['msgtype'] = 'vesselinfo'
    if 'marinetraffic_info' in track:
        meta.update(track['marinetraffic_info'])

    vector_json = orjson.dumps(vector, option=orjson.OPT_SERIALIZE_NUMPY)
    # same option as the vector message, in case metadata values are numpy
    # types as well
    meta_json = orjson.dumps(meta, option=orjson.OPT_SERIALIZE_NUMPY)
    return (vector_json, meta_json)
async def _send_tracks(websocket, tmp_vectors, tmp_meta, domain=None):
    ''' send tracks serialized as JSON to the connected websocket client

        Every message received on the connection must be one of the
        requests 'validrange', 'zones' or 'meta'. Two request sequences
        are accepted per connection:

          - 'validrange' and 'zones': once both have been received, the
            zone polygons (when a domain is given) followed by every line
            of ``tmp_vectors`` are sent
          - 'meta': every line of ``tmp_meta`` is sent

        args:
            websocket
                connection to the web client
            tmp_vectors
                seekable binary file holding one serialized track vector
                per line
            tmp_meta
                seekable binary file holding one serialized vessel
                metadata message per line
            domain
                optional object whose ``zones`` mapping (name -> zone) is
                serialized with ``serialize_zone_json``

        raises:
            RuntimeError
                if an unknown request is received, or if 'meta' is mixed
                with the other requests on the same connection
    '''
    received = set()
    async for message_json in websocket:
        message = orjson.loads(message_json)
        print(
            f'{websocket.remote_address[0]}:{websocket.remote_address[1]} - received: {message}'
        )

        if message == {"msgtype": "validrange"}:
            # start and end are both set to the current time
            now = datetime.now().timestamp()
            validrange = {"msgtype": "validrange", "start": now, "end": now}
            await websocket.send(orjson.dumps(validrange))
            received.add('validrange')
        elif message == {"msgtype": "zones"}:
            await websocket.send(b'{"msgtype": "doneZones"}')
            received.add('zones')
        elif message == {"msgtype": "meta"}:
            received.add('meta')
        else:
            raise RuntimeError(f'unknown request {message_json}')

        # explicit check rather than assert, so that the request sequence is
        # still validated when python runs with assertions disabled (-O)
        if 'meta' in received and len(received) > 1:
            raise RuntimeError(
                'metadata requests cannot be combined with other requests '
                f'on the same connection: {sorted(received)}')

        if received == {'validrange', 'zones'}:
            if domain is not None:
                for name, zone in domain.zones.items():
                    await websocket.send(serialize_zone_json(name, zone))
            # rewind so that the complete file is sent every time
            tmp_vectors.seek(0)
            for vector_json in tmp_vectors:
                await websocket.send(vector_json)
        elif received == {'meta'}:
            tmp_meta.seek(0)
            for meta_json in tmp_meta:
                await websocket.send(meta_json)
async def _start_webserver(tracks,
                           domain=None,
                           visualearth=False,
                           open_browser=True):
    ''' Display tracks in the web interface. Serves data to the web client

        All tracks are serialized up front into two temporary files (track
        vectors and vessel metadata, one JSON message per line). The files
        are then served to websocket clients on localhost:9924 until the
        coroutine is cancelled.

        args:
            tracks
                iterable of track dictionaries, see ``serialize_track_json``
            domain
                optional domain passed to ``_send_tracks`` for drawing zones
            visualearth (bool)
                selects the ``python`` tag in the URL opened in the browser
            open_browser (bool)
                if True, open the web application in a new browser tab
    '''
    print('Querying database...', end='\t')

    # The files use the default binary mode, so no ``newline`` argument is
    # given: it would be rejected as soon as a buffer exceeds max_size and
    # is rolled over to an on-disk temporary file.
    with SpooledTemporaryFile(max_size=1024 * 10**6) as vectors, \
            SpooledTemporaryFile(max_size=256 * 10**6) as meta:

        for vector, info in map(serialize_track_json, tracks):
            vectors.write(vector)
            vectors.write(b'\n')
            meta.write(info)
            meta.write(b'\n')
        print('done query')

        fcn = partial(_send_tracks,
                      tmp_vectors=vectors,
                      tmp_meta=meta,
                      domain=domain)

        async with websockets.server.serve(fcn, 'localhost', 9924):
            # the browser is opened only once the websocket server is
            # listening, so the page cannot connect before it is ready
            if open_browser:
                print('Opening a new browser window to display track data. '
                      'Press Ctrl-C to stop the server and close the webpage')
                tag = 1 if not visualearth else 2
                url = f'http://localhost:3000/index.html?python={tag}&z=2'
                if not webbrowser.open_new_tab(url):
                    print(f'Failed to open webbrowser, instead use URL: {url}')

            # this future is never resolved: serve until cancelled
            await asyncio.Future()
def visualize(tracks, domain=None, visualearth=False, open_browser=True):
    ''' Display tracks using the web interface.

        Starts the web client HTTP server in a separate process, and
        serves track data via websocket on port 9924. This function blocks
        until it is interrupted with Ctrl-C.

        Tracks are dictionaries with 'mmsi', 'time', 'lon' and 'lat'
        entries; see ``serialize_track_json`` for the metadata entries it
        reads.

        If a domain object is given, zone polygons will be drawn on the map
        from domain zone geometries.

        If visualearth is True, microsoft visual earth map tiles will be used
        for the map background.

        If open_browser is True, python will attempt to open the web
        application in a new tab using the default browser.

        To customize the color of each vessel track, set the 'color' value to
        a color string or RGB value string:

        >>> def color_tracks(tracks):
        ...     for track in tracks:
        ...         track['color'] = 'red'  # or e.g. 'rgb(255,0,0)'
        ...         yield track
        ...
        >>> tracks = [
        ...     {'mmsi': 204242000, 'lon': [-8.931666], 'lat':[41.45], 'time': [1625176725]},
        ...     {'mmsi': 204814000, 'lon': [-25.668333], 'lat': [37.736668], 'time': [1625147353]},
        ... ]
        >>> tracks_colored = color_tracks(tracks)
    '''
    proc = multiprocessing.Process(target=_start_webclient,
                                   args=(visualearth, ))
    proc.start()
    try:
        asyncio.run(
            _start_webserver(tracks, domain, visualearth, open_browser))
    except KeyboardInterrupt:
        print('Received KeyboardInterrupt, stopping server...')
    finally:
        # the HTTP server process is stopped and reaped on every exit path;
        # any other exception keeps propagating to the caller afterwards
        proc.terminate()
        proc.join()